package websocket import ( "context" "encoding/json" "sync" "github.com/fiskerinc/cloud-services/pkg/common" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/fiskerinc/cloud-services/pkg/scheduler" "github.com/pkg/errors" "github.com/robfig/cron" ) func NewConnections() *Connections { return &Connections{ sessions: make(map[string]SessionInterface), expiration: scheduler.Bucket[SessionInterface]{}, } } type Connections struct { sessions map[string]SessionInterface expiration scheduler.Bucket[SessionInterface] mu sync.RWMutex } func (c *Connections) getSession(id string) (SessionInterface, bool) { c.mu.RLock() session, ok := c.sessions[id] c.mu.RUnlock() return session, ok } func (c *Connections) addSession(key string, session SessionInterface) { c.mu.Lock() c.sessions[key] = session logger.At(logger.Info(), key, "conn"). Str("ip", session.GetIP()). Int("connections", c.length()). Msgf("added connection %s", key) c.mu.Unlock() } func (c *Connections) deleteSession(session SessionInterface) { c.mu.Lock() key := session.Key() delete(c.sessions, key) logger.At(logger.Info(), key, "conn"). Str("ip", session.GetIP()). Int("connections", c.length()). Msgf("removed connection %s", key) c.mu.Unlock() } // Add connection to map func (c *Connections) Add(session SessionInterface) error { key := session.Key() expiredSession, exists := c.getSession(key) if exists { expiredSession.SendMsgToClient(DuplicateConnectionMessage()) // if connection already exists, skip teardown when closing connection // otherwise the car status will be changed to offline expiredSession.SkipTeardown(true) c.Remove(expiredSession) } c.addSession(key, session) if exists && expiredSession != nil { if expiredSession.GetType() == common.HMI.String() { // workaround for HMI sessions // if connection already exists, skip teardown when closing connection // otherwise the car status will be changed to offline logger.At(logger.Info(), "Connections::checkIfExists schedule", key).Send() c.Schedule(expiredSession) } else { expiredSession.Close() logger.At(logger.Info(), key, "conn").Msgf("existing connection %s is closed", key) } logger.At(logger.Info(), key, "conn").Msgf("removing duplicate connection %s", key) } return nil } // Remove connection from map // // if connection is not equal to the connection in map, // does not remove func (c *Connections) Remove(session SessionInterface) error { id := session.Key() expiredSesssion, ok := c.getSession(id) if !ok { return missingWebsocketError(id) } if expiredSesssion != session { return wrongSessionError(id) } c.deleteSession(session) return nil } // Send to websocket connection func (c *Connections) SendMsgToClient(id string, message []byte) error { session, ok := c.getSession(id) if !ok { return missingWebsocketError(id) } return session.SendMsgToClient(message) } func (c *Connections) length() int { return len(c.sessions) } func missingWebsocketError(id string) error { return errors.Errorf("no websocket connection found for ID: %v", id) } func wrongSessionError(id string) error { return errors.Errorf("%v does not match with existing connection", id) } func DuplicateConnectionMessage() []byte { m := common.Message{ Handler: "error", Data: common.MessageString{ Message: "disconnected by duplicate ID", }, } p, _ := json.Marshal(m) return p } func (c *Connections) Schedule(session SessionInterface) error { logger.Info().Msgf("Scheduling session to expire in 6 min %s. type %s", session.GetID(), session.GetType()) c.expiration.Schedule(session) return nil } func (c *Connections) RunExpiration(ctx context.Context) { cr := cron.New() cr.AddFunc("@every 30s", func() { c.expiration.Process(func(session SessionInterface) { logger.Debug().Msgf("RunExpiration::closing session %s ", session.Key()) session.Close() }) }) cr.Start() <-ctx.Done() cr.Stop() }