package websocket

import (
	"fmt"
	"net"
	"sync"
	"testing"
	"time"

	kafka "github.com/fiskerinc/cloud-services/pkg/kafka/mock"
	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/fiskerinc/cloud-services/pkg/redis"
)

// AddRemoveRedisListeners registers (add == true) or unregisters (add == false)
// id with both the Redis PubSub and the Redis queue set.
//
// Errors are logged and never returned: add failures are logged at Error
// level, remove failures at Warn level. The queue operation is always
// attempted, even when the PubSub operation failed.
func AddRemoveRedisListeners(add bool, id string, pubsub *redis.PubSub, queueRef *redis.Queues) {
	if add {
		if err := pubsub.Add(id); err != nil {
			logger.At(logger.Error(), id, "Redis PubSub conn add failed").Err(err).Send()
		}
		if err := queueRef.Add(id); err != nil {
			logger.At(logger.Error(), id, "Redis Queue conn add failed").Err(err).Send()
		}
		return
	}

	if err := pubsub.Remove(id); err != nil {
		logger.At(logger.Warn(), id, "Redis PubSub conn remove failed").Err(err).Send()
	}
	if err := queueRef.Remove(id); err != nil {
		logger.At(logger.Warn(), id, "Redis Queue conn remove failed").Err(err).Send()
	}
}

// TestSecureSessionTRexConnections adds and removes many SessionTRex values
// on a shared Connections registry from concurrent goroutines, using mocked
// Redis and Kafka backends.
//
// Even-numbered workers get a unique ID ("fisker123<i>"); odd-numbered workers
// all share the ID "fisker123", so the registry also sees concurrent sessions
// with a colliding ID. The test makes no assertions; it passes when every
// worker finishes.
func TestSecureSessionTRexConnections(t *testing.T) {
	const (
		workers  = 1000
		baseID   = "fisker123"
		holdTime = 200 * time.Millisecond
	)

	redis.MockRedisConnection()
	kafka.GetKafkaMock(nil)

	// NOTE(review): the connection taken from the mock pool here is never
	// released by this test — confirm whether PubSub owns and closes it.
	pubSub := redis.NewPubSub(redis.GetMockPool().Get())
	queueRef := redis.NewQueues(redis.GetMockPool())

	connections := NewConnections()

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			// Deferred so wg.Wait cannot hang if the goroutine exits early.
			defer wg.Done()

			// Keep the peer end so both halves of the pipe get closed.
			ws, peer := net.Pipe()
			defer peer.Close()

			id := baseID
			if i%2 == 0 {
				id = fmt.Sprintf("%s%d", id, i)
			}

			s := &SessionTRex{
				Session: &Session{
					Websocket: ws,
					ID:        id,
				},
				DBC:   id,
				ICCID: "!23454523453",
			}

			connections.Add(s)
			AddRemoveRedisListeners(true, id, pubSub, queueRef)

			// Hold the session open so the workers overlap in time.
			time.Sleep(holdTime)

			connections.Remove(s)
			s.Close()
			ws.Close()
		}(i)
	}

	wg.Wait()
	t.Log("success")
}