934 lines
25 KiB
Go
934 lines
25 KiB
Go
package models
|
|
|
|
import (
|
|
"context"
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/utils"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
"github.com/ClickHouse/ch-go"
|
|
"github.com/ClickHouse/ch-go/proto"
|
|
"github.com/pkg/errors"
|
|
"github.com/rs/zerolog"
|
|
"github.com/sony/gobreaker"
|
|
)
|
|
|
|
// InsertBlockType selects which schema-specific block implementation an
// InsertBuffer allocates (see AllocateNewBlock).
type InsertBlockType int

// Block types. iota counts ConstSpecs from the start of the declaration and
// NilBlockType occupies index 0, so the first iota-based type is 1; the values
// are written out explicitly here.
const (
	// NilBlockType has no block implementation: AllocateNewBlock returns nil for
	// it, so insert buffers using this block type have no blocks.
	NilBlockType InsertBlockType = -1

	PivotBlockType   InsertBlockType = 1 // one wide row per vehicle state
	SignalBlockType  InsertBlockType = 2 // vehicle_signal schema, one row per signal
	VinLastBlockType InsertBlockType = 3 // latest row per VIN (feature_table_last)
)
|
|
|
|
var (
|
|
initialBlockPoolSize = envtool.GetEnvInt("JETFIRE_MIN_BLOCKS", 2)
|
|
maxBlockPoolSize = envtool.GetEnvInt("JETFIRE_MAX_BLOCKS", 4)
|
|
maxBufferBytes = envtool.GetEnvInt("JETFIRE_BUFFER_MAX_BYTES", 128<<20) / maxBlockPoolSize
|
|
)
|
|
|
|
const NO_TIMESTAMP_LOG_SAMPLE_RATE = 10000
|
|
var logSampler zerolog.Logger
|
|
// Creating a log sampler
|
|
func init(){
|
|
logSampler = logger.Sample(zerolog.RandomSampler(NO_TIMESTAMP_LOG_SAMPLE_RATE))
|
|
}
|
|
|
|
// InsertBuffer abstracts buffer appends and insertions to clickhouse, even for different underlying schemas.
|
|
// A pool of blocks are allocated for each InsertBuffer.
|
|
//
|
|
// These pools do not implement leaky buffer pattern to ensure data order and timeliness.
|
|
type InsertBuffer struct {
|
|
InsertTime time.Time
|
|
insertDelay time.Duration
|
|
|
|
blockPool []insertBlock //pool of all blocks, including busy blocks.
|
|
|
|
//linked list of available blocks in the pool
|
|
freeLock sync.Mutex //mutex for linked list
|
|
freeHead *BlockNode
|
|
freeTail *BlockNode
|
|
|
|
blockType InsertBlockType
|
|
tripInfo bool
|
|
|
|
TableName string
|
|
}
|
|
|
|
// linked list node for insertBlock
|
|
type BlockNode struct {
|
|
block insertBlock
|
|
next *BlockNode
|
|
}
|
|
|
|
// Command struct for inserter goroutine
|
|
type InsertCommand struct {
|
|
Buffer *InsertBuffer
|
|
Block insertBlock
|
|
}
|
|
|
|
func NewInsertBuffer(tripInfo bool, signalNames []string, tableName string, insertDelay time.Duration, blockType InsertBlockType) *InsertBuffer {
|
|
newBlockPool := []insertBlock{}
|
|
|
|
for i := 0; i < initialBlockPoolSize; i++ {
|
|
var newBlock insertBlock = AllocateNewBlock(signalNames, maxBufferBytes, blockType, tripInfo)
|
|
|
|
if newBlock != nil {
|
|
newBlockPool = append(newBlockPool, newBlock)
|
|
}
|
|
}
|
|
|
|
newBuffer := new(InsertBuffer)
|
|
newBuffer.insertDelay = insertDelay
|
|
newBuffer.blockPool = newBlockPool
|
|
newBuffer.TableName = tableName
|
|
newBuffer.blockType = blockType
|
|
newBuffer.tripInfo = tripInfo
|
|
|
|
newBuffer.InitFreeBlocksList()
|
|
|
|
return newBuffer
|
|
}
|
|
|
|
// memory block linked list functions. These are for available blocks that are ready to accept data.
|
|
func (buffer *InsertBuffer) AppendFreeBlock(block insertBlock) {
|
|
buffer.freeLock.Lock()
|
|
defer buffer.freeLock.Unlock()
|
|
|
|
if buffer.freeHead == nil {
|
|
buffer.freeHead = &BlockNode{block: block}
|
|
|
|
buffer.freeTail = buffer.freeHead
|
|
return
|
|
}
|
|
if buffer.freeTail == nil { //this should never occur!
|
|
err := errors.Errorf("Unexpected nil tail for InsertBuffer blocks with non nil head!")
|
|
logger.Error().Err(err).Send()
|
|
return
|
|
}
|
|
|
|
buffer.freeTail.next = &BlockNode{block: block}
|
|
buffer.freeTail = buffer.freeTail.next
|
|
}
|
|
|
|
func (buffer *InsertBuffer) PopFreeBlock() insertBlock {
|
|
buffer.freeLock.Lock()
|
|
defer buffer.freeLock.Unlock()
|
|
|
|
blockNode := buffer.freeHead
|
|
if blockNode == buffer.freeTail {
|
|
buffer.freeTail = nil
|
|
}
|
|
if blockNode == nil {
|
|
return nil
|
|
}
|
|
|
|
buffer.freeHead = blockNode.next
|
|
return blockNode.block
|
|
}
|
|
|
|
func (buffer *InsertBuffer) PeekFreeBlock() insertBlock {
|
|
buffer.freeLock.Lock()
|
|
defer buffer.freeLock.Unlock()
|
|
|
|
blockNode := buffer.freeHead
|
|
if blockNode == nil {
|
|
return nil
|
|
}
|
|
return blockNode.block
|
|
}
|
|
|
|
// allocates a new block with given params and returns it
|
|
func AllocateNewBlock(signalNames []string, maxBufferBytes int, blockType InsertBlockType, tripInfo bool) insertBlock {
|
|
var newBlock insertBlock = nil
|
|
if blockType == PivotBlockType {
|
|
newBlock = NewProtoPivotBlock(signalNames, maxBufferBytes, tripInfo)
|
|
} else if blockType == SignalBlockType {
|
|
newBlock = NewProtoSignalBlock(signalNames, maxBufferBytes)
|
|
} else if blockType == VinLastBlockType {
|
|
newBlock = NewProtoVinLastBlock(signalNames, maxVinCount, tripInfo)
|
|
}
|
|
|
|
return newBlock
|
|
}
|
|
|
|
// appends a row to the buffer.
|
|
// Row must match the expected type by the underlying proto block.
|
|
func (buffer *InsertBuffer) AppendRow(signalNames []string, row interface{}, producerChannel chan InsertCommand) error {
|
|
block := buffer.PeekFreeBlock()
|
|
loggedWaiting := false
|
|
|
|
for block != nil && block.IsFull() {
|
|
//if the block is full before append, then pop it and put it on the producer channel.
|
|
buffer.ProduceBlock(producerChannel)
|
|
block = buffer.PeekFreeBlock()
|
|
}
|
|
|
|
for block == nil {
|
|
//no more free blocks are available...
|
|
// if the buffer is able to allocate more blocks, then do so. otherwise, wait for a block...
|
|
if len(buffer.blockPool) == maxBlockPoolSize {
|
|
if !loggedWaiting {
|
|
loggedWaiting = true
|
|
logger.Warn().Msgf("no available %s blocks for appending, waiting for available block...", buffer.TableName)
|
|
}
|
|
//wait
|
|
time.Sleep(100 * time.Millisecond)
|
|
block = buffer.PeekFreeBlock()
|
|
} else {
|
|
logger.Info().Msgf("%s no available block, allocating new block", buffer.TableName)
|
|
block = AllocateNewBlock(signalNames, maxBufferBytes, buffer.blockType, buffer.tripInfo)
|
|
buffer.blockPool = append(buffer.blockPool, block)
|
|
buffer.AppendFreeBlock(block)
|
|
}
|
|
}
|
|
|
|
err := block.AppendRow(signalNames, row)
|
|
|
|
//if block is full after append, pop it from linked list and put onto producer channel
|
|
if block.IsFull() || block.TimeSinceThreshold(buffer.insertDelay) {
|
|
buffer.ProduceBlock(producerChannel)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (buffer *InsertBuffer) ProduceBlock(producerChannel chan InsertCommand) {
|
|
if producerChannel != nil {
|
|
logger.Debug().Msgf("ProduceBlock...")
|
|
producerChannel <- InsertCommand{
|
|
Buffer: buffer,
|
|
Block: buffer.PopFreeBlock(),
|
|
}
|
|
}
|
|
}
|
|
|
|
// searches blockpool for nonEmpty blocks
|
|
func (buffer *InsertBuffer) getNonEmptyBlock() insertBlock {
|
|
for _, block := range buffer.blockPool {
|
|
if block.Len() > 0 {
|
|
return block
|
|
}
|
|
}
|
|
return buffer.blockPool[0]
|
|
}
|
|
|
|
// returns any nonempty input from buffer
|
|
func (buffer *InsertBuffer) GetInput() proto.Input {
|
|
return buffer.getNonEmptyBlock().GetProtoInput(true, nil)
|
|
}
|
|
|
|
// reinitializes the blocks list, assuming any empty block in the pool is free and can be added to list.
|
|
// returns size of blocks list
|
|
func (buffer *InsertBuffer) InitFreeBlocksList() int {
|
|
buffer.freeHead = nil
|
|
buffer.freeTail = nil
|
|
count := 0
|
|
for _, block := range buffer.blockPool {
|
|
if block.Len() != 0 {
|
|
continue
|
|
}
|
|
buffer.AppendFreeBlock(block)
|
|
count++
|
|
}
|
|
return count
|
|
}
|
|
|
|
// Inserts and flushes only the head block.
|
|
func InsertAndFlushHead(buffer *InsertBuffer, ctx context.Context, conn *ch.Client, breaker *gobreaker.CircuitBreaker, signalsSet map[string]bool) (int, error) {
|
|
block := buffer.PopFreeBlock()
|
|
|
|
if buffer == nil || block == nil {
|
|
return 0, errors.Errorf("attempted to insert with nil buffer")
|
|
}
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
rows := block.Len()
|
|
protoInput := block.GetProtoInput(false, signalsSet)
|
|
if protoInput == nil {
|
|
// empty buffer, just return
|
|
return 0, nil
|
|
}
|
|
queryContext, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
|
|
insertFunc := func() (interface{}, error) {
|
|
return nil, conn.Do(queryContext, ch.Query{
|
|
Body: protoInput.Into(buffer.TableName),
|
|
Input: protoInput,
|
|
})
|
|
}
|
|
_, err := breaker.Execute(insertFunc)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
block.Flush(false)
|
|
buffer.AppendFreeBlock(block)
|
|
|
|
return rows, err
|
|
}
|
|
|
|
// Inserts a block and flushes it, does not readd to free blocks list if error occurred
|
|
func InsertAndFlush(command InsertCommand, ctx context.Context, conn *ch.Client, breaker *gobreaker.CircuitBreaker, signalsSet map[string]bool) (int, error) {
|
|
buffer := command.Buffer
|
|
block := command.Block
|
|
|
|
if buffer == nil || block == nil {
|
|
return 0, errors.Errorf("attempted to insert with nil buffer")
|
|
}
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
rows := block.Len()
|
|
protoInput := block.GetProtoInput(false, signalsSet)
|
|
if protoInput == nil {
|
|
// empty buffer, just return
|
|
return 0, nil
|
|
}
|
|
queryContext, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
defer cancel()
|
|
|
|
logger.Debug().Msgf("Inserting %d rows to %s...", rows, conn.ServerInfo().DisplayName)
|
|
|
|
insertFunc := func() (interface{}, error) {
|
|
return nil, conn.Do(queryContext, ch.Query{
|
|
Body: protoInput.Into(buffer.TableName),
|
|
Input: protoInput,
|
|
})
|
|
}
|
|
_, err := breaker.Execute(insertFunc)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
block.Flush(false)
|
|
buffer.AppendFreeBlock(block)
|
|
|
|
return rows, err
|
|
}
|
|
|
|
func (buffer *InsertBuffer) Len() int {
|
|
length := 0
|
|
for _, block := range buffer.blockPool {
|
|
length += block.Len()
|
|
}
|
|
return length
|
|
}
|
|
|
|
func (buffer *InsertBuffer) Cap() int {
|
|
length := 0
|
|
for _, block := range buffer.blockPool {
|
|
length += block.Cap()
|
|
}
|
|
return length
|
|
}
|
|
|
|
func (buffer *InsertBuffer) Resize(signals []string) {
|
|
for _, block := range buffer.blockPool {
|
|
block.Resize(signals, maxBufferBytes)
|
|
}
|
|
}
|
|
|
|
type insertBlock interface {
|
|
AppendRow(signalNames []string, row interface{}) error
|
|
GetProtoInput(useLock bool, signalsSet map[string]bool) proto.Input
|
|
Flush(useLock bool)
|
|
IsFull() bool
|
|
GetInsertBlockType() InsertBlockType
|
|
Resize(signals []string, maxBytes int)
|
|
Lock()
|
|
Unlock()
|
|
Len() int
|
|
Cap() int
|
|
IsBusy() bool
|
|
TimeSinceThreshold(time.Duration) bool
|
|
}
|
|
|
|
type protoPivotBlock struct {
|
|
lock sync.Mutex
|
|
busy bool
|
|
|
|
length int
|
|
capacity int
|
|
|
|
maxBytes int
|
|
|
|
vin proto.ColStr
|
|
timestamp proto.ColDateTime64
|
|
tripStart proto.ColDateTime64
|
|
tripID proto.ColStr
|
|
data []proto.ColFloat64
|
|
|
|
startTime *time.Time
|
|
|
|
columnNames []string
|
|
|
|
appendTripInfo bool
|
|
}
|
|
|
|
// block type with issue
|
|
func NewProtoPivotBlock(signalNames []string, maxBufferBytes int, tripInfo bool) *protoPivotBlock {
|
|
newBlock := protoPivotBlock{
|
|
columnNames: signalNames,
|
|
appendTripInfo: tripInfo,
|
|
}
|
|
logger.Debug().Msgf("NEW PIVOT BLOCK: %d %d", len(signalNames), maxBufferBytes)
|
|
newBlock.Resize(signalNames, maxBufferBytes)
|
|
return &newBlock
|
|
}
|
|
|
|
func (block *protoPivotBlock) Lock() {
|
|
block.lock.Lock()
|
|
block.busy = true
|
|
}
|
|
|
|
func (block *protoPivotBlock) Unlock() {
|
|
block.lock.Unlock()
|
|
block.busy = false
|
|
}
|
|
|
|
func (block *protoPivotBlock) IsBusy() bool {
|
|
return block.busy
|
|
}
|
|
|
|
func (block *protoPivotBlock) Len() int {
|
|
return block.length
|
|
}
|
|
|
|
func (block *protoPivotBlock) Cap() int {
|
|
return block.capacity
|
|
}
|
|
|
|
func (block *protoPivotBlock) TimeSinceThreshold(threshold time.Duration) bool {
|
|
return block.startTime == nil || time.Since(*block.startTime) > threshold
|
|
}
|
|
|
|
// Resizes this pivotBlock to the desired number of signal columns, and scales rows to not exceed maxBytes
|
|
func (block *protoPivotBlock) Resize(signals []string, maxBytes int) {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
if len(signals) == len(block.columnNames) && maxBytes == block.maxBytes {
|
|
block.Flush(false)
|
|
return
|
|
}
|
|
|
|
oldCapacity := block.capacity
|
|
oldWidth := len(block.columnNames)
|
|
|
|
block.columnNames = signals
|
|
block.maxBytes = maxBytes
|
|
|
|
block.capacity = block.estimateMaxRows(len(signals), maxBytes)
|
|
block.length = 0
|
|
|
|
logger.Debug().Msgf("resizing block to %d rows, %d columns", block.capacity, len(signals))
|
|
|
|
//reallocate memory only if block dimensions have changed
|
|
if oldCapacity != block.capacity || oldWidth != len(signals) {
|
|
block.vin.Buf = make([]byte, 0, utils.MaxVinLength*block.capacity)
|
|
block.vin.Pos = make([]proto.Position, 0, block.capacity)
|
|
|
|
block.timestamp.Data = make([]proto.DateTime64, 0, block.capacity)
|
|
block.tripStart.Data = make([]proto.DateTime64, 0, block.capacity)
|
|
block.timestamp.WithPrecision(proto.PrecisionNano)
|
|
block.tripStart.WithPrecision(proto.PrecisionNano)
|
|
|
|
block.tripID.Buf = make([]byte, 0, (utils.MaxVinLength+utils.MaxTimestampLength+1)*block.capacity)
|
|
block.tripID.Pos = make([]proto.Position, 0, block.capacity)
|
|
|
|
block.data = make([]proto.ColFloat64, len(signals))
|
|
for i := range block.data {
|
|
block.data[i] = make([]float64, 0, block.capacity)
|
|
}
|
|
}
|
|
|
|
block.Flush(false)
|
|
}
|
|
|
|
func (block *protoPivotBlock) GetInsertBlockType() InsertBlockType {
|
|
return PivotBlockType
|
|
}
|
|
|
|
func (block *protoPivotBlock) IsFull() bool {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
return block.length >= block.capacity
|
|
}
|
|
|
|
// Given upper bound for bytes, estimate the maximum number of rows to allocate
|
|
func (block *protoPivotBlock) estimateMaxRows(numDataColumns int, maxBytes int) int {
|
|
rowBytes := 20 + 2 + 8 + 8*numDataColumns //vin, timestamp, data columns per row
|
|
if block.appendTripInfo {
|
|
rowBytes += 8 + 44 + 2 //tripstart and tripid
|
|
}
|
|
return maxBytes / rowBytes
|
|
}
|
|
|
|
// Appends a VehicleState as a row to this Block.
|
|
func (block *protoPivotBlock) AppendRow(signalNames []string, row interface{}) error {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
if block.length >= block.capacity {
|
|
return errors.WithStack(utils.ErrInsertFullBlock)
|
|
}
|
|
if len(signalNames) != len(block.data) {
|
|
return errors.WithStack(utils.ErrInsertWrongColumns)
|
|
}
|
|
state, valid := row.(*VehicleState)
|
|
if !valid {
|
|
return errors.WithStack(utils.ErrInvalidAppendType)
|
|
}
|
|
block.length++
|
|
|
|
if block.startTime == nil {
|
|
time := time.Now()
|
|
block.startTime = &time
|
|
}
|
|
|
|
block.vin.Append(state.VIN)
|
|
block.timestamp.Append(state.Timestamp)
|
|
if block.appendTripInfo {
|
|
block.tripStart.Append(state.TripStart)
|
|
block.tripID.Append(state.TripID)
|
|
}
|
|
|
|
// 3 hour leeway
|
|
// Saw a lot of signals with a delay around 3 hours, going to now only check for signals over a day old
|
|
catchTime := state.Timestamp.Add(-time.Hour * 24)
|
|
for i, signal := range signalNames {
|
|
value, valid := state.StateValues[signal]
|
|
if !valid {
|
|
value = math.NaN()
|
|
} else {
|
|
// If the signal is not included, I'm not going to check its timing.
|
|
// should no linger see the !ok case
|
|
// Block that should have my issue
|
|
signalTime, ok := state.StateTimes[signal]
|
|
if !ok {
|
|
// So a lot of signals do not have a timestamp on them, not sure why
|
|
logSampler.Warn().Str("Location", "protoPivotBlock").Str("VIN", state.VIN).Str("Signal", signal).
|
|
Str("Location", "protoPivotBlock").Float64("Value", value).Int("Sample Rate", NO_TIMESTAMP_LOG_SAMPLE_RATE).Msg("AppendRow No Timestamp")
|
|
} else {
|
|
// If the signal is from 3 hours before the suggested time of the signal
|
|
if signalTime.Before(catchTime) {
|
|
logger.Warn().Str("VIN", state.VIN).
|
|
Str("Signal", signal).
|
|
Float64("Value", value).
|
|
Time("timestamp.state", state.Timestamp).
|
|
Time("timestamp.signal", signalTime).
|
|
Str("Location", "protoPivotBlock").
|
|
Msg("AppendRow Timestamp Old")
|
|
}
|
|
}
|
|
}
|
|
|
|
block.data[i].Append(value)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Clears and Resets the buffers in the Block.
|
|
func (block *protoPivotBlock) Flush(useLock bool) {
|
|
if useLock {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
}
|
|
block.length = 0
|
|
block.startTime = nil
|
|
|
|
block.vin.Reset()
|
|
block.timestamp.Reset()
|
|
block.tripStart.Reset()
|
|
block.tripID.Reset()
|
|
|
|
for i := range block.data {
|
|
block.data[i].Reset()
|
|
}
|
|
|
|
}
|
|
|
|
// Gets protocol input struct for ch-go batch insertion
|
|
func (block *protoPivotBlock) GetProtoInput(useLock bool, signalsSet map[string]bool) proto.Input {
|
|
if useLock {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
}
|
|
|
|
if block.length == 0 {
|
|
return nil
|
|
}
|
|
|
|
input := proto.Input{
|
|
{Name: "VIN", Data: block.vin},
|
|
{Name: "Timestamp", Data: block.timestamp},
|
|
}
|
|
|
|
if block.appendTripInfo {
|
|
input = append(input, proto.InputColumn{Name: "TripStart", Data: block.tripStart})
|
|
input = append(input, proto.InputColumn{Name: "TripID", Data: block.tripID})
|
|
}
|
|
|
|
for i := range block.data {
|
|
columnName := block.columnNames[i]
|
|
ok := true
|
|
if len(signalsSet) > 0 {
|
|
_, ok = signalsSet[columnName]
|
|
}
|
|
if !ok {
|
|
continue
|
|
}
|
|
input = append(input, proto.InputColumn{Name: columnName, Data: block.data[i]})
|
|
}
|
|
return input
|
|
}
|
|
|
|
// Block struct for feature_table_last type of schema
|
|
// This block aggregates only 1 row per VIN. vinIndexMap maps the VIN to a row index.
|
|
type protoVinLastBlock struct {
|
|
protoPivotBlock
|
|
vinIndexMap map[string]int
|
|
}
|
|
|
|
func NewProtoVinLastBlock(signalNames []string, numVINs int, tripInfo bool) *protoVinLastBlock {
|
|
newBlock := protoVinLastBlock{}
|
|
newBlock.columnNames = append(newBlock.columnNames, signalNames...)
|
|
newBlock.vinIndexMap = make(map[string]int)
|
|
newBlock.appendTripInfo = tripInfo
|
|
|
|
logger.Debug().Msgf("NEW VIN LAST BLOCK: %d %d", len(signalNames), maxBufferBytes)
|
|
|
|
bytesPerVIN := 20 + 16 + 16 + 20 //VIN (str), Timestamp, TripStart (timestamp), TripID (str)
|
|
bytesPerVIN += 8 * len(signalNames)
|
|
|
|
newBlock.Resize(signalNames, bytesPerVIN*numVINs)
|
|
return &newBlock
|
|
}
|
|
|
|
func (block *protoVinLastBlock) Flush(useLock bool) {
|
|
if useLock {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
}
|
|
block.protoPivotBlock.Flush(false)
|
|
|
|
block.vinIndexMap = make(map[string]int)
|
|
}
|
|
|
|
// Appends a VehicleState as a row to this Block.
|
|
// Investigate this code vs protoPivotBlock. WHy the differences, why two of the same thing?
|
|
func (block *protoVinLastBlock) AppendRow(signalNames []string, row interface{}) error {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
if block.length >= block.capacity {
|
|
return errors.WithStack(utils.ErrInsertFullBlock)
|
|
}
|
|
if len(signalNames) != len(block.data) {
|
|
return errors.WithStack(utils.ErrInsertWrongColumns)
|
|
}
|
|
state, valid := row.(*VehicleState)
|
|
if !valid {
|
|
return errors.WithStack(utils.ErrInvalidAppendType)
|
|
}
|
|
|
|
vinIndex, ok := block.vinIndexMap[state.VIN]
|
|
|
|
if block.startTime == nil {
|
|
time := time.Now()
|
|
block.startTime = &time
|
|
}
|
|
|
|
if !ok {
|
|
// append state to the end of the buffer
|
|
block.length++
|
|
|
|
block.vin.Append(state.VIN)
|
|
block.timestamp.Append(state.Timestamp)
|
|
if block.appendTripInfo {
|
|
block.tripStart.Append(state.TripStart)
|
|
block.tripID.Append(state.TripID)
|
|
}
|
|
|
|
// catchTime := state.Timestamp.Add(-time.Hour * 3)
|
|
for i, signal := range signalNames {
|
|
value, valid := state.StateValues[signal]
|
|
if !valid {
|
|
value = math.NaN()
|
|
} else {
|
|
// If the signal is not included, I'm not going to check its timing.
|
|
// should no linger see the !ok case
|
|
// signalTime, ok := state.StateTimes[signal]
|
|
// if !ok {
|
|
// logger.Warn().Str("Location", "protoVinLastBlock, !ok").Msgf("AppendRow no timestamp for %s", signal)
|
|
// } else {
|
|
// // If the signal is from 3 hours before the suggested time of the signal
|
|
// if signalTime.Before(catchTime) {
|
|
// logger.Warn().Str("VIN", state.VIN).
|
|
// Str("Signal", signal).
|
|
// Time("timestamp.state", state.Timestamp).
|
|
// Time("timestamp.signal", signalTime).
|
|
// Str("Location", "protoVinLastBlock, !ok").
|
|
// Msg("AppendRow Timestamp Old")
|
|
// }
|
|
// }
|
|
}
|
|
|
|
block.data[i].Append(value)
|
|
}
|
|
|
|
block.vinIndexMap[state.VIN] = block.length - 1
|
|
} else {
|
|
// override only the row mapped to the vin
|
|
block.timestamp.Data[vinIndex] = proto.ToDateTime64(state.Timestamp, block.timestamp.Precision)
|
|
block.tripStart.Data[vinIndex] = proto.ToDateTime64(state.TripStart, block.timestamp.Precision)
|
|
SetColStr(&block.tripID, state.TripID, vinIndex)
|
|
|
|
//catchTime := state.Timestamp.Add(-time.Hour * 3)
|
|
for i, signal := range signalNames {
|
|
value, valid := state.StateValues[signal]
|
|
if !valid {
|
|
value = math.NaN()
|
|
} else {
|
|
// If the signal is not included, I'm not going to check its timing.
|
|
// should no linger see the !ok case
|
|
// signalTime, ok := state.StateTimes[signal]
|
|
// if !ok {
|
|
// logger.Warn().Str("Location", "protoVinLastBlock, ok").Msgf("AppendRow no timestamp for %s", signal)
|
|
// } else {
|
|
// // If the signal is from 3 hours before the suggested time of the signal
|
|
// if signalTime.Before(catchTime) {
|
|
// logger.Warn().Str("VIN", state.VIN).
|
|
// Str("Signal", signal).
|
|
// Time("timestamp.state", state.Timestamp).
|
|
// Time("timestamp.signal", signalTime).
|
|
// Str("Location", "protoVinLastBlock, ok").
|
|
// Msg("AppendRow Timestamp Old")
|
|
// }
|
|
// }
|
|
}
|
|
|
|
block.data[i][vinIndex] = value
|
|
}
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
// Block struct for vehicle_signal type of schema
|
|
type protoSignalBlock struct {
|
|
lock sync.Mutex
|
|
busy bool
|
|
|
|
length int
|
|
capacity int
|
|
|
|
maxBytes int
|
|
|
|
startTime *time.Time
|
|
|
|
vin proto.ColStr
|
|
timestamp proto.ColDateTime64
|
|
id proto.ColInt16
|
|
name proto.ColStr
|
|
value proto.ColFloat64
|
|
}
|
|
|
|
func NewProtoSignalBlock(signalNames []string, maxBufferBytes int) *protoSignalBlock {
|
|
newBlock := protoSignalBlock{}
|
|
newBlock.Resize(signalNames, maxBufferBytes)
|
|
return &newBlock
|
|
}
|
|
|
|
func (block *protoSignalBlock) Lock() {
|
|
block.lock.Lock()
|
|
block.busy = true
|
|
}
|
|
|
|
func (block *protoSignalBlock) Unlock() {
|
|
block.lock.Unlock()
|
|
block.busy = false
|
|
}
|
|
|
|
func (block *protoSignalBlock) IsBusy() bool {
|
|
return block.busy
|
|
}
|
|
|
|
func (block *protoSignalBlock) Len() int {
|
|
return block.length
|
|
}
|
|
|
|
func (block *protoSignalBlock) Cap() int {
|
|
return block.capacity
|
|
}
|
|
|
|
func (block *protoSignalBlock) TimeSinceThreshold(threshold time.Duration) bool {
|
|
return block.startTime == nil || time.Since(*block.startTime) > threshold
|
|
}
|
|
|
|
// Resizes allocated memory for the Block, given maxBytes as the upper bound for memory size
|
|
func (block *protoSignalBlock) Resize(signals []string, maxBytes int) {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
if maxBytes == block.maxBytes {
|
|
block.Flush(false)
|
|
return
|
|
}
|
|
|
|
oldCapacity := block.capacity
|
|
|
|
block.maxBytes = maxBytes
|
|
block.capacity = block.estimateMaxRows(maxBytes)
|
|
|
|
logger.Debug().Msgf("resizing block to %d rows", block.capacity)
|
|
|
|
//reallocate memory only if block dimensions have changed
|
|
if oldCapacity != block.capacity {
|
|
block.vin.Buf = make([]byte, 0, utils.MaxVinLength*block.capacity)
|
|
block.vin.Pos = make([]proto.Position, 0, block.capacity)
|
|
|
|
block.timestamp.Data = make([]proto.DateTime64, 0, block.capacity)
|
|
block.timestamp.WithPrecision(proto.PrecisionMicro)
|
|
|
|
block.id = make([]int16, 0, block.capacity)
|
|
|
|
block.name.Buf = make([]byte, 0, 20*block.capacity)
|
|
block.name.Pos = make([]proto.Position, 0, block.capacity)
|
|
|
|
block.value = make([]float64, 0, block.capacity)
|
|
}
|
|
|
|
block.Flush(false)
|
|
}
|
|
|
|
func (block *protoSignalBlock) GetInsertBlockType() InsertBlockType {
|
|
return SignalBlockType
|
|
}
|
|
|
|
func (block *protoSignalBlock) IsFull() bool {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
return block.length >= block.capacity
|
|
}
|
|
|
|
// Given upper bound for bytes, estimate the maximum number of rows to allocate
|
|
func (block *protoSignalBlock) estimateMaxRows(maxBytes int) int {
|
|
const rowBytes int = 17 + 8 + 2 + 30 + 8 + 2 + 2 //Each row is ~65 bytes.
|
|
return maxBytes / rowBytes
|
|
}
|
|
|
|
// Clears and Resets the buffers in the Block.
|
|
func (block *protoSignalBlock) Flush(useLock bool) {
|
|
if useLock {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
}
|
|
block.vin.Reset()
|
|
block.timestamp.Reset()
|
|
block.id.Reset()
|
|
block.name.Reset()
|
|
block.value.Reset()
|
|
|
|
block.startTime = nil
|
|
block.length = 0
|
|
}
|
|
|
|
// Appends a kafka_grpc.GRPC_CANSignal to this block as a row
|
|
func (block *protoSignalBlock) AppendRow(signalNames []string, row interface{}) error {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
|
|
if block.length >= block.capacity {
|
|
return errors.WithStack(utils.ErrInsertFullBlock)
|
|
}
|
|
signal, valid := row.(*kafka_grpc.GRPC_CANSignal)
|
|
if !valid {
|
|
return errors.WithStack(utils.ErrInvalidAppendType)
|
|
|
|
}
|
|
|
|
if block.startTime == nil {
|
|
time := time.Now()
|
|
block.startTime = &time
|
|
}
|
|
|
|
timestamp := utils.FloatToTime(signal.Timestamp)
|
|
|
|
block.vin.Append(signal.Vin)
|
|
block.timestamp.Append(timestamp)
|
|
block.id.Append(int16(signal.Id))
|
|
block.name.Append(signal.Name)
|
|
block.value.Append(signal.Value)
|
|
|
|
block.length++
|
|
|
|
return nil
|
|
}
|
|
|
|
// Gets protocol input struct for ch-go batch insertion
|
|
func (block *protoSignalBlock) GetProtoInput(useLock bool, signalsSet map[string]bool) proto.Input {
|
|
if useLock {
|
|
block.Lock()
|
|
defer block.Unlock()
|
|
}
|
|
|
|
if block.length == 0 {
|
|
return nil
|
|
}
|
|
input := proto.Input{
|
|
{Name: "VIN", Data: block.vin},
|
|
{Name: "Timestamp", Data: block.timestamp},
|
|
{Name: "Name", Data: block.name},
|
|
{Name: "Value", Data: block.value},
|
|
{Name: "ID", Data: block.id},
|
|
}
|
|
return input
|
|
}
|
|
|
|
/// helper methods ///
|
|
|
|
// Sets the value of string in-place in a proto colstr.
|
|
// This will reallocate memory as needed.
|
|
func SetColStr(col *proto.ColStr, value string, index int) {
|
|
pos := col.Pos[index]
|
|
oldLen := pos.End - pos.Start
|
|
newLen := len(value)
|
|
offset := newLen - oldLen
|
|
|
|
newEnd := pos.Start + len(value)
|
|
|
|
oldBufLen := len(col.Buf)
|
|
|
|
//need to resize buffers
|
|
if offset > 0 {
|
|
// grow buffer by appending zero bytes, then truncating the length (not the cap) of slice
|
|
col.Buf = append(col.Buf, make([]byte, offset)...)[:oldBufLen]
|
|
}
|
|
|
|
// need to shift EVERYTHING after index. This can be slow.
|
|
if offset != 0 {
|
|
// copy buffer into itself with offset.
|
|
copy(col.Buf[newEnd:len(col.Buf)], col.Buf[pos.End:oldBufLen])
|
|
|
|
for i := index + 1; i < len(col.Pos); i++ {
|
|
col.Pos[i].Start += offset
|
|
col.Pos[i].End += offset
|
|
}
|
|
}
|
|
|
|
//insert
|
|
copy(col.Buf[pos.Start:newEnd], value)
|
|
col.Pos[index].End = newEnd
|
|
|
|
}
|