97 lines
2.7 KiB
Go
97 lines
2.7 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/services/optimus/handlers"
|
|
"github.com/fiskerinc/cloud-services/services/optimus/services"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/common"
|
|
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
|
|
"github.com/fiskerinc/cloud-services/pkg/kafka"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/protobuf/proto"
|
|
"github.com/fiskerinc/cloud-services/pkg/loggerdataresp"
|
|
)
|
|
|
|
var freeze = envtool.GetEnvInt("OPTIMUS_KAFKA_BATCH_SIZE_FREEZE", 200000)
|
|
var OPTIMUS_CONCURRENCY = envtool.GetEnvInt("OPTIMUS_CONCURRENCY", 20)
|
|
|
|
// StartConsumer runs consumer and puts events into a channel for router
|
|
func StartConsumer(ctx context.Context, topics []string) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
logger.Error().Msgf("PanicConsumer %v", err)
|
|
}
|
|
}()
|
|
|
|
events := make(chan *kafka.Message)
|
|
go routeEvents(ctx, events)
|
|
|
|
logger.Info().Msgf("consumer intialized for topic: %v", topics)
|
|
consumer, err := services.GetKafkaConsumer()
|
|
if err != nil {
|
|
panic(errors.WithStack(err))
|
|
}
|
|
for {
|
|
err = consumer.ConsumeToChannel(topics, events)
|
|
if loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) {
|
|
time.Sleep(500 * time.Millisecond)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func routeEvents(ctx context.Context, events chan *kafka.Message) {
|
|
for event := range events {
|
|
start := time.Now()
|
|
canData := kafka_grpc.GRPC_BatchPayload{}
|
|
vin := string(event.Key)
|
|
err := proto.Unmarshal(event.Value, &canData)
|
|
if loggerdataresp.BadDataError(errors.WithStack(err)) {
|
|
continue
|
|
}
|
|
batch, err := handlers.ProduceSignalsFromMessageBatch(vin, &canData)
|
|
if loggerdataresp.BadDataError(err) || len(batch) == 0 {
|
|
log(start, vin)
|
|
continue
|
|
}
|
|
|
|
prod, err := services.GetKafkaProducer()
|
|
if loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) {
|
|
continue
|
|
}
|
|
|
|
if prod.Len() > freeze {
|
|
prod.Flush(int(time.Second.Milliseconds()) * 5)
|
|
}
|
|
|
|
cansignal := common.CANSignalBatchPayload{}
|
|
grpcCanSignal := cansignal.ToGrpc(batch)
|
|
grpcData, err := proto.Marshal(grpcCanSignal)
|
|
|
|
if err != nil {
|
|
logger.Error().Str("id", vin).Err(err).Send()
|
|
continue
|
|
}
|
|
|
|
go func(prodVal kafka.ProducerInterface, vinVal string, batchVal []byte) {
|
|
err = prodVal.ProduceBinary(kafka.VehicleSignal, vinVal, batchVal, nil)
|
|
loggerdataresp.BadDataError(err)
|
|
}(prod, vin, grpcData)
|
|
|
|
log(start, vin)
|
|
}
|
|
|
|
}
|
|
|
|
func log(start time.Time, vin string) {
|
|
finish := time.Now()
|
|
logger.Debug().Str("id", vin).Msgf("vehicle batch data time elapsed %v, goroutines count: %v", finish.Sub(start), runtime.NumGoroutine())
|
|
}
|