package server import ( "encoding/json" "github.com/fiskerinc/cloud-services/services/depot/handlers" "github.com/fiskerinc/cloud-services/services/depot/services" "github.com/fiskerinc/cloud-services/pkg/common" "github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc" "github.com/fiskerinc/cloud-services/pkg/kafka" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/fiskerinc/cloud-services/pkg/loggerdataresp" "google.golang.org/protobuf/proto" ) // StartConsumer runs consumer and puts events into a channel for router func StartConsumer(topic, oldTopic string) { defer func() { if err := recover(); err != nil { logger.Error().Msgf("PanicConsumer %v", err) } }() eventsJSON := make(chan common.EventRawJSON) events := make(chan *kafka.Message) go routeEvents(events) go routeOldEvents(eventsJSON) logger.Info().Msgf("consumer initialized for topic: %v", topic) consumer, oldConsumer, err := services.GetKafkaConsumer() if err != nil { panic(err) } go func() { err = oldConsumer.ConsumeToChannelJson([]string{oldTopic}, eventsJSON) loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) }() err = consumer.ConsumeToChannel([]string{topic}, events) loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) } func routeOldEvents(events chan common.EventRawJSON) { p := services.GetDB() defer p.Close() for { event := <-events var err error var dt common.Payload err = dt.Unmarshal(event.Payload) device, k := common.ParseDeviceKey(event.Key) payload := &common.ConsumerPayload{ Handler: dt.Handler, Data: dt.Data, } logger.Debug().Str("id", k).Msgf("source: %s, type: %s, handler: %s", k, device, payload.GetHandler()) switch device { case common.TRex: err = routeTRex(k, payload) case common.HMI: err = routeHMI(p, k, payload) case common.Mobile: err = routeMobile(p, k, payload) default: err = ErrInvalidDevice } loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) } } func routeEvents(events chan *kafka.Message) { p := services.GetDB() defer p.Close() for { event := <-events var err error payload := &kafka_grpc.GRPC_DepotPayload{} err = proto.Unmarshal(event.Value, payload) device, k := common.ParseDeviceKey(string(event.Key)) logger.Debug().Str("id", k).Msgf("source: %s, type: %s, handler: %s", k, device, payload.GetHandler()) switch device { case common.TRex: d, _ := common.DepotRouteTRexPayload(payload) err = routeTRex(k, d) case common.HMI: d, _ := common.DepotRouteHMIPayload(payload) err = routeHMI(p, k, d) case common.Mobile: d, _ := common.DepotRouteMobilePayload(payload) err = routeMobile(p, k, d) default: err = ErrInvalidDevice } loggerdataresp.BadDataError(err, loggerdataresp.EofErrorCheck) } } func routeTRex(id string, d common.ConsumerPayloadInterface) error { // route TRex messages var err error switch d.GetHandler() { case "init": var vMap map[string]string err = json.Unmarshal(d.GetData(), &vMap) if err != nil { return err } err = handlers.TRexInit(id, vMap) case "del": err = handlers.TRexDel(id) default: err = kafka.ErrUnhandledMessage(common.TRex, id, d.GetHandler(), string(d.GetData())) } return err } func routeHMI(p *services.DB, id string, d common.ConsumerPayloadInterface) error { // route HMI messages var err error switch d.GetHandler() { case "init": err = handlers.HMIInit(p, id, d.GetData()) case "del": err = handlers.HMIDel(p, id) default: err = kafka.ErrUnhandledMessage(common.HMI, id, d.GetHandler(), string(d.GetData())) } return err } func routeMobile(p *services.DB, id string, d common.ConsumerPayloadInterface) error { // route mobile messages var err error switch d.GetHandler() { case "init": err = handlers.MobileInit(p, id) case "del": err = handlers.MobileDel(p, id) default: err = kafka.ErrUnhandledMessage(common.Mobile, id, d.GetHandler(), string(d.GetData())) } return err }