package redis import ( "context" "fmt" "os" "sync" "testing" "time" "github.com/fiskerinc/cloud-services/pkg/testhelper" "github.com/pkg/errors" ) func TestInitQueues(t *testing.T) { MockRedisConnection() var q Listener q = NewQueues() numQueues := 0 if q.Length() != numQueues { t.Errorf(testhelper.TestErrorTemplate, "TestInitQueues", numQueues, q.Length()) } } func TestAddQueue(t *testing.T) { MockRedisConnection() var q Listener q = NewQueues() err := q.Add("TESTVIN123") if err != nil { t.Errorf(testhelper.TestErrorTemplate, "TestAddQueue", nil, err) } numQueues := 1 if q.Length() != numQueues { t.Errorf(testhelper.TestErrorTemplate, "TestAddQueue", numQueues, q.Length()) } } func TestRemoveQueue(t *testing.T) { MockRedisConnection() var q Listener q = NewQueues() err := q.Add("TESTVIN123") if err != nil { t.Errorf(testhelper.TestErrorTemplate, "TestRemoveQueue", nil, err) } numQueues := 1 if q.Length() != numQueues { t.Errorf(testhelper.TestErrorTemplate, "TestRemoveQueue", numQueues, q.Length()) } err = q.Remove("TESTVIN123") if err != nil { t.Errorf(testhelper.TestErrorTemplate, "TestRemoveQueue", nil, err) } numQueues = 0 if q.Length() != numQueues { t.Errorf(testhelper.TestErrorTemplate, "TestRemoveQueue", numQueues, q.Length()) } } func TestQueuesListener(t *testing.T) { MockRedisConnection() q := NewQueues() ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockFunc := func(string, []byte) error { return nil } go q.Listen(ctx, mockFunc) } func TestQueuesRestart(t *testing.T) { MockRedisConnection() q := NewQueues() err := q.Restart() if err != nil { t.Errorf(testhelper.TestErrorTemplate, "TestQueuesRestart", nil, err) } } func TestQueuesThreaded(t *testing.T) { t.Skip() //remove this for local testing or testing with redis UpdateRedisConnection("localhost", "6379", "fisker123") os.Setenv("REDIS_MAXACTIVECONN", "100") counter := 0 q := NewQueues() ctx, cancel := context.WithCancel(context.Background()) defer cancel() var wg sync.WaitGroup go q.Listen(ctx, func(s string, b []byte) error { counter++ t.Logf("received call from channel %v, counter %d data %v", s, counter, string(b)) if counter%2 != 0 { return errors.New("fake error") } return nil }) q.Add("TESTVIN123") for i := 0; i < 95; i++ { wg.Add(1) go func(i int) { conn := q.connection.GetFromPool() err := conn.SafeQueueMessage("TESTVIN123", fmt.Sprintf("hello fisker! %d", i)) conn.Close() if err != nil { t.Errorf(testhelper.TestErrorTemplate, "TestConnQueueMessage", nil, err) } time.Sleep(5 * time.Second) wg.Done() }(i) } wg.Wait() t.Log("success") time.Sleep(25 * time.Second) t.Fail() }