57 lines
1.5 KiB
Go
57 lines
1.5 KiB
Go
package handlers
|
|
|
|
import (
|
|
"github.com/fiskerinc/cloud-services/services/optimus/services"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/common"
|
|
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/validator"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
func ProduceSignalsFromMessageBatch(vin string, payload *kafka_grpc.GRPC_BatchPayload) ([]common.CANSignal, error) {
|
|
var batch []common.CANSignal
|
|
if payload == nil {
|
|
return nil, errors.New("payload is nil")
|
|
}
|
|
if payload.Data == nil {
|
|
return nil, errors.New("payload.Data is nil")
|
|
}
|
|
// TODO Where to report epoch_usec, dropped, and filtered stats
|
|
dbcVersion := payload.Version
|
|
dbc, err := services.GetDBCCollection().Get(dbcVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// r := services.RedisClientPool().GetFromPool()
|
|
// defer r.Close()
|
|
|
|
clickClient, err := services.GetClickhouseClient()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filters := services.GetVehicleMessageFilters().GetFiltersForVehicle(clickClient, vin)
|
|
for _, msg := range payload.Data.Frames {
|
|
if err := validator.ValidateStruct(msg); err != nil {
|
|
logger.Warn().Str("id", vin).Err(err).Msgf("%+v", msg)
|
|
continue
|
|
}
|
|
|
|
if !filters.AllowMessage(int(msg.ID), int(msg.GetEpoch())) {
|
|
continue
|
|
}
|
|
|
|
signals, err := dbc.GenerateCANSignals(vin, msg)
|
|
if err == nil && len(signals) > 0 {
|
|
batch = append(batch, signals...)
|
|
} else if err != nil {
|
|
logger.At(logger.Warn(), vin, "dbc").Err(err).Send()
|
|
}
|
|
}
|
|
|
|
return batch, nil
|
|
}
|