Files

112 lines
3.4 KiB
Go

package handlers
import (
"context"
"net/http"
"github.com/fiskerinc/cloud-services/services/gateway/services"
"github.com/fiskerinc/cloud-services/services/gateway/websocket"
"github.com/fiskerinc/cloud-services/pkg/logger"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
// SecureSessionWebsocketHandler initiates a websocket connection off an HTTP request
func SecureSessionWebsocketHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, err := websocket.NewSecureSession(w, r)
if err != nil {
logger.Warn().Err(err).Send()
logger.Warn().Msgf("wshandler: bad request %v", websocket.PrintRequest(r))
return
}
go runSessionLifeCycle(ctx, session)
}
// InsecureSessionWebsocketHandler initiates a websocket connection off an HTTP request from mobile
func InsecureSessionWebsocketHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, err := websocket.NewInsecureSession(w, r)
if err != nil {
logger.Warn().Err(err).Send()
logger.Warn().Msgf("wshandler: bad request %v", websocket.PrintRequest(r))
return
}
go runSessionLifeCycle(ctx, session)
}
// WebsocketSession handles the life cycle of a websocket
func runSessionLifeCycle(ctx context.Context, session websocket.SessionInterface) {
defer session.Close()
span, ctx := tracer.StartSpanFromContext(ctx, "websocket")
defer span.Finish()
err := session.Authenticate()
if err != nil {
logger.At(logger.Warn(), session.Key(), "conn").Str("ip", session.GetIP()).Err(err).Send()
return
}
addServices(session)
defer removeServices(session)
producer, err := services.GetKafkaProducer()
if err != nil {
logger.Error().Str("id", session.Key()).Err(err).Send()
return
}
logger.Debug().Msgf("websocket session: start listening%v", session.GetID())
err = session.Listen(ctx, producer)
if err != nil {
logger.At(logger.Warn(), session.Key(), "conn").Err(err).Send()
return
}
}
// addServices notifies all services upon websocket connection
func addServices(session websocket.SessionInterface) error {
id := session.Key()
services.GetConnections().Add(session)
logger.Debug().Msgf("websocket session: addServices lifecycle %v", id)
services.AddRemoveRedisListeners(true, id)
producer, err := services.GetKafkaProducer()
if err != nil {
logger.At(logger.Error(), session.Key(), "Kafka producer failed").Err(err).Send()
return err
}
if err = session.Load(producer); err != nil {
logger.At(logger.Warn(), session.Key(), "conn").Err(err).Send()
}
logger.At(logger.Debug(), "Session", id).Msgf("connection added %s", id)
return nil
}
// removeServices notifies all services upon websocket disconnection
func removeServices(session websocket.SessionInterface) error {
id := session.Key()
if err := services.GetConnections().Remove(session); err != nil {
// if error returned, the session did not exist or is different session
// that means we should not remove the current id from pub sub and queues
logger.At(logger.Error(), session.Key(), "conn").Err(err).Send()
return err
}
services.AddRemoveRedisListeners(false, id)
producer, err := services.GetKafkaProducer()
if err != nil {
logger.At(logger.Error(), session.Key(), "Kafka producer").Err(err).Send()
return err
}
if err = session.Teardown(producer); err != nil {
logger.At(logger.Warn(), session.Key(), "conn").Err(err).Send()
}
logger.At(logger.Debug(), "Session", id).Msgf("connection removed %s", id)
return nil
}