package tests

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
	"github.com/fiskerinc/cloud-services/pkg/kafka"
	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/fiskerinc/cloud-services/services/jetfire/server"
	"github.com/fiskerinc/cloud-services/services/jetfire/services"
	"github.com/fiskerinc/cloud-services/services/jetfire/utils"
	"github.com/intel-go/fastjson"
	"github.com/stretchr/testify/assert"
	"google.golang.org/protobuf/proto"
)

// batchDataSignal holds the CAN signals decoded from the JSON fixture by
// readTestData.
var batchDataSignal = []kafka_grpc.GRPC_CANSignal{}

// dataToPublish is the batch payload built by readTestData and published to
// Kafka by TestIntegration.
var dataToPublish kafka_grpc.GRPC_CANSignalBatchPayload

// startTimestamp marks the beginning of the current run. Fixture timestamps
// are shifted relative to it, and the ClickHouse queries filter on it.
var startTimestamp = time.Now().UTC()

// TestIntegration publishes the fixture batch to the vehicle-signal Kafka
// topic and then checks that rows for testVIN show up in the ClickHouse
// feature and vehicle-signal tables.
//
// The test is currently skipped unconditionally via t.Skip.
func TestIntegration(t *testing.T) {
	t.Skip()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Every later step needs the connection, so fail fast instead of
	// dereferencing a nil connection further down.
	conn, err := services.GetClickhouseConnection()
	if err != nil {
		t.Fatalf("connecting to clickhouse: %v", err)
	}

	startTimestamp = time.Now().UTC()
	// Zero-padded "YYYY-MM-DD hh:mm:ss"; unpadded fields (e.g. "2024-1-5 3:4:5")
	// are not a well-formed DateTime literal.
	timestampString := startTimestamp.Format("2006-01-02 15:04:05")

	// Cleaning up a previous run is best effort: a failed delete is logged but
	// does not abort the test.
	t.Log("cleaning up previous run")
	purge := func(query string) {
		if err := conn.Exec(ctx, query); err != nil {
			// NOTE(review): assumes the project logger returns zerolog-style
			// events, which are only written once Msg/Send is called — confirm.
			logger.Error().Err(err).Msg("cleaning up previous run")
		}
	}
	purge(fmt.Sprintf("ALTER TABLE %s DELETE WHERE VIN=='%s' AND Timestamp<'%s'", services.FEATURE_TABLE, testVIN, timestampString))
	purge(fmt.Sprintf("ALTER TABLE %s DELETE WHERE VIN=='%s' AND Timestamp<'%s'", services.VEHICLE_SIGNAL_TABLE, testVIN, timestampString))
	// conn.Ping(ctx)

	// Initialization: empty the vehicle cache and load the fixture (which also
	// re-bases the message timestamps onto startTimestamp).
	cache := services.GetVehicleCache()
	cache.Clear()
	readTestData()

	producer, err := kafka.NewAsyncProducer(ctx)
	if err != nil {
		t.Fatalf("creating kafka producer: %v", err)
	}
	// runJetfire(ctx)

	publishDuration := int64(10)
	// Batch inserts take some time to run, so wait twice the publish duration.
	testDuration := time.Duration(publishDuration*2) * time.Second

	// Publish the fixture batch to Kafka.
	grpcData, err := proto.Marshal(&dataToPublish)
	if err != nil {
		t.Fatalf("marshalling batch payload: %v", err)
	}
	if err := producer.ProduceBinary(kafka.VehicleSignal, testVIN, grpcData, nil); err != nil {
		t.Fatalf("publishing batch payload: %v", err)
	}

	// Wait a while since kafka has to rebalance and the clickhouse inserts
	// have to trigger.
	time.Sleep(testDuration)

	// Check clickhouse by counting the rows written since the run started.
	t.Log("querying clickhouse feature...")
	query := fmt.Sprintf("SELECT VIN, Timestamp FROM %s WHERE VIN=='%s' AND Timestamp>='%s'",
		services.FEATURE_TABLE,
		testVIN,
		timestampString,
	)
	checkRows(query, 1, t)

	t.Log("querying clickhouse vehicle_signal...")
	query = fmt.Sprintf("SELECT VIN, Timestamp FROM %s WHERE VIN=='%s' AND Timestamp>='%s'",
		services.VEHICLE_SIGNAL_TABLE,
		testVIN,
		timestampString,
	)
	checkRows(query, 1000, t)
}

// checkRows runs query against ClickHouse and asserts that it yields at least
// expected rows. Connection and query failures fail the test immediately
// rather than being silently skipped.
func checkRows(query string, expected int, t *testing.T) {
	t.Helper()

	conn, err := services.GetClickhouseConnection()
	if err != nil {
		t.Fatalf("connecting to clickhouse: %v", err)
	}

	t.Log(query)
	rows, err := conn.Query(context.Background(), query)
	if err != nil {
		// rows must not be used (or closed) when the query itself failed.
		t.Fatalf("running query %q: %v", query, err)
	}
	defer rows.Close()

	count := 0
	for rows.Next() {
		count++
	}
	// Next returning false can also mean the iteration broke off with an error.
	assert.NoError(t, rows.Err())
	assert.GreaterOrEqual(t, count, expected)
}

// readTestData loads the JSON fixture into batchDataSignal, stamps every
// signal with testVIN, shifts the timestamps so that the earliest one equals
// startTimestamp, and stores the result in dataToPublish.
//
// It panics if the fixture cannot be read or decoded.
func readTestData() {
	// 1176 messages long
	jsonPath := "./test-batch-msg.json"
	data, err := os.ReadFile(jsonPath)
	if err != nil {
		panic(err)
	}
	if err := fastjson.Unmarshal(data, &batchDataSignal); err != nil {
		panic(fmt.Errorf("decoding %s: %w", jsonPath, err))
	}

	dataPtr := make([]*kafka_grpc.GRPC_CANSignal, len(batchDataSignal))
	offset := -1.0 // negative means "no minimum found yet"
	for i := range batchDataSignal {
		dataPtr[i] = &batchDataSignal[i]
		// find min timestamp in batch data
		if offset < 0 || offset > batchDataSignal[i].Timestamp {
			offset = dataPtr[i].Timestamp
		}
		dataPtr[i].Vin = testVIN // in case we need to change the test vin to follow proper pattern
	}

	// The shift is the same for every signal, so compute it once.
	shift := utils.TimeToFloat(startTimestamp) - offset
	for i := range dataPtr {
		dataPtr[i].Timestamp += shift
	}

	dataToPublish = kafka_grpc.GRPC_CANSignalBatchPayload{Data: &kafka_grpc.GRPC_CANSignalData{
		Cansignals: dataPtr,
	}}
}

// runJetfire resets the jetfire cache variables and starts the vehicle-signal
// consumer in a background goroutine, passing it ctx.
func runJetfire(ctx context.Context) {
	// first initialize and run jetfire application loops
	services.ResetCacheVars()
	go server.StartConsumer(ctx, kafka.VehicleSignal)
}