package tests /* This file implements benchmarks for 1. Batch handling of messages. The intention is to measure performance of appending rows to InsertBuffer 2. Batch serialization of messages The intention is to measure performance of preparing data for clickhouse insertion Because of the size of the input data for this benchmark, it is recommended to set JETFIRE_BUFFER_MAX_BYTES=1073741824 Or more; otherwise the benchmark may hang while waiting for the nonexistant inserter thread to flush the buffer. */ import ( "github.com/fiskerinc/cloud-services/services/jetfire/handlers" "github.com/fiskerinc/cloud-services/services/jetfire/services" "os" "testing" "github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc" "github.com/ClickHouse/ch-go/proto" "github.com/intel-go/fastjson" ) var benchmarkJSONPayload = []byte{} var benchmarkBatchPayload = []kafka_grpc.GRPC_CANSignal{} var benchmarkBatchPtrs = []*kafka_grpc.GRPC_CANSignal{} func benchInit() { //1176 messages long jsonPath := "test-batch-msg.json" os.Chdir("./tests/") data, err := os.ReadFile(jsonPath) if err != nil { panic(err) } benchmarkJSONPayload = data fastjson.Unmarshal(data, &benchmarkBatchPayload) benchmarkBatchPtrs = make([]*kafka_grpc.GRPC_CANSignal, len(benchmarkBatchPayload)) for i := range benchmarkBatchPayload { benchmarkBatchPtrs[i] = &benchmarkBatchPayload[i] } services.ResetCacheVars() } func benchmarkMessageHandler(batchData []*kafka_grpc.GRPC_CANSignal, b *testing.B) { cache := services.GetVehicleCache() for i := 0; i < b.N; i++ { handlers.HandleSignalBatch(batchData, cache, nil) } b.StopTimer() } func BenchmarkMessageHandler(b *testing.B) { benchInit() b.ResetTimer() benchmarkMessageHandler(benchmarkBatchPtrs, b) } func BenchmarkSignalSerialization(b *testing.B) { benchInit() cache := services.GetVehicleCache() handlers.HandleSignalBatch(benchmarkBatchPtrs, cache, nil) b.ResetTimer() dummy := proto.Input{} serializedLength := 0 rowsLength := 0 for i := 0; i < b.N; i++ { dummy = services.GetVehicleSignalBatch().GetInput() serializedLength += len(dummy) rowsLength += services.GetVehicleSignalBatch().Len() } b.StopTimer() } func BenchmarkFeatureSerialization(b *testing.B) { benchInit() cache := services.GetVehicleCache() handlers.HandleSignalBatch(benchmarkBatchPtrs, cache, nil) b.ResetTimer() dummy := proto.Input{} serializedLength := 0 rowsLength := 0 for i := 0; i < b.N; i++ { dummy = services.GetFeatureBatch().GetInput() serializedLength += len(dummy) rowsLength += services.GetFeatureBatch().Len() } b.StopTimer() }