Files
cloud-services/services/jetfire/tests/integration_test.go

152 lines
3.9 KiB
Go

package tests
import (
"context"
"fmt"
"github.com/fiskerinc/cloud-services/services/jetfire/server"
"github.com/fiskerinc/cloud-services/services/jetfire/services"
"github.com/fiskerinc/cloud-services/services/jetfire/utils"
"os"
"testing"
"time"
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
"github.com/fiskerinc/cloud-services/pkg/kafka"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/intel-go/fastjson"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)
// Package-level fixtures shared by the integration test and its helpers.
var (
	// batchDataSignal receives the signals decoded from the JSON fixture in
	// readTestData.
	batchDataSignal = []kafka_grpc.GRPC_CANSignal{}

	// dataToPublish is the batch payload assembled by readTestData and
	// marshalled by TestIntegration before it is produced to Kafka.
	dataToPublish kafka_grpc.GRPC_CANSignalBatchPayload

	// startTimestamp is the UTC reference instant of the run. It is
	// initialised at package load and reassigned by TestIntegration.
	startTimestamp = time.Now().UTC()
)
// TestIntegration publishes one batch of CAN signals for testVIN to the
// vehicle-signal Kafka topic and then checks that rows for that VIN appear in
// the ClickHouse feature and vehicle-signal tables.
//
// The test is skipped unconditionally via t.Skip.
// NOTE(review): it presumably needs live Kafka and ClickHouse plus a running
// consumer (the runJetfire call below is commented out) — confirm before
// re-enabling.
func TestIntegration(t *testing.T) {
	t.Skip()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Nothing below can run without a connection; stop here instead of
	// calling methods on a connection that may be nil.
	conn, err := services.GetClickhouseConnection()
	if err != nil {
		t.Fatalf("connecting to clickhouse: %v", err)
	}

	startTimestamp = time.Now().UTC()
	// Zero-padded "YYYY-MM-DD hh:mm:ss" literal used in the SQL filters below.
	timestampString := startTimestamp.Format("2006-01-02 15:04:05")

	// Cleaning up the previous run is best effort: a failure is reported in
	// the test log but does not abort the test.
	println("cleaning up previous run")
	cleanupFeature := fmt.Sprintf("ALTER TABLE %s DELETE WHERE VIN=='%s' AND Timestamp<'%s'", services.FEATURE_TABLE, testVIN, timestampString)
	if err := conn.Exec(ctx, cleanupFeature); err != nil {
		t.Logf("cleaning up %v: %v", services.FEATURE_TABLE, err)
	}
	cleanupSignals := fmt.Sprintf("ALTER TABLE %s DELETE WHERE VIN=='%s' AND Timestamp<'%s'", services.VEHICLE_SIGNAL_TABLE, testVIN, timestampString)
	if err := conn.Exec(ctx, cleanupSignals); err != nil {
		t.Logf("cleaning up %v: %v", services.VEHICLE_SIGNAL_TABLE, err)
	}
	// conn.Ping(ctx)

	// Initialization: empty the vehicle cache and load the fixture batch,
	// whose timestamps readTestData rebases onto startTimestamp.
	cache := services.GetVehicleCache()
	cache.Clear()
	readTestData()

	producer, err := kafka.NewAsyncProducer(ctx)
	if err != nil {
		t.Fatalf("creating kafka producer: %v", err)
	}
	// runJetfire(ctx)

	publishDuration := int64(10)
	// Batch inserts take some time to run, so wait twice the publish window.
	testDuration := time.Duration(publishDuration*2) * time.Second

	// Publish the whole batch as a single binary message keyed by the VIN.
	grpcData, err := proto.Marshal(&dataToPublish)
	if err != nil {
		t.Fatalf("marshalling batch payload: %v", err)
	}
	if err := producer.ProduceBinary(kafka.VehicleSignal, testVIN, grpcData, nil); err != nil {
		t.Fatalf("producing batch payload: %v", err)
	}

	// Wait a while since kafka has to rebalance and the clickhouse inserts
	// have to trigger.
	time.Sleep(testDuration)

	// Check clickhouse by counting the rows written since the run started.
	println("querying clickhouse feature...")
	query := fmt.Sprintf("SELECT VIN, Timestamp FROM %s WHERE VIN=='%s' AND Timestamp>='%s'",
		services.FEATURE_TABLE,
		testVIN,
		timestampString,
	)
	checkRows(query, 1, t)

	println("querying clickhouse vehicle_signal...")
	query = fmt.Sprintf("SELECT VIN, Timestamp FROM %s WHERE VIN=='%s' AND Timestamp>='%s'",
		services.VEHICLE_SIGNAL_TABLE,
		testVIN,
		timestampString,
	)
	checkRows(query, 1000, t)
}
// checkRows runs query against ClickHouse and asserts that it yields at least
// expected rows.
//
// A failure to connect or to execute the query fails the test immediately, so
// an unreachable database cannot be mistaken for a passing check.
func checkRows(query string, expected int, t *testing.T) {
	t.Helper()

	conn, err := services.GetClickhouseConnection()
	if err != nil {
		// NOTE(review): assumes the project logger exposes a zerolog-style
		// event API, where nothing is written until Msg is called — confirm.
		logger.Error().Err(err).Msg("connecting to clickhouse")
		t.Fatalf("connecting to clickhouse: %v", err)
	}

	fmt.Println(query)
	rows, err := conn.Query(context.Background(), query)
	if err != nil {
		// rows is not usable after an error, so stop before touching it.
		t.Fatalf("running query: %v", err)
	}
	defer rows.Close()

	// Only the number of rows matters; the column values are never scanned.
	count := 0
	for rows.Next() {
		count++
	}
	assert.GreaterOrEqual(t, count, expected)
}
// readTestData loads the signal batch from ./test-batch-msg.json into
// batchDataSignal, stamps every signal with testVIN, shifts all timestamps so
// that the earliest signal lands on startTimestamp, and stores the resulting
// payload in dataToPublish.
//
// It panics if the fixture cannot be read or decoded.
func readTestData() {
	// 1176 messages long
	jsonPath := "./test-batch-msg.json"
	data, err := os.ReadFile(jsonPath)
	if err != nil {
		panic(err)
	}
	// A fixture that fails to decode must not silently become an empty batch.
	if err := fastjson.Unmarshal(data, &batchDataSignal); err != nil {
		panic(fmt.Errorf("decoding %s: %w", jsonPath, err))
	}

	dataPtr := make([]*kafka_grpc.GRPC_CANSignal, len(batchDataSignal))
	offset := 0.0
	for i := range batchDataSignal {
		sig := &batchDataSignal[i]
		dataPtr[i] = sig
		// Find the minimum timestamp in the batch; the first element seeds it.
		if i == 0 || sig.Timestamp < offset {
			offset = sig.Timestamp
		}
		sig.Vin = testVIN // in case we need to change the test VIN to follow the proper pattern
	}

	// The shift is the same for every signal, so compute it once.
	shift := utils.TimeToFloat(startTimestamp) - offset
	for _, sig := range dataPtr {
		sig.Timestamp += shift
	}

	dataToPublish = kafka_grpc.GRPC_CANSignalBatchPayload{Data: &kafka_grpc.GRPC_CANSignalData{
		Cansignals: dataPtr,
	}}
}
// runJetfire prepares and launches the jetfire consumer for the test: it calls
// services.ResetCacheVars and then starts server.StartConsumer on the
// vehicle-signal topic in its own goroutine. The goroutine is not waited for.
// NOTE(review): the consumer presumably stops when ctx is cancelled — confirm.
func runJetfire(ctx context.Context) {
	services.ResetCacheVars()

	// Run the consumer in the background so the caller is not blocked.
	topic := kafka.VehicleSignal
	go server.StartConsumer(ctx, topic)
}