165 lines
4.4 KiB
Go
165 lines
4.4 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/handlers"
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/models"
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/services"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
|
|
"github.com/fiskerinc/cloud-services/pkg/kafka"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/loggerdataresp"
|
|
|
|
"github.com/intel-go/fastjson"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// StartConsumer runs consumer and puts vehicle signals into a channel for router
|
|
func StartConsumer(ctx context.Context, topic string) {
|
|
logger.Debug().Str("StartConsumer", "").Send()
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
logger.Error().Msgf("PanicConsumer %v", err)
|
|
}
|
|
}()
|
|
|
|
producerChannel := make(chan models.InsertCommand, 100)
|
|
signalsChannel := make(chan *kafka.Message)
|
|
rebalanceChannel := make(chan struct{})
|
|
|
|
consumer, err := services.GetKafkaConsumer()
|
|
if err != nil {
|
|
logger.Fatal().Err(err)
|
|
panic(err)
|
|
}
|
|
|
|
logger.Debug().Msgf("Starting routeEvents and InserterLoop...")
|
|
|
|
go routeEvents(signalsChannel, rebalanceChannel, producerChannel)
|
|
|
|
go InserterLoop(producerChannel, ctx)
|
|
|
|
for {
|
|
logger.Debug().Msgf("Calling ConsumeOrRebalancedCatch...")
|
|
consumer.Subscribe([]string{topic})
|
|
err = consumer.ConsumeOrRebalancedCatch([]string{topic}, signalsChannel, rebalanceChannel)
|
|
if err != nil {
|
|
logger.Error().Err(err).Send()
|
|
}
|
|
if !loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) {
|
|
check := consumer.Check(ctx)
|
|
if check != nil {
|
|
logger.Error().Err(check).Send()
|
|
}
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
// reset the kafka consumer and gc the old one
|
|
consumer = nil
|
|
for consumer == nil || err != nil {
|
|
consumer, err = services.ResetKafkaConsumer()
|
|
if err != nil {
|
|
logger.Error().Err(err).Send()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processes signals in batch, handles caching of vehicle state and appending to insertion batch caches
|
|
func routeEvents(signalsChannel chan *kafka.Message, rebalanceChannel <-chan struct{}, producerChannel chan models.InsertCommand) {
|
|
var jsonErr error
|
|
var protoErr error
|
|
var batchData []*kafka_grpc.GRPC_CANSignal
|
|
|
|
initialized := false
|
|
|
|
//Consumer loop
|
|
rebalanceFlag := true
|
|
for {
|
|
select {
|
|
case signal := <-signalsChannel:
|
|
if signal == nil {
|
|
continue
|
|
}
|
|
rebalanceFlag = true
|
|
|
|
if !initialized {
|
|
// init cache after rebalancing
|
|
logger.Debug().Msgf("INITIALIZING CACHE...")
|
|
time.Sleep(time.Second)
|
|
services.InitCacheFromClickhouse()
|
|
|
|
initialized = true
|
|
logger.Debug().Msgf("INITIALIZED...")
|
|
}
|
|
|
|
batchData, protoErr = unmarshalProtobufSignal(signal)
|
|
if protoErr != nil {
|
|
batchData, jsonErr = unmarshalJSONSignal(signal)
|
|
if jsonErr != nil {
|
|
logger.Error().Err(protoErr).Msg("Failed to unmarshal signal as either Protobuf or JSON")
|
|
continue
|
|
}
|
|
}
|
|
|
|
_, err := handlers.HandleSignalBatch(batchData, services.GetVehicleCache(), producerChannel)
|
|
if err != nil {
|
|
logger.Error().Err(errors.WithStack(err)).Send()
|
|
}
|
|
|
|
case <-rebalanceChannel:
|
|
if rebalanceFlag {
|
|
rebalanceFlag = false
|
|
|
|
logger.Info().Msgf("kafka rebalancing...")
|
|
initialized = false
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func unmarshalProtobufSignal(event *kafka.Message) ([]*kafka_grpc.GRPC_CANSignal, error) {
|
|
if event == nil {
|
|
err := errors.Errorf("trying to unmarshall null event ptr")
|
|
return nil, err
|
|
}
|
|
batchData := kafka_grpc.GRPC_CANSignalBatchPayload{}
|
|
err := proto.Unmarshal(event.Value, &batchData)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return batchData.Data.Cansignals, nil
|
|
}
|
|
|
|
func unmarshalJSONSignal(event *kafka.Message) ([]*kafka_grpc.GRPC_CANSignal, error) {
|
|
if event == nil {
|
|
err := errors.Errorf("trying to unmarshall null event ptr")
|
|
return nil, err
|
|
}
|
|
//JSON handling is generally slower;
|
|
//not only is payload much larger but character insertions to fix json format from optimus is very slow.
|
|
|
|
batchData := []kafka_grpc.GRPC_CANSignal{}
|
|
dataBuffer := make([]byte, len(event.Value)+2)
|
|
copy(dataBuffer[1:], event.Value)
|
|
dataBuffer[0] = '['
|
|
dataBuffer[len(event.Value)+1] = ']'
|
|
|
|
err := fastjson.Unmarshal(dataBuffer, &batchData)
|
|
if err != nil {
|
|
logger.Error().Err(err).Send()
|
|
return nil, err
|
|
}
|
|
|
|
ptrs := make([]*kafka_grpc.GRPC_CANSignal, len(batchData))
|
|
for i := range batchData {
|
|
ptrs[i] = &batchData[i]
|
|
}
|
|
return ptrs, nil
|
|
}
|