Refactor kafka to pure Go (franz-go), fix DBC stubs, update Dockerfile
This commit is contained in:
@@ -3,209 +3,262 @@ package kafka
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"fiskerinc.com/modules/logger"
|
||||
"fiskerinc.com/modules/utils/envtool"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/fiskerinc/cloud-services/pkg/logger"
|
||||
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
)
|
||||
|
||||
const KafkaTimeout = 30000 //ms
|
||||
// BaseConsumerInterface interface for Consumer utility
|
||||
//
|
||||
// NOTE: DOES NOT AUTO COMMIT OFFSETS
|
||||
const KafkaTimeout = 30000 // ms
|
||||
|
||||
// BaseConsumerInterface interface for Consumer utility with manual commit control
|
||||
type BaseConsumerInterface interface {
|
||||
Check(ctx context.Context) error
|
||||
Commit(message *kafka.Message) ([]kafka.TopicPartition, error)
|
||||
CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
|
||||
ConsumeToChannel(topics []string, events chan *kafka.Message) error
|
||||
ConsumeOrRebalancedCatch(topics []string, events chan *kafka.Message, reb chan struct{}) error
|
||||
LastOffsetConsumed(topic string, partition int32) (kafka.Offset, error)
|
||||
Seek(topic string, offset kafka.Offset) error
|
||||
Commit(message *Message) ([]TopicPartition, error)
|
||||
CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
|
||||
ConsumeToChannel(topics []string, events chan *Message) error
|
||||
ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error
|
||||
LastOffsetConsumed(topic string, partition int32) (int64, error)
|
||||
Seek(topic string, offset int64) error
|
||||
Stop()
|
||||
Subscribe(topics []string) error
|
||||
}
|
||||
|
||||
// NoncommitalConsumer utility to produce messages
|
||||
// BaseConsumer utility for consuming with manual offset control
|
||||
type BaseConsumer struct {
|
||||
consumer *kafka.Consumer
|
||||
client *kgo.Client
|
||||
running bool
|
||||
timeout int
|
||||
timeout time.Duration
|
||||
connected bool
|
||||
connectedLock sync.RWMutex
|
||||
pollingError error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
lastOffsets map[string]map[int32]int64
|
||||
offsetsMu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewConsumer serves as factory method for consumer to kafka
|
||||
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, auto_commit bool) (BaseConsumerInterface, error) {
|
||||
var consumer *kafka.Consumer
|
||||
kafkaConfigMap := loadKafkaConsumerConfig(serviceName)
|
||||
kafkaConfigMap.SetKey("enable.auto.commit", auto_commit)
|
||||
if minBufSize != -1 {
|
||||
kafkaConfigMap.SetKey("fetch.min.bytes", minBufSize)
|
||||
// NewBaseConsumer serves as factory method for consumer with manual commit
|
||||
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, autoCommit bool) (BaseConsumerInterface, error) {
|
||||
cfg := LoadConfig()
|
||||
opts := buildClientOpts(cfg)
|
||||
|
||||
// Consumer-specific options
|
||||
opts = append(opts,
|
||||
kgo.ConsumerGroup(serviceName),
|
||||
kgo.SessionTimeout(cfg.SessionTimeout),
|
||||
kgo.HeartbeatInterval(cfg.HeartbeatInterval),
|
||||
)
|
||||
|
||||
if autoCommit {
|
||||
opts = append(opts, kgo.AutoCommitInterval(
|
||||
time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond,
|
||||
))
|
||||
} else {
|
||||
opts = append(opts, kgo.DisableAutoCommit())
|
||||
}
|
||||
|
||||
consumer, err := kafka.NewConsumer(&kafkaConfigMap)
|
||||
kafkaHosts := envtool.GetEnv("KAFKA_HOSTS", "localhost:9093")
|
||||
logger.Info().Msgf("NewNoncommitalConsumer hosts: %s", kafkaHosts)
|
||||
if minBufSize > 0 {
|
||||
opts = append(opts, kgo.FetchMinBytes(int32(minBufSize)))
|
||||
}
|
||||
|
||||
if fetchMaxWaitTimems > 0 {
|
||||
opts = append(opts, kgo.FetchMaxWait(time.Duration(fetchMaxWaitTimems)*time.Millisecond))
|
||||
}
|
||||
|
||||
client, err := kgo.NewClient(opts...)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.Info().Msgf("NewBaseConsumer hosts: %v group: %s", cfg.Brokers, serviceName)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &BaseConsumer{
|
||||
consumer: consumer,
|
||||
timeout: envtool.GetEnvInt("KAFKA_TIMEOUT", 10000),
|
||||
client: client,
|
||||
timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
lastOffsets: make(map[string]map[int32]int64),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Subscribe subscribes to topics
|
||||
func (c *BaseConsumer) Subscribe(topics []string) error {
|
||||
err := c.consumer.SubscribeTopics(topics, nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
c.client.AddConsumeTopics(topics...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConsumeToChannel runs a poll loop which waits for messages to consume
|
||||
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *kafka.Message) error {
|
||||
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *Message) error {
|
||||
c.running = true
|
||||
|
||||
c.setConnected(true)
|
||||
defer c.setConnected(false)
|
||||
|
||||
for c.running {
|
||||
e := c.consumer.Poll(c.timeout)
|
||||
|
||||
switch msg := e.(type) {
|
||||
case *kafka.Message:
|
||||
events <- msg
|
||||
msg = nil
|
||||
c.setConnected(true)
|
||||
case kafka.Error:
|
||||
c.setConnected(false)
|
||||
logger.Error().Msg(e.String())
|
||||
c.pollingError = errors.WithStack(errors.New(e.String()))
|
||||
return c.pollingError
|
||||
fetches := c.client.PollFetches(c.ctx)
|
||||
if errs := fetches.Errors(); len(errs) > 0 {
|
||||
for _, e := range errs {
|
||||
if e.Err == context.Canceled {
|
||||
return nil
|
||||
}
|
||||
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
|
||||
c.setConnected(false)
|
||||
c.pollingError = errors.WithStack(e.Err)
|
||||
return c.pollingError
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.setConnected(true)
|
||||
fetches.EachRecord(func(r *kgo.Record) {
|
||||
c.trackOffset(r.Topic, r.Partition, r.Offset)
|
||||
events <- recordToMessage(r)
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConsumeOrRebalancedCatch runs a poll loop which waits for messages to consume
|
||||
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *kafka.Message, reb chan struct{}) error {
|
||||
// ConsumeOrRebalancedCatch runs a poll loop with rebalance notification
|
||||
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error {
|
||||
c.running = true
|
||||
|
||||
c.setConnected(true)
|
||||
defer c.setConnected(false)
|
||||
|
||||
c.pollingError = nil
|
||||
|
||||
for c.running {
|
||||
e := c.consumer.Poll(c.timeout)
|
||||
switch msg := e.(type) {
|
||||
case *kafka.Message:
|
||||
events <- msg
|
||||
msg = nil
|
||||
c.setConnected(true)
|
||||
|
||||
case kafka.AssignedPartitions:
|
||||
reb <- struct{}{}
|
||||
c.setConnected(true)
|
||||
|
||||
case kafka.Error:
|
||||
c.setConnected(false)
|
||||
c.pollingError = errors.WithStack(errors.New(e.String()))
|
||||
logger.Error().Err(c.pollingError).Send()
|
||||
return c.pollingError
|
||||
fetches := c.client.PollFetches(c.ctx)
|
||||
if errs := fetches.Errors(); len(errs) > 0 {
|
||||
for _, e := range errs {
|
||||
if e.Err == context.Canceled {
|
||||
return nil
|
||||
}
|
||||
logger.Error().Err(e.Err).Send()
|
||||
c.setConnected(false)
|
||||
c.pollingError = errors.WithStack(e.Err)
|
||||
return c.pollingError
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.setConnected(true)
|
||||
fetches.EachRecord(func(r *kgo.Record) {
|
||||
c.trackOffset(r.Topic, r.Partition, r.Offset)
|
||||
events <- recordToMessage(r)
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Seek provides a wrapper for private consumer method "Seek"
|
||||
func (c *BaseConsumer) Seek(topic string, offset kafka.Offset) error {
|
||||
err := c.consumer.Seek(kafka.TopicPartition{
|
||||
Topic: &topic,
|
||||
Offset: offset,
|
||||
}, c.timeout)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
// Seek sets the offset for a topic partition
|
||||
func (c *BaseConsumer) Seek(topic string, offset int64) error {
|
||||
offsets := map[string]map[int32]kgo.Offset{
|
||||
topic: {0: kgo.NewOffset().At(offset)},
|
||||
}
|
||||
c.client.AddConsumePartitions(offsets)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *BaseConsumer) commitWithoutMessage() ([]kafka.TopicPartition, error) {
|
||||
partitions, err := c.consumer.Commit()
|
||||
if err != nil {
|
||||
return partitions, errors.WithStack(err)
|
||||
}
|
||||
return partitions, nil
|
||||
}
|
||||
|
||||
func (c *BaseConsumer) Commit(message *kafka.Message) ([]kafka.TopicPartition, error) {
|
||||
// Commit commits the offset for a message
|
||||
func (c *BaseConsumer) Commit(message *Message) ([]TopicPartition, error) {
|
||||
if message == nil {
|
||||
return c.commitWithoutMessage()
|
||||
// Commit all uncommitted
|
||||
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
partitions, err := c.consumer.CommitMessage(message)
|
||||
if err != nil {
|
||||
return partitions, errors.WithStack(err)
|
||||
|
||||
// Mark the record as consumed and commit
|
||||
c.client.MarkCommitRecords(&kgo.Record{
|
||||
Topic: message.Topic,
|
||||
Partition: message.Partition,
|
||||
Offset: message.Offset,
|
||||
})
|
||||
|
||||
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return partitions, nil
|
||||
|
||||
return []TopicPartition{{
|
||||
Topic: message.Topic,
|
||||
Partition: message.Partition,
|
||||
Offset: message.Offset + 1,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (kafka.Offset, error) {
|
||||
partitions, err := c.consumer.Position([]kafka.TopicPartition{{
|
||||
Topic: &topic,
|
||||
Partition: partition,
|
||||
}})
|
||||
if err != nil {
|
||||
return -1, errors.WithStack(err)
|
||||
} else if len(partitions) != 1 {
|
||||
return -1, errors.Errorf("no subscription for topic %s", topic)
|
||||
// CommitOffsets commits specific offsets
|
||||
func (c *BaseConsumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
|
||||
for _, o := range offsets {
|
||||
c.client.MarkCommitRecords(&kgo.Record{
|
||||
Topic: o.Topic,
|
||||
Partition: o.Partition,
|
||||
Offset: o.Offset - 1, // MarkCommitRecords expects the record offset, not next
|
||||
})
|
||||
}
|
||||
return partitions[0].Offset, nil
|
||||
|
||||
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return offsets, nil
|
||||
}
|
||||
|
||||
// Stop stops the poll loop running
|
||||
func (c *BaseConsumer) Stop() {
|
||||
if c.consumer != nil {
|
||||
c.running = false
|
||||
if err := c.consumer.Close(); err != nil {
|
||||
logger.Warn().Err(err).Send()
|
||||
// LastOffsetConsumed returns the last consumed offset for a partition
|
||||
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (int64, error) {
|
||||
c.offsetsMu.RLock()
|
||||
defer c.offsetsMu.RUnlock()
|
||||
|
||||
if topicOffsets, ok := c.lastOffsets[topic]; ok {
|
||||
if offset, ok := topicOffsets[partition]; ok {
|
||||
return offset, nil
|
||||
}
|
||||
}
|
||||
return -1, errors.Errorf("no subscription for topic %s partition %d", topic, partition)
|
||||
}
|
||||
|
||||
// very basic implementation for health_check
|
||||
// Stop stops the poll loop
|
||||
func (c *BaseConsumer) Stop() {
|
||||
c.running = false
|
||||
c.cancel()
|
||||
if c.client != nil {
|
||||
c.client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Check verifies consumer connectivity
|
||||
func (c *BaseConsumer) Check(ctx context.Context) error {
|
||||
if c.pollingError != nil {
|
||||
return c.pollingError
|
||||
}
|
||||
|
||||
if !c.isConnected() {
|
||||
return errors.New("consumer isn't connected to Kafka")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *BaseConsumer) setConnected(cntd bool) {
|
||||
c.connectedLock.Lock()
|
||||
defer c.connectedLock.Unlock()
|
||||
|
||||
c.connected = cntd
|
||||
}
|
||||
|
||||
func (c *BaseConsumer) isConnected() bool {
|
||||
c.connectedLock.RLock()
|
||||
defer c.connectedLock.RUnlock()
|
||||
|
||||
return c.connected
|
||||
}
|
||||
|
||||
// passthrough function for CommitOffsets()
|
||||
func (c *BaseConsumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
|
||||
return c.consumer.CommitOffsets(offsets)
|
||||
func (c *BaseConsumer) trackOffset(topic string, partition int32, offset int64) {
|
||||
c.offsetsMu.Lock()
|
||||
defer c.offsetsMu.Unlock()
|
||||
|
||||
if c.lastOffsets[topic] == nil {
|
||||
c.lastOffsets[topic] = make(map[int32]int64)
|
||||
}
|
||||
c.lastOffsets[topic][partition] = offset
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user