Files
cloud-services/pkg/dbc/models/dbc_collection.go

192 lines
4.7 KiB
Go

package models
import (
"fmt"
"github.com/fiskerinc/cloud-services/pkg/common"
"github.com/fiskerinc/cloud-services/pkg/dbc/state"
"github.com/fiskerinc/cloud-services/pkg/digitaltwin"
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/pkg/errors"
)
func GetShortKey(key string) string {
// the version (hash) is represented as hex
// we agreed to use 2 first bytes, so we need 4 characters of hex string
const length = 4
if len(key) < length {
return key
}
return key[:length]
}
type DBCCollectionInterface interface {
Get(dbcHash string) (DBCVersionInterface, error)
AddVersion(dbcHash string, dbcVersion DBCVersionInterface) DBCCollectionInterface
ParseState(vin string, payload *kafka_grpc.GRPC_BatchPayload, cached digitaltwin.DigitalTwinCacheInterface) (map[string]interface{}, error)
GetOrigKey(short string) (string, error)
GetHashesList() []string
}
func NewDBCCollection(c map[string]DBCVersionInterface) DBCCollectionInterface {
return &DBCCollection{
collections: c,
}
}
type DBCCollection struct {
collections map[string]DBCVersionInterface
keys map[string]string
options []DBCOption
}
func (d *DBCCollection) getCollectionMap() map[string]DBCVersionInterface {
if d.collections == nil {
d.collections = map[string]DBCVersionInterface{}
}
return d.collections
}
func (d *DBCCollection) getKeyMap() map[string]string {
if d.keys == nil {
d.keys = map[string]string{}
}
return d.keys
}
func (d *DBCCollection) GetHashesList() []string {
var hashs []string
for _, hash := range d.keys {
hashs = append(hashs, hash)
}
return hashs
}
func (d *DBCCollection) Get(dbcHash string) (DBCVersionInterface, error) {
c := d.getCollectionMap()
key := GetShortKey(dbcHash)
if db, ok := c[key]; ok {
return db, nil
}
return nil, errors.Errorf("DBC %s does not exists", dbcHash)
}
func (d *DBCCollection) AddVersion(dbcHash string, dbcVersion DBCVersionInterface) DBCCollectionInterface {
c := d.getCollectionMap()
m := d.getKeyMap()
key := GetShortKey(dbcHash)
if _, ok := c[key]; ok {
panic(fmt.Sprintf("DBC %s already exists", dbcHash))
}
c[key] = dbcVersion
m[key] = dbcHash
return d
}
func (d *DBCCollection) GetOrigKey(short string) (string, error) {
m := d.getKeyMap()
short = GetShortKey(short)
if key, ok := m[short]; ok {
return key, nil
}
return "", ErrInvalidDBC(short)
}
func (d *DBCCollection) ParseState(vin string, payload *kafka_grpc.GRPC_BatchPayload, cached digitaltwin.DigitalTwinCacheInterface) (map[string]interface{}, error) {
keyvalues := make(map[string]interface{})
dbc, err := d.Get(payload.Version)
if err != nil {
return nil, err
}
updated_at := d.processFrames(dbc, keyvalues, vin, payload.Data.Frames, cached)
d.processOptions(keyvalues, vin)
if len(keyvalues) > 0 {
serialized, err := state.SerializeTimestampUSec(updated_at)
if err != nil {
return keyvalues, err
}
keyvalues[state.UPDATED_AT] = serialized
}
return keyvalues, nil
}
// cached: Local memory and digital twin, check to see if the value for that digital twin value is already saved in its given value
func (d *DBCCollection) processFrames(dbc DBCVersionInterface, keyvaluesOut map[string]interface{}, vin string, frames []*kafka_grpc.GRPC_CANFrame, cached digitaltwin.DigitalTwinCacheInterface) int {
var updated_at int
for _, msg := range frames {
incomingSignals, err := dbc.GenerateCANSignals(vin, msg)
if err != nil {
logger.At(logger.Warn(), common.TRex.Key(vin), "dbc").
Str("signal", fmt.Sprintf("%v", msg)).
Str("dbc", dbc.Hash()).
Err(err).Send()
continue
}
updated_at = int(msg.Epoch)
err = dbc.ParseState(int(msg.ID), keyvaluesOut, vin, updated_at, incomingSignals, cached)
if err != nil {
logger.At(logger.Warn(), common.TRex.Key(vin), "dbc").
Str("signal", fmt.Sprintf("%v", msg)).
Str("dbc", dbc.Hash()).
Err(err).Send()
}
}
return updated_at
}
func (d *DBCCollection) processOptions(values map[string]interface{}, vin string) error {
for _, option := range d.options {
err := option.Handler(values, vin)
if err != nil {
return err
}
}
return nil
}
/*
func (d *DBCCollection) DiagnosticAlert(vin string, payload *kafka_grpc.GRPC_BatchPayload) ([]DiagnosticSignal, error) {
var flags = make([]DiagnosticSignal, 0)
dbc, err := d.Get(payload.Version)
if err != nil {
return nil, err
}
for _, msg := range payload.Data.Frames {
signals, err := dbc.GenerateCANSignals(vin, msg)
if err != nil {
logger.Warn().Str("id", vin).Err(err).Msgf("%+v", msg)
continue
}
alerts, err := dbc.DiagnosticAlert(int(msg.ID), vin, int(msg.Epoch), signals)
if err != nil {
logger.Warn().Str("id", vin).Err(err).Msgf("%+v", msg)
}
append(flags, alerts...)
}
return flags, err
}
*/