Files

104 lines
2.6 KiB
Go

package tests
/*
This file implements benchmarks for
1. Batch handling of messages.
The intention is to measure performance of appending rows to InsertBuffer
2. Batch serialization of messages
The intention is to measure performance of preparing data for clickhouse insertion
Because of the size of the input data for this benchmark, it is recommended to set
JETFIRE_BUFFER_MAX_BYTES=1073741824
Or more; otherwise the benchmark may hang while waiting for the nonexistent inserter thread to flush the buffer.
*/
import (
"github.com/fiskerinc/cloud-services/services/jetfire/handlers"
"github.com/fiskerinc/cloud-services/services/jetfire/services"
"os"
"testing"
"github.com/fiskerinc/cloud-services/pkg/grpc/kafka_grpc"
"github.com/ClickHouse/ch-go/proto"
"github.com/intel-go/fastjson"
)
// Shared benchmark fixtures, populated by benchInit.
var (
	// benchmarkJSONPayload holds the raw bytes read from the JSON fixture file.
	benchmarkJSONPayload = []byte{}
	// benchmarkBatchPayload holds the signals decoded from the fixture bytes.
	benchmarkBatchPayload = []kafka_grpc.GRPC_CANSignal{}
	// benchmarkBatchPtrs holds one pointer per element of benchmarkBatchPayload.
	benchmarkBatchPtrs = []*kafka_grpc.GRPC_CANSignal{}
)
// benchInit loads the benchmark fixture and resets the service cache state.
//
// It reads test-batch-msg.json, stores the raw bytes in benchmarkJSONPayload,
// decodes them into benchmarkBatchPayload, and rebuilds benchmarkBatchPtrs so
// that entry i points at benchmarkBatchPayload[i]. It finishes by calling
// services.ResetCacheVars.
//
// It panics if the fixture cannot be read or cannot be decoded, so that a
// broken fixture never results in a benchmark silently running over an empty
// batch.
func benchInit() {
	//1176 messages long
	jsonPath := "test-batch-msg.json"
	// Best-effort directory change; the error is discarded on purpose.
	// Every benchmark in this file calls benchInit, and the testing package
	// invokes each benchmark function more than once, so after the first
	// successful change (or when the run already starts inside the fixture
	// directory) this call cannot succeed again. A fixture that is really
	// missing is reported by the ReadFile check below.
	_ = os.Chdir("./tests/")
	data, err := os.ReadFile(jsonPath)
	if err != nil {
		panic(err)
	}
	benchmarkJSONPayload = data
	// A decode failure must abort: continuing would benchmark an empty or
	// partially filled batch and report meaningless numbers.
	if err := fastjson.Unmarshal(data, &benchmarkBatchPayload); err != nil {
		panic(err)
	}
	benchmarkBatchPtrs = make([]*kafka_grpc.GRPC_CANSignal, len(benchmarkBatchPayload))
	for i := range benchmarkBatchPayload {
		// Point at the slice element itself rather than at a copy.
		benchmarkBatchPtrs[i] = &benchmarkBatchPayload[i]
	}
	services.ResetCacheVars()
}
// benchmarkMessageHandler passes batchData to handlers.HandleSignalBatch b.N
// times, using the cache returned by services.GetVehicleCache, and stops the
// benchmark timer once the loop is done.
func benchmarkMessageHandler(batchData []*kafka_grpc.GRPC_CANSignal, b *testing.B) {
	vehicleCache := services.GetVehicleCache()

	for run := 0; run < b.N; run++ {
		handlers.HandleSignalBatch(batchData, vehicleCache, nil)
	}

	b.StopTimer()
}
// BenchmarkMessageHandler benchmarks batch handling of the fixture messages.
// It loads the fixture with benchInit and then delegates the timed loop to
// benchmarkMessageHandler, passing the pointer batch built by benchInit.
func BenchmarkMessageHandler(b *testing.B) {
benchInit()
// Reset the timer so that loading the fixture is not part of the measurement.
b.ResetTimer()
benchmarkMessageHandler(benchmarkBatchPtrs, b)
}
// BenchmarkSignalSerialization benchmarks services.GetVehicleSignalBatch():
// each iteration calls GetInput() and Len() on the value it returns.
//
// The fixture is loaded and handled once before the timer is reset, so only
// the loop below is measured.
func BenchmarkSignalSerialization(b *testing.B) {
	benchInit()
	// Untimed warm-up: handle the fixture batch once before resetting the timer.
	handlers.HandleSignalBatch(benchmarkBatchPtrs, services.GetVehicleCache(), nil)
	b.ResetTimer()

	// The running totals exist only to consume the results of the measured calls.
	var (
		input         = proto.Input{}
		inputLenTotal = 0
		rowTotal      = 0
	)
	for run := 0; run < b.N; run++ {
		input = services.GetVehicleSignalBatch().GetInput()
		inputLenTotal += len(input)
		rowTotal += services.GetVehicleSignalBatch().Len()
	}

	b.StopTimer()
}
// BenchmarkFeatureSerialization benchmarks services.GetFeatureBatch(): each
// iteration calls GetInput() and Len() on the value it returns.
//
// The fixture is loaded and handled once before the timer is reset, so only
// the loop below is measured.
func BenchmarkFeatureSerialization(b *testing.B) {
	benchInit()
	// Untimed warm-up: handle the fixture batch once before resetting the timer.
	handlers.HandleSignalBatch(benchmarkBatchPtrs, services.GetVehicleCache(), nil)
	b.ResetTimer()

	// The running totals exist only to consume the results of the measured calls.
	var (
		input         = proto.Input{}
		inputLenTotal = 0
		rowTotal      = 0
	)
	for run := 0; run < b.N; run++ {
		input = services.GetFeatureBatch().GetInput()
		inputLenTotal += len(input)
		rowTotal += services.GetFeatureBatch().Len()
	}

	b.StopTimer()
}