package websocket

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/fiskerinc/cloud-services/pkg/common"
	"github.com/fiskerinc/cloud-services/pkg/dbc/models"
	"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
	"github.com/fiskerinc/cloud-services/pkg/kafka"
	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/gobwas/ws"
	"github.com/gobwas/ws/wsflate"
	"github.com/pkg/errors"
	"google.golang.org/protobuf/proto"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// NewTRexSession serves as the constructor for TRex sessions.
//
// It upgrades the HTTP request to a websocket connection and requires that
// the compression extension was accepted during the handshake; otherwise the
// connection is closed and an error is returned. The DBC value is read from
// the request via ParseDBCFromRequest. On any error the returned
// SessionInterface is nil.
func NewTRexSession(w http.ResponseWriter, r *http.Request, vin, version, iccid string) (SessionInterface, error) {
	compression := wsflate.Extension{
		Parameters: wsflate.DefaultParameters,
	}
	upgrader := ws.HTTPUpgrader{
		Negotiate: compression.Negotiate,
	}

	conn, _, _, err := upgrader.Upgrade(r, w)
	if err != nil {
		// The upgrader may hand back an already hijacked connection together
		// with the error; close it so a failed handshake does not leak it.
		if conn != nil {
			_ = conn.Close()
		}
		return nil, errors.WithStack(err)
	}

	if _, ok := compression.Accepted(); !ok {
		// Best-effort close: the session is rejected regardless of the result.
		_ = conn.Close()
		return nil, errors.Errorf("didn't negotiate compression for %s", conn.RemoteAddr())
	}

	return &SessionTRex{
		Session: &Session{
			Websocket: conn,
			Type:      common.TRex,
			Version:   version,
			ID:        vin,
			epoch:     time.Now().UnixNano(),
		},
		DBC:   ParseDBCFromRequest(r),
		ICCID: iccid,
	}, nil
}

// SessionTRex utilizes a specialized listener.
//
// It embeds *Session and additionally carries the DBC value parsed from the
// upgrade request and the ICCID supplied to NewTRexSession.
type SessionTRex struct {
	*Session
	DBC   string
	ICCID string
}

// Authenticate always returns nil; no authentication is performed here.
func (s *SessionTRex) Authenticate() error {
	return nil
}

// Receive reads the next message through the embedded session's receive,
// passing s.extendDeadline as its argument.
func (s *SessionTRex) Receive() ([]byte, ws.OpCode, error) {
	return s.receive(s.extendDeadline)
}

// Listen to websocket session and use handler upon message received.
//
// The loop ends with a nil error when a close opcode is received and with the
// receive error otherwise. Routing failures are logged as warnings and do not
// end the loop. The session is closed when Listen returns.
func (s *SessionTRex) Listen(ctx context.Context, producer kafka.ProducerInterface) error {
	span, _ := tracer.StartSpanFromContext(ctx, "listen")
	defer span.Finish()
	defer s.Close()

	for {
		key := s.Key()
		msg, op, err := s.Receive()
		// The close opcode is checked before the error so that a close frame
		// is reported as a clean shutdown.
		if op == ws.OpClose {
			logger.At(logger.Info(), key, "conn").Msg("OpClose")
			return nil
		}
		if err != nil {
			logger.At(logger.Info(), key, "conn").Err(err).Send()
			return err
		}

		if err := s.Route(producer, msg); err != nil {
			logger.At(logger.Warn(), key, "route").Err(err).Send()
		}
	}
}

// trexLogsToGRPC converts a TRex log or error message into its gRPC batch
// payload based on m.Handler. For any other handler it returns (nil, nil),
// leaving the decision of what to do with a nil payload to the caller.
func trexLogsToGRPC(m common.MessageRawJSON) (*kafka_grpc.TRexLogs_BatchPayload, error) {
	switch m.Handler {
	case "trex_log":
		logs := common.TRexLogs{}
		return logs.ToGrpc(m)
	case "error":
		trexErr := common.TRexError{}
		return trexErr.ToGrpc(m)
	}
	return nil, nil
}

