Files
cloud-services/services/optimus/handlers/batch.go

57 lines
1.5 KiB
Go

package handlers
import (
"github.com/fiskerinc/cloud-services/services/optimus/services"
"github.com/fiskerinc/cloud-services/pkg/common"
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/validator"
"github.com/pkg/errors"
)
// ProduceSignalsFromMessageBatch turns the frames carried by payload into CAN
// signals for the vehicle identified by vin.
//
// An error is returned, and no signals, when payload or payload.Data is nil,
// when the DBC for payload.Version cannot be fetched from the DBC collection,
// or when the ClickHouse client cannot be obtained.
//
// Individual frames never fail the batch: a frame that does not pass
// validation, is rejected by the vehicle's message filters, or cannot be
// converted into signals is skipped (validation and conversion failures are
// logged as warnings). The result is nil when no frame produced any signal.
func ProduceSignalsFromMessageBatch(vin string, payload *kafka_grpc.GRPC_BatchPayload) ([]common.CANSignal, error) {
	// Cases are evaluated in order, so payload.Data is only read once payload
	// is known to be non-nil.
	switch {
	case payload == nil:
		return nil, errors.New("payload is nil")
	case payload.Data == nil:
		return nil, errors.New("payload.Data is nil")
	}

	// TODO Where to report epoch_usec, dropped, and filtered stats
	version := payload.Version
	definitions, err := services.GetDBCCollection().Get(version)
	if err != nil {
		return nil, err
	}

	// r := services.RedisClientPool().GetFromPool()
	// defer r.Close()
	chClient, err := services.GetClickhouseClient()
	if err != nil {
		return nil, err
	}
	vehicleFilters := services.GetVehicleMessageFilters().GetFiltersForVehicle(chClient, vin)

	var collected []common.CANSignal
	for _, frame := range payload.Data.Frames {
		if invalid := validator.ValidateStruct(frame); invalid != nil {
			logger.Warn().Str("id", vin).Err(invalid).Msgf("%+v", frame)
			continue
		}

		if allowed := vehicleFilters.AllowMessage(int(frame.ID), int(frame.GetEpoch())); !allowed {
			continue
		}

		decoded, genErr := definitions.GenerateCANSignals(vin, frame)
		if genErr != nil {
			// Best effort: report the failure and move on to the next frame.
			logger.At(logger.Warn(), vin, "dbc").Err(genErr).Send()
			continue
		}
		if len(decoded) == 0 {
			continue
		}
		collected = append(collected, decoded...)
	}

	return collected, nil
}