package server import ( "context" "github.com/fiskerinc/cloud-services/services/jetfire/models" "github.com/fiskerinc/cloud-services/services/jetfire/services" "time" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/ClickHouse/ch-go" "github.com/ClickHouse/ch-go/proto" "github.com/pkg/errors" "github.com/fiskerinc/cloud-services/pkg/utils/envtool" ) var ( flushBufferCmd chan interface{} schemaResetPeriod = envtool.GetEnvDuration("JETFIRE_SCHEMA_RESET_PERIOD_MS", 3600000*12) * time.Millisecond ) // loop for Inserter goroutine. // This goroutine is responsible for inserting data into clickhouse func InserterLoop(producerChannel chan models.InsertCommand, ctx context.Context) { resetSchemaTicker := time.NewTicker(schemaResetPeriod) for { select { // reset cache vars every hour case <-resetSchemaTicker.C: logger.Debug().Msgf("<-resetSchemaTicker") services.ResetCacheVars() // process flush buffer command case <-flushBufferCmd: logger.Debug().Msgf("<-flushBufferCmd") //get clickhouse client client := services.GetShardClient() for client == nil || client.IsClosed() { logger.Error().Err(errors.Errorf("bad chgo client , retrying connection...")).Send() time.Sleep(time.Second) services.InitShardClients() client = services.GetShardClient() } logger.Debug().Msgf("InsertFlushAllBuffers??") services.InsertFlushAllBuffers(client) // process data to be inserted to clickhouse case command := <-producerChannel: logger.Debug().Msgf("<-producerChannel") // get clickhouse client client := services.GetShardClient() for client == nil || client.IsClosed() { logger.Error().Err(errors.Errorf("bad chgo client , retrying connection...")).Send() time.Sleep(time.Second) services.InitShardClients() client = services.GetShardClient() } //insert the queued buffer and block in command var err error retryInsert := true count := 0 insertTime := time.Now() for retryInsert { count, err = models.InsertAndFlush(command, ctx, client.GetClient(), client.GetBreaker(), nil) if err == nil { break } logger.Error().Err(errors.WithStack(err)).Send() // if the error is a mismatching schema error, then pull new schemas if ch.IsErr(err, proto.ErrIncompatibleColumns, proto.ErrNoSuchColumnInTable, proto.ErrThereIsNoColumn, proto.ErrIncorrectNumberOfColumns, ) { services.ResetCacheVars() } else { // client.Connect() //close and restart the clickhouse connection client = services.GetShardClient() //grab a new shard client to retry insert } time.Sleep(time.Second) } if count == 0 { continue } //log messages relating to clickhouse insertion logger.Debug().Msgf("done flush Buffer %s, %dms", command.Buffer.TableName, (time.Since(insertTime))/time.Millisecond) if time.Since(insertTime) > 10*time.Second { logger.Warn().Msgf("slow row insertions: took %s to insert %d rows for %s", time.Since(insertTime), count, command.Buffer.TableName) } } } }