// Route TRex messages.
//
// The raw message is unmarshalled, its handler is mapped to a Kafka topic via
// kafka.TRexHandlerTopics, and the message is converted and produced according
// to that topic. An unknown handler yields ErrInvalidHandler. Conversion,
// marshalling and producer failures are logged at error level and returned.
func (s *SessionTRex) Route(producer kafka.ProducerInterface, data []byte) error {
	// debugVIN is the single vehicle whose CAN frames are dumped to the log.
	// NOTE(review): looks like leftover debugging; confirm it is still needed.
	const debugVIN = "VCF1ZBU29PG004227"

	// TODO Unmarshal message and extract CAN frames into Kafka
	var m common.MessageRawJSON
	if err := m.Unmarshal(data); err != nil {
		return errors.Wrapf(err, "msg %s", data)
	}

	key := s.Key()
	topic, ok := kafka.TRexHandlerTopics[m.Handler]
	if !ok {
		return ErrInvalidHandler(m.Handler)
	}

	switch topic {
	case kafka.VehicleData:
		m.Version = models.GetShortKey(s.DBC)
		car := common.CarDataBatchPayload{}
		grpcCan, err := car.ToGrpc(m, s.GetID())
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to convert to GRPC")).Send()
			return err
		}

		if s.ID == debugVIN {
			// One log field per frame: frame ID -> hex-formatted value.
			event := logger.Warn()
			for _, f := range grpcCan.Data.Frames {
				event.Str(fmt.Sprintf("%d", f.ID), fmt.Sprintf("%X", f.Value))
			}
			event.Msg(s.ID)
		}

		grpcData, err := proto.Marshal(grpcCan)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal GRPC")).Send()
			return err
		}

		// Vehicle data is keyed by the session ID rather than the session key.
		if err := producer.ProduceBinary(kafka.VehicleData, s.GetID(), grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer failed")).Send()
			return err
		}

	case kafka.LogService:
		grpcLogs, err := trexLogsToGRPC(m)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to convert trex logs to GRPC"+string(m.Data[:]))).Send()
			return err
		}

		grpcData, err := proto.Marshal(grpcLogs)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal trexlogs GRPC"+string(m.Data[:]))).Send()
			return err
		}

		// Log messages are published on the gRPC log topic.
		if err := producer.ProduceBinary(kafka.LogServiceGRPCKafka, key, grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer for trex logs failed")).Send()
			return err
		}

	case kafka.LogServiceGRPCKafka:
		// This case should not be necessary, but just in case someone gets confused, it is in here
		grpcLogs, err := trexLogsToGRPC(m)
		// TODO: unable to convert trex logs msg {"code":-32601,"message":"The handler does not exist or is not available"}: json: cannot unmarshal string into Go struct field TRexError.message of type []common.TRexErrorMessage
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to convert trex logs msg "+string(m.Data[:]))).Send()
			return err
		}

		grpcData, err := proto.Marshal(grpcLogs)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal trexlogs GRPC"+string(m.Data[:]))).Send()
			return errors.WithStack(err)
		}

		if err := producer.ProduceBinary(kafka.LogServiceGRPCKafka, key, grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer for trex logs failed")).Send()
			return err
		}

	case kafka.ValetServiceGRPCKafka:
		valetData := common.ValetRouteTRexPayloadGRPC(m)
		grpcData, err := proto.Marshal(valetData)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal trexlogs GRPC "+topic)).Send()
			return errors.WithStack(err)
		}

		if err := producer.ProduceBinary(kafka.ValetServiceGRPCKafka, key, grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer for trex logs failed")).Send()
			return err
		}

	case kafka.DepotServiceGRPCKafka:
		depotData := common.DepotRouteTRexToGRPC(m)
		grpcData, err := proto.Marshal(depotData)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal trexlogs GRPC "+topic)).Send()
			return errors.WithStack(err)
		}

		if err := producer.ProduceBinary(kafka.DepotServiceGRPCKafka, key, grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer for trex logs failed")).Send()
			return err
		}

	case kafka.AttendantServiceGRPCKafka:
		attendantData := common.AttendantRouteTRexGRPC(m)
		grpcData, err := proto.Marshal(attendantData)
		if err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "unable to marshal trexlogs GRPC "+topic)).Send()
			return errors.WithStack(err)
		}

		if err := producer.ProduceBinary(kafka.AttendantServiceGRPCKafka, key, grpcData, nil); err != nil {
			logger.At(logger.Error(), key, "route").
				Err(errors.Wrap(err, "Producer for trex logs failed")).Send()
			return err
		}

	default:
		// Any other mapped topic receives the unmarshalled message as is.
		return producer.Produce(topic, key, m, nil)
	}

	return nil
}

// KafkaEndSessionMarker publishes a batch payload with an empty handler and
// no data on the vehicle data topic, keyed by the session ID, and logs that
// the connection is closing. It returns the marshalling or producer error,
// if any.
func (s *SessionTRex) KafkaEndSessionMarker(producer kafka.ProducerInterface) error {
	key := s.Key()
	logger.At(logger.Info(), key, "conn").
		Msgf("closing connection %s", key)

	marker := &kafka_grpc.GRPC_BatchPayload{
		Handler: "",
		Data:    nil,
		Version: models.GetShortKey(s.DBC),
	}
	grpcData, err := proto.Marshal(marker)
	if err != nil {
		return errors.Wrap(err, "unable to marshal end session marker")
	}

	return producer.ProduceBinary(kafka.VehicleData, s.GetID(), grpcData, nil)
}

// Teardown publishes the end-of-session marker and then tears down the
// embedded session. A failure to publish the marker is logged but does not
// prevent the teardown; the returned error is that of the embedded teardown.
func (s *SessionTRex) Teardown(producer kafka.ProducerInterface) error {
	if err := s.KafkaEndSessionMarker(producer); err != nil {
		logger.At(logger.Warn(), s.Key(), "conn").
			Err(errors.Wrap(err, "unable to publish end session marker")).Send()
	}
	return s.Session.Teardown(producer)
}

// Load the session - distributes messages to system notifying of new connection.
//
// It publishes an "init" depot payload containing the session version, ICCID,
// IP and DBC version on the depot service topic, keyed by the session key.
func (s *SessionTRex) Load(producer kafka.ProducerInterface) error {
	payload := &kafka_grpc.GRPC_DepotPayload{
		Handler: "init",
		Data: &kafka_grpc.GRPC_DepotPayload_InitPayload{
			InitPayload: &kafka_grpc.InitPayload{
				Data: map[string]string{
					"version":     s.Version,
					"iccid":       s.ICCID,
					"ip":          s.GetIP(),
					"dbc_version": s.DBC,
				},
			},
		},
	}

	binaryPayload, err := proto.Marshal(payload)
	if err != nil {
		return errors.Wrap(err, "unable to marshal init payload")
	}

	return producer.ProduceBinary(kafka.DepotServiceGRPCKafka, s.Key(), binaryPayload, nil)
}