Files
cloud-services/pkg/redis/queues.go

242 lines
5.5 KiB
Go

package redis
import (
"context"
"sync"
"time"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/scheduler"
)
// Timing constants for the queue listener.
const (
// blockTimeout is the length of the back-off sleep in Process; it is
// multiplied by time.Millisecond there, so the unit is milliseconds.
blockTimeout = 5
// reconnectTimeout is not referenced in the visible part of this file.
// NOTE(review): its unit and consumer are not shown here — confirm before changing.
reconnectTimeout = 5
)
// Payload is one message waiting in the retry bucket: the channel it was
// popped for and the raw bytes of the message.
type Payload struct {
// channel is the key that was handed to the handler and is later passed to
// SafeQueueMessage when the message is re-queued.
channel string
// data is the message body as it was popped from Redis.
data []byte
}
// Queues tracks the set of Redis queues to poll and holds the state needed to
// pop from them and to retry messages whose handler failed.
// It is intended to follow the Listener interface.
// NOTE(review): the Listener interface is not visible in this file — confirm.
type Queues struct {
// connection is the pool from which a client is borrowed for each Redis call.
connection ClientPoolInterface
// queues is the set of registered queue ids.
queues Set
// args caches the Redis arguments derived from queues; it is rebuilt on
// every successful Add and Remove.
args redis.Args
// mu is held by Add, Remove and getArgs while they touch queues and args.
mu sync.RWMutex
// retryQueue holds payloads scheduled for another delivery attempt.
retryQueue scheduler.Bucket[Payload]
}
// NewQueues builds a ready-to-use Queues with an empty queue set.
//
// When at least one Pool is supplied, the first one configures the client
// pool; any further values are ignored. With no arguments the client pool is
// created with NewClientPool's defaults.
func NewQueues(args ...Pool) *Queues {
	set := make(Set)

	var pool ClientPoolInterface
	switch len(args) {
	case 0:
		pool = NewClientPool()
	default:
		pool = NewClientPool(args[0])
	}

	queues := &Queues{
		connection: pool,
		queues:     set,
		args:       queuesToArgs(set),
		retryQueue: scheduler.Bucket[Payload]{},
	}
	return queues
}
// Add registers id so the listener starts polling its queue.
// The Redis key for the id is derived through QueueKey when the cached
// argument list is rebuilt.
//
// It returns an error if id is already registered.
func (q *Queues) Add(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if added := q.queues.Add(id); !added {
		return errors.Errorf("%v already in queues", id)
	}

	// Refresh the cached argument list so the next poll sees the new queue.
	q.args = queuesToArgs(q.queues)
	logger.At(logger.Debug(), "Queues::Add conn", id).Send()
	return nil
}
// Remove unregisters id so the listener stops polling its queue.
// The cached argument list is rebuilt from the remaining ids.
//
// It returns an error if id is not currently registered.
func (q *Queues) Remove(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if removed := q.queues.Remove(id); !removed {
		return errors.Errorf("%v does not exist in queues", id)
	}

	// Refresh the cached argument list so the next poll skips this queue.
	q.args = queuesToArgs(q.queues)
	logger.At(logger.Debug(), "Queues::Remove conn", id).Send()
	return nil
}
// getArgs returns the cached Redis argument list, read under the read lock.
func (q *Queues) getArgs() redis.Args {
	q.mu.RLock()
	current := q.args
	q.mu.RUnlock()
	return current
}
// Listen starts the retry ticker (QueryRun) and then calls Process in a loop,
// passing every popped message to handler, until ctx is cancelled.
//
// It returns nil once ctx is cancelled, or a stack-annotated error if Process
// reports one.
func (q *Queues) Listen(ctx context.Context, handler func(string, []byte) error) error {
	q.QueryRun(ctx)

	// Debug output from the polling loop is limited to one line per minute.
	sampled := logger.Sample(zerolog.LevelSampler{
		DebugSampler: &zerolog.BurstSampler{
			Burst:  1,
			Period: 1 * time.Minute,
		},
	})

	// The map is allocated once and emptied before each pass.
	queueMaps := make(map[string][]byte)

	// Check the context directly on every pass. A boolean flipped from a
	// helper goroutine would be read here without synchronisation, which is a
	// data race.
	for ctx.Err() == nil {
		clear(queueMaps)
		if err := q.Process(handler, sampled, queueMaps); err != nil {
			return errors.WithStack(err)
		}
	}
	return nil
}
// ListenChannel is meant to deliver Redis messages on a channel rather
// than through a handler callback.
// It is currently a stub: it does nothing and always returns nil.
func (q *Queues) ListenChannel() error {
// stub
return nil
}
// Length returns the number of queues currently registered.
//
// The read lock is taken because Add and Remove mutate the set under the
// same mutex; reading it unguarded would race with them.
func (q *Queues) Length() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return len(q.queues)
}
// Restart replaces the connection pool with a new one created by calling
// NewClientPool with no arguments. It always returns nil.
// NOTE(review): a Pool passed to NewQueues is not reused here, and the
// assignment is not guarded by mu — confirm whether either matters to callers.
func (q *Queues) Restart() error {
q.connection = NewClientPool()
return nil
}
// timerFunc runs one pass over the retry bucket. For every payload the bucket
// passes to the callback, it borrows a client from the pool and sends the
// message with SafeQueueMessage, logging the outcome.
// NOTE(review): which payloads the bucket releases on each pass is decided by
// scheduler.Bucket, which is not visible here.
func (q *Queues) timerFunc() {
	q.retryQueue.Process(func(payload Payload) {
		logger.Debug().Msgf("QueryRun::closing session %s ", payload.channel)

		client := q.connection.GetFromPool()
		// Deferred so the client is handed back on every path out of the
		// callback, in the same way lPop releases its client.
		defer client.Close()

		// The data is sent as a string in an attempt to stop the message
		// being base64 encoded.
		if err := client.SafeQueueMessage(payload.channel, string(payload.data)); err != nil {
			logger.At(logger.Error(), payload.channel, "redis").Err(err).Send()
			return
		}
		logger.Info().Msgf("Unable to send to websocket, added %s back to queue", payload.channel)
	})
}
// QueryRun starts a background goroutine that calls timerFunc every 100ms.
// The goroutine stops its ticker and exits when ctx is cancelled.
func (q *Queues) QueryRun(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	go func() {
		// Release the ticker however the goroutine ends.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				q.timerFunc()
			case <-ctx.Done():
				return
			}
		}
	}()
}
// retry wraps key and payload in a Payload and schedules it on the retry
// bucket for a later delivery attempt.
func (q *Queues) retry(key string, payload []byte) {
	q.retryQueue.Schedule(Payload{channel: key, data: payload})
}
// Process performs one polling pass: it pops at most one message from every
// registered queue and passes each to handler. Messages whose handler returns
// an error are scheduled for retry.
//
// queueMaps is the scratch map that lPop fills; the caller is expected to
// clear it between passes. sampleLogger is used for the rate-limited debug
// lines.
//
// The pass backs off for blockTimeout milliseconds whenever there is nothing
// to handle: no queues registered, every queue empty, or a failed pop that
// produced no messages. It currently always returns nil.
func (q *Queues) Process(handler func(string, []byte) error, sampleLogger zerolog.Logger, queueMaps map[string][]byte) error {
	args := q.getArgs()
	sleepTime := blockTimeout * time.Millisecond
	if len(args) == 0 {
		sampleLogger.Debug().Msgf("Queue:Process no args")
		time.Sleep(sleepTime)
		return nil
	}

	poppedMap, err := q.lPop(args, sampleLogger, queueMaps)
	if len(poppedMap) == 0 {
		// Nothing was popped. Log only if that was caused by an error, and
		// sleep in either case so an idle listener does not spin on Redis.
		if err != nil {
			sampleLogger.Debug().Msgf("Queue:Process lPop failed")
			logger.At(logger.Error(), "Queue:Process", "redis:unable to LPop").Err(err).Send()
		}
		time.Sleep(sleepTime)
		return nil
	}

	// lPop may return an error together with messages it popped before the
	// failure. They are already off Redis, so they are still delivered.
	for channel, payload := range poppedMap {
		logger.Debug().Msgf("call handler, key- %s, data- %v", channel, string(payload))
		if err := handler(channel, payload); err != nil {
			logger.At(logger.Error(), channel, "gateway:unable to send: retry in 3 sec").Err(err).Send()
			q.retry(channel, payload)
			continue
		}
		logger.At(logger.Debug(), channel, "redis").
			Str("Gateway:msg", string(payload)).
			Msgf("VIN %s received queued msg ", channel)
	}
	return nil
}
// lPop issues one LPOP per entry in args on a client borrowed from the pool
// and stores each reply in queueMaps under ParseQueueKey of the Redis key.
//
// Queues that return a nil reply are skipped. On any other error the loop
// stops and the error is returned together with whatever was stored so far.
// The returned map is the same queueMaps that was passed in.
func (q *Queues) lPop(args redis.Args, sampleLogger zerolog.Logger, queueMaps map[string][]byte) (map[string][]byte, error) {
	client := q.connection.GetFromPool()
	defer client.Close()

	for _, key := range args {
		data, err := redis.Bytes(client.GetConn().Do("LPOP", key))
		switch {
		case err == nil:
			logger.Debug().Msgf("LPop: vin %s reply %d ", key.(string), len(data))
			queueMaps[ParseQueueKey(key.(string))] = data
		case errors.Is(err, redis.ErrNil):
			// Nothing to pop from this key; move on to the next one.
			sampleLogger.Debug().Msgf("LPop::null value returned by redis")
		default:
			logger.At(logger.Error(), "LPop", "redis").
				Str("LPop", "").Err(err).Send()
			return queueMaps, err
		}
	}
	return queueMaps, nil
}
// queuesToArgs converts the set of queue ids into a redis.Args list holding
// QueueKey(id) for every member. The order follows the iteration order of
// the set.
func queuesToArgs(s Set) redis.Args {
	keys := make(redis.Args, 0, len(s))
	for id := range s {
		keys = append(keys, QueueKey(id))
	}
	return keys
}