Files
cloud-services/pkg/kafka/base_consumer.go

265 lines
6.9 KiB
Go

package kafka
import (
"context"
"sync"
"time"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
"github.com/pkg/errors"
"github.com/twmb/franz-go/pkg/kgo"
)
const KafkaTimeout = 30000 // ms
// BaseConsumerInterface interface for Consumer utility with manual commit control
type BaseConsumerInterface interface {
Check(ctx context.Context) error
Commit(message *Message) ([]TopicPartition, error)
CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
ConsumeToChannel(topics []string, events chan *Message) error
ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error
LastOffsetConsumed(topic string, partition int32) (int64, error)
Seek(topic string, offset int64) error
Stop()
Subscribe(topics []string) error
}
// BaseConsumer utility for consuming with manual offset control
type BaseConsumer struct {
client *kgo.Client
running bool
timeout time.Duration
connected bool
connectedLock sync.RWMutex
pollingError error
ctx context.Context
cancel context.CancelFunc
lastOffsets map[string]map[int32]int64
offsetsMu sync.RWMutex
}
// NewBaseConsumer serves as factory method for consumer with manual commit
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, autoCommit bool) (BaseConsumerInterface, error) {
cfg := LoadConfig()
opts := buildClientOpts(cfg)
// Consumer-specific options
opts = append(opts,
kgo.ConsumerGroup(serviceName),
kgo.SessionTimeout(cfg.SessionTimeout),
kgo.HeartbeatInterval(cfg.HeartbeatInterval),
)
if autoCommit {
opts = append(opts, kgo.AutoCommitInterval(
time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond,
))
} else {
opts = append(opts, kgo.DisableAutoCommit())
}
if minBufSize > 0 {
opts = append(opts, kgo.FetchMinBytes(int32(minBufSize)))
}
if fetchMaxWaitTimems > 0 {
opts = append(opts, kgo.FetchMaxWait(time.Duration(fetchMaxWaitTimems)*time.Millisecond))
}
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, errors.WithStack(err)
}
logger.Info().Msgf("NewBaseConsumer hosts: %v group: %s", cfg.Brokers, serviceName)
ctx, cancel := context.WithCancel(context.Background())
return &BaseConsumer{
client: client,
timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
ctx: ctx,
cancel: cancel,
lastOffsets: make(map[string]map[int32]int64),
}, nil
}
// Subscribe subscribes to topics
func (c *BaseConsumer) Subscribe(topics []string) error {
c.client.AddConsumeTopics(topics...)
return nil
}
// ConsumeToChannel runs a poll loop which waits for messages to consume
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *Message) error {
c.running = true
c.setConnected(true)
defer c.setConnected(false)
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
c.setConnected(false)
c.pollingError = errors.WithStack(e.Err)
return c.pollingError
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
c.trackOffset(r.Topic, r.Partition, r.Offset)
events <- recordToMessage(r)
})
}
return nil
}
// ConsumeOrRebalancedCatch runs a poll loop with rebalance notification
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error {
c.running = true
c.setConnected(true)
defer c.setConnected(false)
c.pollingError = nil
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Send()
c.setConnected(false)
c.pollingError = errors.WithStack(e.Err)
return c.pollingError
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
c.trackOffset(r.Topic, r.Partition, r.Offset)
events <- recordToMessage(r)
})
}
return nil
}
// Seek sets the offset for a topic partition
func (c *BaseConsumer) Seek(topic string, offset int64) error {
offsets := map[string]map[int32]kgo.Offset{
topic: {0: kgo.NewOffset().At(offset)},
}
c.client.AddConsumePartitions(offsets)
return nil
}
// Commit commits the offset for a message
func (c *BaseConsumer) Commit(message *Message) ([]TopicPartition, error) {
if message == nil {
// Commit all uncommitted
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
return nil, errors.WithStack(err)
}
return nil, nil
}
// Mark the record as consumed and commit
c.client.MarkCommitRecords(&kgo.Record{
Topic: message.Topic,
Partition: message.Partition,
Offset: message.Offset,
})
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
return nil, errors.WithStack(err)
}
return []TopicPartition{{
Topic: message.Topic,
Partition: message.Partition,
Offset: message.Offset + 1,
}}, nil
}
// CommitOffsets commits specific offsets
func (c *BaseConsumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
for _, o := range offsets {
c.client.MarkCommitRecords(&kgo.Record{
Topic: o.Topic,
Partition: o.Partition,
Offset: o.Offset - 1, // MarkCommitRecords expects the record offset, not next
})
}
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
return nil, errors.WithStack(err)
}
return offsets, nil
}
// LastOffsetConsumed returns the last consumed offset for a partition
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (int64, error) {
c.offsetsMu.RLock()
defer c.offsetsMu.RUnlock()
if topicOffsets, ok := c.lastOffsets[topic]; ok {
if offset, ok := topicOffsets[partition]; ok {
return offset, nil
}
}
return -1, errors.Errorf("no subscription for topic %s partition %d", topic, partition)
}
// Stop stops the poll loop
func (c *BaseConsumer) Stop() {
c.running = false
c.cancel()
if c.client != nil {
c.client.Close()
}
}
// Check verifies consumer connectivity
func (c *BaseConsumer) Check(ctx context.Context) error {
if c.pollingError != nil {
return c.pollingError
}
if !c.isConnected() {
return errors.New("consumer isn't connected to Kafka")
}
return nil
}
func (c *BaseConsumer) setConnected(cntd bool) {
c.connectedLock.Lock()
defer c.connectedLock.Unlock()
c.connected = cntd
}
func (c *BaseConsumer) isConnected() bool {
c.connectedLock.RLock()
defer c.connectedLock.RUnlock()
return c.connected
}
func (c *BaseConsumer) trackOffset(topic string, partition int32, offset int64) {
c.offsetsMu.Lock()
defer c.offsetsMu.Unlock()
if c.lastOffsets[topic] == nil {
c.lastOffsets[topic] = make(map[int32]int64)
}
c.lastOffsets[topic][partition] = offset
}