Files

167 lines
3.9 KiB
Go

package websocket
import (
"context"
"encoding/json"
"sync"
"github.com/fiskerinc/cloud-services/pkg/common"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/scheduler"
"github.com/pkg/errors"
"github.com/robfig/cron"
)
func NewConnections() *Connections {
return &Connections{
sessions: make(map[string]SessionInterface),
expiration: scheduler.Bucket[SessionInterface]{},
}
}
type Connections struct {
sessions map[string]SessionInterface
expiration scheduler.Bucket[SessionInterface]
mu sync.RWMutex
}
func (c *Connections) getSession(id string) (SessionInterface, bool) {
c.mu.RLock()
session, ok := c.sessions[id]
c.mu.RUnlock()
return session, ok
}
func (c *Connections) addSession(key string, session SessionInterface) {
c.mu.Lock()
c.sessions[key] = session
logger.At(logger.Info(), key, "conn").
Str("ip", session.GetIP()).
Int("connections", c.length()).
Msgf("added connection %s", key)
c.mu.Unlock()
}
func (c *Connections) deleteSession(session SessionInterface) {
c.mu.Lock()
key := session.Key()
delete(c.sessions, key)
logger.At(logger.Info(), key, "conn").
Str("ip", session.GetIP()).
Int("connections", c.length()).
Msgf("removed connection %s", key)
c.mu.Unlock()
}
// Add connection to map
func (c *Connections) Add(session SessionInterface) error {
key := session.Key()
expiredSession, exists := c.getSession(key)
if exists {
expiredSession.SendMsgToClient(DuplicateConnectionMessage())
// if connection already exists, skip teardown when closing connection
// otherwise the car status will be changed to offline
expiredSession.SkipTeardown(true)
c.Remove(expiredSession)
}
c.addSession(key, session)
if exists && expiredSession != nil {
if expiredSession.GetType() == common.HMI.String() { // workaround for HMI sessions
// if connection already exists, skip teardown when closing connection
// otherwise the car status will be changed to offline
logger.At(logger.Info(), "Connections::checkIfExists schedule", key).Send()
c.Schedule(expiredSession)
} else {
expiredSession.Close()
logger.At(logger.Info(), key, "conn").Msgf("existing connection %s is closed", key)
}
logger.At(logger.Info(), key, "conn").Msgf("removing duplicate connection %s", key)
}
return nil
}
// Remove connection from map
//
// if connection is not equal to the connection in map,
// does not remove
func (c *Connections) Remove(session SessionInterface) error {
id := session.Key()
expiredSesssion, ok := c.getSession(id)
if !ok {
return missingWebsocketError(id)
}
if expiredSesssion != session {
return wrongSessionError(id)
}
c.deleteSession(session)
return nil
}
// Send to websocket connection
func (c *Connections) SendMsgToClient(id string, message []byte) error {
session, ok := c.getSession(id)
if !ok {
return missingWebsocketError(id)
}
return session.SendMsgToClient(message)
}
func (c *Connections) length() int {
return len(c.sessions)
}
func missingWebsocketError(id string) error {
return errors.Errorf("no websocket connection found for ID: %v", id)
}
func wrongSessionError(id string) error {
return errors.Errorf("%v does not match with existing connection", id)
}
func DuplicateConnectionMessage() []byte {
m := common.Message{
Handler: "error",
Data: common.MessageString{
Message: "disconnected by duplicate ID",
},
}
p, _ := json.Marshal(m)
return p
}
func (c *Connections) Schedule(session SessionInterface) error {
logger.Info().Msgf("Scheduling session to expire in 6 min %s. type %s", session.GetID(), session.GetType())
c.expiration.Schedule(session)
return nil
}
func (c *Connections) RunExpiration(ctx context.Context) {
cr := cron.New()
cr.AddFunc("@every 30s", func() {
c.expiration.Process(func(session SessionInterface) {
logger.Debug().Msgf("RunExpiration::closing session %s ", session.Key())
session.Close()
})
})
cr.Start()
<-ctx.Done()
cr.Stop()
}