package server

import (
	"context"
	"time"

	"github.com/fiskerinc/cloud-services/services/jetfire/handlers"
	"github.com/fiskerinc/cloud-services/services/jetfire/models"
	"github.com/fiskerinc/cloud-services/services/jetfire/services"

	"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
	"github.com/fiskerinc/cloud-services/pkg/kafka"
	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/fiskerinc/cloud-services/pkg/loggerdataresp"
	"github.com/intel-go/fastjson"
	"github.com/pkg/errors"
	"google.golang.org/protobuf/proto"
)

// StartConsumer runs the Kafka consumer for topic and pushes every received
// message into a channel read by routeEvents, which in turn feeds the
// inserter loop through a buffered command channel.
//
// The function blocks. It returns when ctx is cancelled (checked between
// consume cycles and while waiting), or when a panic is caught by the
// deferred recover, in which case the panic value is logged. The routeEvents
// goroutine started here has no stop signal and keeps running after
// StartConsumer returns.
func StartConsumer(ctx context.Context, topic string) {
	const (
		// restartDelay is the pause between the end of one consume cycle and
		// the consumer reset that follows it.
		restartDelay = 500 * time.Millisecond
		// resetRetryDelay is the pause between failed consumer resets, so a
		// persistent failure does not turn into a busy loop.
		resetRetryDelay = 500 * time.Millisecond
	)

	logger.Debug().Str("StartConsumer", "").Send()
	defer func() {
		if err := recover(); err != nil {
			logger.Error().Msgf("PanicConsumer %v", err)
		}
	}()

	// pause waits for d and reports false when ctx was cancelled first.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	producerChannel := make(chan models.InsertCommand, 100)
	signalsChannel := make(chan *kafka.Message)
	rebalanceChannel := make(chan struct{})

	consumer, err := services.GetKafkaConsumer()
	if err != nil {
		// The log event must be terminated with Msg/Send to be emitted; the
		// panic is caught by the deferred recover above.
		logger.Error().Err(err).Msg("failed to get kafka consumer")
		panic(err)
	}

	logger.Debug().Msgf("Starting routeEvents and InserterLoop...")
	go routeEvents(signalsChannel, rebalanceChannel, producerChannel)
	go InserterLoop(producerChannel, ctx)

	for ctx.Err() == nil {
		logger.Debug().Msgf("Calling ConsumeOrRebalancedCatch...")
		consumer.Subscribe([]string{topic})
		err = consumer.ConsumeOrRebalancedCatch([]string{topic}, signalsChannel, rebalanceChannel)
		if err != nil {
			logger.Error().Err(err).Send()
		}

		if !loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) {
			if check := consumer.Check(ctx); check != nil {
				logger.Error().Err(check).Send()
			}
		}

		if !pause(restartDelay) {
			return
		}

		// reset the kafka consumer and gc the old one
		consumer = nil
		for {
			consumer, err = services.ResetKafkaConsumer()
			if err != nil {
				logger.Error().Err(err).Send()
			}
			if consumer != nil && err == nil {
				break
			}
			if !pause(resetRetryDelay) {
				return
			}
		}
	}
}

// routeEvents receives Kafka messages from signalsChannel, decodes each one
// into a batch of CAN signals (Protobuf first, JSON as a fallback) and passes
// the batch to handlers.HandleSignalBatch together with the vehicle cache and
// producerChannel.
//
// A notification on rebalanceChannel marks the cache as uninitialized, so it
// is reloaded through services.InitCacheFromClickhouse before the next
// message is handled. Consecutive notifications with no message in between
// are collapsed into one. The loop never exits.
func routeEvents(signalsChannel chan *kafka.Message, rebalanceChannel <-chan struct{}, producerChannel chan models.InsertCommand) {
	initialized := false
	// rebalanceFlag is re-armed by every message so that only the first
	// rebalance notification after a message resets the cache state.
	rebalanceFlag := true

	// Consumer loop
	for {
		select {
		case signal := <-signalsChannel:
			if signal == nil {
				continue
			}
			rebalanceFlag = true

			if !initialized {
				// init cache after rebalancing
				logger.Debug().Msgf("INITIALIZING CACHE...")
				time.Sleep(time.Second)
				services.InitCacheFromClickhouse()
				initialized = true
				logger.Debug().Msgf("INITIALIZED...")
			}

			batchData, protoErr := unmarshalProtobufSignal(signal)
			if protoErr != nil {
				var jsonErr error
				batchData, jsonErr = unmarshalJSONSignal(signal)
				if jsonErr != nil {
					// The JSON error has already been logged by unmarshalJSONSignal.
					logger.Error().Err(protoErr).Msg("Failed to unmarshal signal as either Protobuf or JSON")
					continue
				}
			}

			if _, err := handlers.HandleSignalBatch(batchData, services.GetVehicleCache(), producerChannel); err != nil {
				logger.Error().Err(errors.WithStack(err)).Send()
			}

		case <-rebalanceChannel:
			if rebalanceFlag {
				rebalanceFlag = false
				logger.Info().Msgf("kafka rebalancing...")
				initialized = false
			}
		}
	}
}

// unmarshalProtobufSignal decodes event.Value as a
// kafka_grpc.GRPC_CANSignalBatchPayload and returns the CAN signals it holds.
//
// It returns an error when event is nil, when the bytes are not a valid
// payload, or when the decoded payload has no Data section (for example an
// empty message value), so that the caller can fall back to JSON instead of
// hitting a nil pointer dereference.
func unmarshalProtobufSignal(event *kafka.Message) ([]*kafka_grpc.GRPC_CANSignal, error) {
	if event == nil {
		return nil, errors.Errorf("trying to unmarshall null event ptr")
	}

	batchData := kafka_grpc.GRPC_CANSignalBatchPayload{}
	if err := proto.Unmarshal(event.Value, &batchData); err != nil {
		return nil, err
	}
	if batchData.Data == nil {
		return nil, errors.Errorf("protobuf payload contains no data")
	}

	return batchData.Data.Cansignals, nil
}

// unmarshalJSONSignal decodes event.Value as JSON into a batch of CAN signals
// and returns pointers to the decoded elements.
//
// The payload is wrapped in '[' and ']' before decoding; presumably the
// producer sends the array elements without the enclosing brackets (TODO
// confirm against the producer). Decode errors are logged here and returned.
func unmarshalJSONSignal(event *kafka.Message) ([]*kafka_grpc.GRPC_CANSignal, error) {
	if event == nil {
		return nil, errors.Errorf("trying to unmarshall null event ptr")
	}

	// JSON handling is generally slower: the payload is much larger, and the
	// copy needed to repair the JSON format coming from optimus is costly.
	dataBuffer := make([]byte, len(event.Value)+2)
	dataBuffer[0] = '['
	copy(dataBuffer[1:], event.Value)
	dataBuffer[len(dataBuffer)-1] = ']'

	var batchData []kafka_grpc.GRPC_CANSignal
	if err := fastjson.Unmarshal(dataBuffer, &batchData); err != nil {
		logger.Error().Err(err).Send()
		return nil, err
	}

	// Hand out pointers into the decoded slice rather than copying elements.
	ptrs := make([]*kafka_grpc.GRPC_CANSignal, len(batchData))
	for i := range batchData {
		ptrs[i] = &batchData[i]
	}

	return ptrs, nil
}