Files
cloud-services/pkg/usecase_helpers/update_notifier.go

137 lines
3.9 KiB
Go

package usecase_helpers
import (
"fmt"
"net/http"
"strings"
"github.com/fiskerinc/cloud-services/pkg/common"
"github.com/fiskerinc/cloud-services/pkg/common/carupdatestatus"
"github.com/fiskerinc/cloud-services/pkg/db/queries"
e "github.com/fiskerinc/cloud-services/pkg/errors"
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
"github.com/fiskerinc/cloud-services/pkg/kafka"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
// UpdateNotifierInterface abstracts the component that announces an update
// manifest to a set of cars, so callers can depend on the behavior rather
// than on the concrete notifier.
type UpdateNotifierInterface interface {
	// Send announces manifest to every car listed in vins on behalf of
	// username and returns the car update records associated with the
	// request, or an error when the request could not be completed.
	Send(vins []string, manifest common.UpdateManifest, username string) ([]common.CarUpdate, error)
}
// NewUpdateNotifier builds an UpdateNotifierInterface backed by the given
// car-updates store (cu) and Kafka producer (pr).
//
// Only the store and the producer are wired in here; every other field of the
// underlying notifier keeps its zero value.
func NewUpdateNotifier(
	cu queries.CarUpdatesInterface,
	pr kafka.ProducerInterface,
) UpdateNotifierInterface {
	notifier := new(updateNotifier)
	notifier.carUpdates = cu
	notifier.producer = pr
	return notifier
}
// updateNotifier is the default UpdateNotifierInterface implementation. It
// records car updates through carUpdates and publishes the matching
// notification through producer.
type updateNotifier struct {
	// carUpdates is the persistence layer for car update records.
	carUpdates queries.CarUpdatesInterface
	// producer publishes the binary notification messages to Kafka.
	producer kafka.ProducerInterface
	// targetService is not assigned by NewUpdateNotifier in this file.
	// NOTE(review): appears unused here — confirm whether other code reads it.
	targetService string
}
// Send registers a new car update for every VIN in vins and queues the
// manifest notification for each of them.
//
// It is expected to be called only by ota, or by another service that
// actually delivers the update to the car.
//
// The call runs in two phases:
//  1. Every VIN is checked for an already pending update of manifest. A lookup
//     failure aborts with (nil, err). If any VIN has a pending update, nothing
//     is inserted and a stack-annotated custom error carrying
//     http.StatusForbidden is returned together with the still zero-valued
//     result slice.
//  2. For each VIN, in order, a car update row is inserted and the manifest
//     message is queued. The first failure stops processing and returns the
//     slice filled so far (remaining slots stay zero-valued) with the error.
//     When queuing fails, the freshly inserted car update is marked canceled.
//
// The returned slice always has len(vins) entries when it is non-nil.
func (un *updateNotifier) Send(vins []string, manifest common.UpdateManifest, username string) ([]common.CarUpdate, error) {
	results := make([]common.CarUpdate, len(vins))

	// Phase 1: collect every VIN that already has this manifest pending.
	blocked := make([]string, 0)
	for idx := range vins {
		hasPending, err := un.carUpdates.HasPendingUpdates(manifest.ID, vins[idx])
		switch {
		case err != nil:
			return nil, err
		case hasPending:
			blocked = append(blocked, vins[idx])
		}
	}
	if len(blocked) != 0 {
		message := fmt.Sprintf("pending update exists for %s", strings.Join(blocked, ", "))
		return results, errors.WithStack(e.NewCustomError(message, http.StatusForbidden))
	}

	// Phase 2: create the car_update entry first, then try to queue the
	// manifest for the car.
	for idx, vin := range vins {
		created, err := un.insertCarUpdate(un.carUpdates, manifest.ID, vin, username)
		if err != nil {
			return results, err
		}

		if sendErr := un.sendManifest(vin, username, created.ID); sendErr != nil {
			// The row exists but the car will never hear about it: cancel it.
			un.cancel(un.carUpdates, created.ID, vin, "Failed to queue message to Kafka, error: "+sendErr.Error())
			return results, sendErr
		}

		results[idx] = created
	}

	return results, nil
}
// sendManifest queues a "send_manifest" message on the
// kafka.AttendantServiceGRPCKafka topic so that the car update identified by
// carUpdateID gets delivered to the car with the given vin.
//
// The message is keyed with common.Service.Key(vin) and carries username in
// the "id" header.
//
// It returns an error when the payload cannot be serialized or when the
// producer rejects the message. A serialization failure is reported instead
// of being ignored, so an empty payload is never published.
func (un *updateNotifier) sendManifest(vin string, username string, carUpdateID int64) error {
	data := &kafka_grpc.GRPC_AttendantPayload_UpdateManifest{
		UpdateManifest: &kafka_grpc.UpdateManifest{
			CarUpdateId: carUpdateID,
		},
	}
	kafkaMSG := kafka_grpc.GRPC_AttendantPayload{
		Handler: "send_manifest",
		Data:    data,
	}

	binaryPayload, err := proto.Marshal(&kafkaMSG)
	if err != nil {
		// Surface the failure to the caller rather than producing a nil payload.
		return fmt.Errorf("marshaling send_manifest payload for car update %d: %w", carUpdateID, err)
	}

	return un.producer.ProduceBinary(kafka.AttendantServiceGRPCKafka, common.Service.Key(vin), binaryPayload, map[string][]byte{
		"id": []byte(username),
	})
}
// insertCarUpdate stores a new car update for vin and manifestID through db
// and then logs its status.
//
// The record is created with status "pending", the given username and
// common.UPDATE_SOURCE_OTA as its source. The record is returned in every
// case: if the insert fails, the status is not logged and the insert error is
// returned; otherwise the error (if any) comes from logging the status.
//
// NOTE(review): callers read the returned record's ID, so db.Insert is
// presumably expected to populate it through the pointer — confirm.
func (un *updateNotifier) insertCarUpdate(db queries.CarUpdatesInterface, manifestID int64, vin string, username string) (common.CarUpdate, error) {
	var record common.CarUpdate
	record.UpdateManifestID = manifestID
	record.VIN = vin
	record.Status = "pending"
	record.Username = username
	record.UpdateSource = common.UPDATE_SOURCE_OTA

	if _, err := db.Insert(&record); err != nil {
		return record, err
	}

	_, err := db.LogStatus(&record)
	return record, err
}
// cancel marks the car update identified by updateID as
// carupdatestatus.ManifestCanceled through db, storing info as the
// explanation.
//
// This is best effort: whatever db.UpdateStatus returns is discarded and
// nothing is reported back to the caller. The vin parameter is currently not
// used.
func (un *updateNotifier) cancel(db queries.CarUpdatesInterface, updateID int64, vin string, info string) {
	var canceled common.CarUpdate
	canceled.ID = updateID
	canceled.Status = carupdatestatus.ManifestCanceled
	canceled.Info = info

	db.UpdateStatus(&canceled)
}
// JSONCarUpdatesRequest is the JSON body of a request that applies one update
// manifest to a list of cars.
type JSONCarUpdatesRequest struct {
	// UpdateManifestID is read from "manifest_id" and is tagged as required.
	UpdateManifestID int64 `json:"manifest_id" validate:"required"`
	// VINs is read from "vins"; the validation tag asks for 1 to 1000 entries,
	// each checked by the "vin" rule.
	// NOTE(review): "vin" is presumably a custom validator registered elsewhere.
	VINs []string `json:"vins" validate:"required,gte=1,lte=1000,dive,vin"`
}
// JSONOneCarUpdatesRequest is the JSON body of a request that applies one
// update manifest to a single car.
type JSONOneCarUpdatesRequest struct {
	// UpdateManifestID is read from "manifest_id" and is tagged as required.
	UpdateManifestID int64 `json:"manifest_id" validate:"required"`
	// VIN is read from "vin"; it is tagged as required and checked by the
	// "vin" rule.
	// NOTE(review): "vin" is presumably a custom validator registered elsewhere.
	VIN string `json:"vin" validate:"required,vin"`
}
// JSONFleetUpdatesRequest is the JSON body of a request that applies one
// update manifest to one or more fleets, addressed by name.
type JSONFleetUpdatesRequest struct {
	// UpdateManifestID is read from "manifest_id" and is tagged as required.
	UpdateManifestID int64 `json:"manifest_id" validate:"required"`
	// FleetNames is read from "fleet_names"; the validation tag asks for 1 to
	// 1000 entries, each checked by the "fleet" rule.
	// NOTE(review): "fleet" is presumably a custom validator registered elsewhere.
	FleetNames []string `json:"fleet_names" validate:"required,gte=1,lte=1000,dive,fleet"`
}