package usecase_helpers

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/fiskerinc/cloud-services/pkg/common"
	"github.com/fiskerinc/cloud-services/pkg/common/carupdatestatus"
	"github.com/fiskerinc/cloud-services/pkg/db/queries"
	e "github.com/fiskerinc/cloud-services/pkg/errors"
	"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
	"github.com/fiskerinc/cloud-services/pkg/kafka"
	"github.com/pkg/errors"
	"google.golang.org/protobuf/proto"
)

// UpdateNotifierInterface creates car update records for a manifest and
// queues a "send_manifest" message for each targeted VIN.
type UpdateNotifierInterface interface {
	Send(vins []string, manifest common.UpdateManifest, username string) ([]common.CarUpdate, error)
}

// NewUpdateNotifier returns an UpdateNotifierInterface that records car
// updates through cu and publishes messages through pr.
func NewUpdateNotifier(
	cu queries.CarUpdatesInterface,
	pr kafka.ProducerInterface,
) UpdateNotifierInterface {
	return &updateNotifier{
		carUpdates: cu,
		producer:   pr,
	}
}

// updateNotifier is the default UpdateNotifierInterface implementation.
type updateNotifier struct {
	carUpdates queries.CarUpdatesInterface
	producer   kafka.ProducerInterface
	// targetService is not set by NewUpdateNotifier and is not read anywhere
	// in this file.
	targetService string
}

// Send creates a car update entry for every VIN in vins and queues the
// manifest notification for each of them.
//
// It is expected to be called only by ota, or by another service that
// actually sends the update to the car.
//
// All VINs are checked for pending updates of manifest.ID first. If the check
// itself fails, Send returns (nil, err). If any VIN has a pending update,
// nothing is inserted or queued and a custom error carrying
// http.StatusForbidden is returned.
//
// On any later failure processing stops at the failing VIN and the partially
// filled result slice is returned together with the error. The slice always
// has len(vins) entries; entries for VINs that were not processed
// successfully are zero values.
func (un *updateNotifier) Send(vins []string, manifest common.UpdateManifest, username string) ([]common.CarUpdate, error) {
	carUpdateList := make([]common.CarUpdate, len(vins))

	// Collect every VIN with a pending update so the error can name all of
	// them at once.
	var pendingVins []string
	for _, vin := range vins {
		pending, err := un.carUpdates.HasPendingUpdates(manifest.ID, vin)
		if err != nil {
			return nil, err
		}
		if pending {
			pendingVins = append(pendingVins, vin)
		}
	}

	if len(pendingVins) > 0 {
		err := e.NewCustomError(fmt.Sprintf("pending update exists for %s", strings.Join(pendingVins, ", ")), http.StatusForbidden)
		return carUpdateList, errors.WithStack(err)
	}

	// For each car, create a new car_update entry and then try to send the
	// manifest to the car.
	for i, vin := range vins {
		carUpdate, err := un.insertCarUpdate(un.carUpdates, manifest.ID, vin, username)
		if err != nil {
			return carUpdateList, err
		}

		if err := un.sendManifest(vin, username, carUpdate.ID); err != nil {
			// The record already exists, so mark it canceled rather than
			// leaving it in "pending".
			un.cancel(un.carUpdates, carUpdate.ID, vin, "Failed to queue message to Kafka, error: "+err.Error())
			return carUpdateList, err
		}

		carUpdateList[i] = carUpdate
	}

	return carUpdateList, nil
}

// sendManifest publishes a "send_manifest" attendant payload that references
// carUpdateID. The message is keyed by common.Service.Key(vin) and carries
// username in the "id" header.
//
// It returns an error if the payload cannot be marshaled or if the producer
// rejects the message.
func (un *updateNotifier) sendManifest(vin string, username string, carUpdateID int64) error {
	kafkaMSG := &kafka_grpc.GRPC_AttendantPayload{
		Handler: "send_manifest",
		Data: &kafka_grpc.GRPC_AttendantPayload_UpdateManifest{
			UpdateManifest: &kafka_grpc.UpdateManifest{
				CarUpdateId: carUpdateID,
			},
		},
	}

	// Never publish when marshaling failed: the payload would be nil/empty.
	binaryPayload, err := proto.Marshal(kafkaMSG)
	if err != nil {
		return errors.Wrap(err, "marshaling send_manifest payload")
	}

	headers := map[string][]byte{
		"id": []byte(username),
	}
	return un.producer.ProduceBinary(kafka.AttendantServiceGRPCKafka, common.Service.Key(vin), binaryPayload, headers)
}

// insertCarUpdate inserts a car update with status "pending" and source
// common.UPDATE_SOURCE_OTA for the given manifest and VIN, then logs its
// status.
//
// The CarUpdate value is returned even when an error occurs; callers should
// check the error before using it.
func (un *updateNotifier) insertCarUpdate(db queries.CarUpdatesInterface, manifestID int64, vin string, username string) (common.CarUpdate, error) {
	up := common.CarUpdate{
		UpdateManifestID: manifestID,
		VIN:              vin,
		Status:           "pending",
		Username:         username,
		UpdateSource:     common.UPDATE_SOURCE_OTA,
	}

	if _, err := db.Insert(&up); err != nil {
		return up, err
	}

	_, err := db.LogStatus(&up)
	return up, err
}

// cancel sets the car update identified by updateID to
// carupdatestatus.ManifestCanceled with info as the explanation.
//
// The result of UpdateStatus is ignored, so cancellation is best-effort. The
// vin parameter is currently unused.
func (un *updateNotifier) cancel(db queries.CarUpdatesInterface, updateID int64, vin string, info string) {
	up := common.CarUpdate{
		ID:     updateID,
		Status: carupdatestatus.ManifestCanceled,
		Info:   info,
	}
	db.UpdateStatus(&up)
}

// JSONCarUpdatesRequest is the request body for applying one manifest to a
// list of 1 to 1000 VINs.
type JSONCarUpdatesRequest struct {
	UpdateManifestID int64    `json:"manifest_id" validate:"required"`
	VINs             []string `json:"vins" validate:"required,gte=1,lte=1000,dive,vin"`
}

// JSONOneCarUpdatesRequest is the request body for applying one manifest to a
// single VIN.
type JSONOneCarUpdatesRequest struct {
	UpdateManifestID int64  `json:"manifest_id" validate:"required"`
	VIN              string `json:"vin" validate:"required,vin"`
}

// JSONFleetUpdatesRequest is the request body for applying one manifest to a
// list of 1 to 1000 fleet names.
type JSONFleetUpdatesRequest struct {
	UpdateManifestID int64    `json:"manifest_id" validate:"required"`
	FleetNames       []string `json:"fleet_names" validate:"required,gte=1,lte=1000,dive,fleet"`
}