100 lines
3.0 KiB
Go
100 lines
3.0 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/models"
|
|
"github.com/fiskerinc/cloud-services/services/jetfire/services"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/ClickHouse/ch-go"
|
|
"github.com/ClickHouse/ch-go/proto"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
)
|
|
|
|
var (
|
|
flushBufferCmd chan interface{}
|
|
|
|
schemaResetPeriod = envtool.GetEnvDuration("JETFIRE_SCHEMA_RESET_PERIOD_MS", 3600000*12) * time.Millisecond
|
|
)
|
|
|
|
// loop for Inserter goroutine.
|
|
// This goroutine is responsible for inserting data into clickhouse
|
|
func InserterLoop(producerChannel chan models.InsertCommand, ctx context.Context) {
|
|
resetSchemaTicker := time.NewTicker(schemaResetPeriod)
|
|
for {
|
|
select {
|
|
// reset cache vars every hour
|
|
case <-resetSchemaTicker.C:
|
|
logger.Debug().Msgf("<-resetSchemaTicker")
|
|
services.ResetCacheVars()
|
|
|
|
// process flush buffer command
|
|
case <-flushBufferCmd:
|
|
logger.Debug().Msgf("<-flushBufferCmd")
|
|
|
|
//get clickhouse client
|
|
client := services.GetShardClient()
|
|
for client == nil || client.IsClosed() {
|
|
logger.Error().Err(errors.Errorf("bad chgo client , retrying connection...")).Send()
|
|
time.Sleep(time.Second)
|
|
services.InitShardClients()
|
|
client = services.GetShardClient()
|
|
}
|
|
|
|
logger.Debug().Msgf("InsertFlushAllBuffers??")
|
|
services.InsertFlushAllBuffers(client)
|
|
|
|
// process data to be inserted to clickhouse
|
|
case command := <-producerChannel:
|
|
logger.Debug().Msgf("<-producerChannel")
|
|
// get clickhouse client
|
|
client := services.GetShardClient()
|
|
for client == nil || client.IsClosed() {
|
|
logger.Error().Err(errors.Errorf("bad chgo client , retrying connection...")).Send()
|
|
time.Sleep(time.Second)
|
|
services.InitShardClients()
|
|
client = services.GetShardClient()
|
|
}
|
|
|
|
//insert the queued buffer and block in command
|
|
var err error
|
|
retryInsert := true
|
|
count := 0
|
|
insertTime := time.Now()
|
|
for retryInsert {
|
|
count, err = models.InsertAndFlush(command, ctx, client.GetClient(), client.GetBreaker(), nil)
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
logger.Error().Err(errors.WithStack(err)).Send()
|
|
// if the error is a mismatching schema error, then pull new schemas
|
|
if ch.IsErr(err,
|
|
proto.ErrIncompatibleColumns,
|
|
proto.ErrNoSuchColumnInTable,
|
|
proto.ErrThereIsNoColumn,
|
|
proto.ErrIncorrectNumberOfColumns,
|
|
) {
|
|
services.ResetCacheVars()
|
|
} else {
|
|
// client.Connect() //close and restart the clickhouse connection
|
|
client = services.GetShardClient() //grab a new shard client to retry insert
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
if count == 0 {
|
|
continue
|
|
}
|
|
|
|
//log messages relating to clickhouse insertion
|
|
logger.Debug().Msgf("done flush Buffer %s, %dms", command.Buffer.TableName, (time.Since(insertTime))/time.Millisecond)
|
|
if time.Since(insertTime) > 10*time.Second {
|
|
logger.Warn().Msgf("slow row insertions: took %s to insert %d rows for %s", time.Since(insertTime), count, command.Buffer.TableName)
|
|
}
|
|
}
|
|
}
|
|
}
|