Files
cloud-services/services/jetfire/services/chgo.go

202 lines
4.2 KiB
Go

package services
import (
"context"
"fmt"
"math/rand"
"time"
fClickhouse "github.com/fiskerinc/cloud-services/pkg/clickhouse"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/ClickHouse/ch-go"
"github.com/pkg/errors"
"github.com/sony/gobreaker"
"gopkg.in/retry.v1"
)
/*
This file implements an interface for the chgo library.
*/
var (
shardClients []*ChGoClient
)
// Gets a shard client at random
func GetShardClient() *ChGoClient {
if len(shardClients) == 0 {
InitShardClients()
}
if len(shardClients) == 0 {
return nil
}
index := rand.Intn(len(shardClients))
client := shardClients[index]
isInvalid := client.GetBreaker().State() == gobreaker.StateOpen && !client.GetClient().IsClosed()
count := 5
//resample shards N times to look for a shard client that is active and circruit breaker is not open
for isInvalid && count > 0 {
count--
index = rand.Intn(len(shardClients))
client = shardClients[index]
isInvalid = client.GetBreaker().State() == gobreaker.StateOpen && !client.GetClient().IsClosed()
}
if isInvalid {
return nil
}
return client
}
func InitShardClients() {
clear(shardClients)
shardClients = shardClients[:0]
client, err := GetClickhouseConnection()
if err != nil {
logger.Error().Err(err).Send()
}
shards := []ShardInfo{}
err = client.Select(
context.Background(),
&shards,
"SELECT shard_num, replica_num, host_address FROM system.clusters WHERE cluster='default'",
)
if err != nil {
err = errors.WithStack(err)
logger.Error().Err(err).Send()
}
for _, s := range shards {
shardName := fmt.Sprintf("%d-%d", s.ShardNum, s.ReplicaNum)
logger.Debug().Msgf("Creating new shard connection %s, %s", s.HostAddress, shardName)
client, _ := NewChgoClient(
s.HostAddress,
fClickhouse.CLICKHOUSE_PORT,
shardName,
fClickhouse.CLICKHOUSE_DB,
fClickhouse.CLICKHOUSE_USER,
fClickhouse.CLICKHOUSE_PASS,
)
shardClients = append(shardClients, client)
}
logger.Info().Msgf("Connected to %d shards", len(shardClients))
}
type ChGoClient struct {
client *ch.Client
shardName string
retry retry.Strategy //retry is used for connecting and reconnecting
breaker *gobreaker.CircuitBreaker //circuit breaker is only used for insertions
ch_host string
ch_port string
ch_db string
ch_user string
ch_pass string
}
func NewChgoClient(ch_host string, ch_port string, ch_shard string, ch_db string, ch_user string, ch_pass string) (*ChGoClient, error) {
newClient := ChGoClient{
shardName: ch_shard,
ch_host: ch_host,
ch_port: ch_port,
ch_db: ch_db,
ch_user: ch_user,
ch_pass: ch_pass,
}
newClient.retry = retry.LimitTime(
120*time.Second,
retry.Exponential{
Initial: 100 * time.Millisecond,
},
)
err := newClient.Connect()
if err != nil {
return nil, errors.WithStack(err)
}
return &newClient, nil
}
func (client *ChGoClient) Connect() error {
if client.client != nil && !client.client.IsClosed() {
client.client.Close()
}
var err error
var newConn *ch.Client
client.breaker = gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "clickhouse",
MaxRequests: 1,
Interval: time.Minute * 1,
Timeout: time.Minute * 10,
})
for a := retry.Start(client.retry, nil); a.Next(); {
newConn, err = ch.Dial(
context.Background(),
ch.Options{
Address: fmt.Sprintf("%s:%s", client.ch_host, client.ch_port),
Database: client.ch_db,
User: client.ch_user,
Password: client.ch_pass,
DialTimeout: 1 * time.Minute,
},
)
if err == nil {
client.client = newConn
return nil
}
}
err = errors.WithStack(err)
return err
}
func (c *ChGoClient) IsClosed() bool {
if c.client == nil {
return true
}
return c.client.IsClosed()
}
func (c *ChGoClient) Close() error {
if c.client == nil {
return nil
}
return c.client.Close()
}
func (c *ChGoClient) GetClient() *ch.Client {
return c.client
}
func (c *ChGoClient) GetShardName() string {
return c.shardName
}
func (c *ChGoClient) GetBreaker() *gobreaker.CircuitBreaker {
return c.breaker
}
// helper struct for querying clickhouse shard info
type ShardInfo struct {
ShardNum uint32 `ch:"shard_num"`
ReplicaNum uint32 `ch:"replica_num"`
HostAddress string `ch:"host_address"`
}