265 lines
6.9 KiB
Go
265 lines
6.9 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
"github.com/pkg/errors"
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
)
|
|
|
|
// KafkaTimeout is a timeout value expressed in milliseconds.
const (
	KafkaTimeout = 30 * 1000
)
|
|
|
|
// BaseConsumerInterface interface for Consumer utility with manual commit control
|
|
type BaseConsumerInterface interface {
|
|
Check(ctx context.Context) error
|
|
Commit(message *Message) ([]TopicPartition, error)
|
|
CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
|
|
ConsumeToChannel(topics []string, events chan *Message) error
|
|
ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error
|
|
LastOffsetConsumed(topic string, partition int32) (int64, error)
|
|
Seek(topic string, offset int64) error
|
|
Stop()
|
|
Subscribe(topics []string) error
|
|
}
|
|
|
|
// BaseConsumer utility for consuming with manual offset control
|
|
type BaseConsumer struct {
|
|
client *kgo.Client
|
|
running bool
|
|
timeout time.Duration
|
|
connected bool
|
|
connectedLock sync.RWMutex
|
|
pollingError error
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
lastOffsets map[string]map[int32]int64
|
|
offsetsMu sync.RWMutex
|
|
}
|
|
|
|
// NewBaseConsumer serves as factory method for consumer with manual commit
|
|
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, autoCommit bool) (BaseConsumerInterface, error) {
|
|
cfg := LoadConfig()
|
|
opts := buildClientOpts(cfg)
|
|
|
|
// Consumer-specific options
|
|
opts = append(opts,
|
|
kgo.ConsumerGroup(serviceName),
|
|
kgo.SessionTimeout(cfg.SessionTimeout),
|
|
kgo.HeartbeatInterval(cfg.HeartbeatInterval),
|
|
)
|
|
|
|
if autoCommit {
|
|
opts = append(opts, kgo.AutoCommitInterval(
|
|
time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond,
|
|
))
|
|
} else {
|
|
opts = append(opts, kgo.DisableAutoCommit())
|
|
}
|
|
|
|
if minBufSize > 0 {
|
|
opts = append(opts, kgo.FetchMinBytes(int32(minBufSize)))
|
|
}
|
|
|
|
if fetchMaxWaitTimems > 0 {
|
|
opts = append(opts, kgo.FetchMaxWait(time.Duration(fetchMaxWaitTimems)*time.Millisecond))
|
|
}
|
|
|
|
client, err := kgo.NewClient(opts...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
logger.Info().Msgf("NewBaseConsumer hosts: %v group: %s", cfg.Brokers, serviceName)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &BaseConsumer{
|
|
client: client,
|
|
timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
lastOffsets: make(map[string]map[int32]int64),
|
|
}, nil
|
|
}
|
|
|
|
// Subscribe subscribes to topics
|
|
func (c *BaseConsumer) Subscribe(topics []string) error {
|
|
c.client.AddConsumeTopics(topics...)
|
|
return nil
|
|
}
|
|
|
|
// ConsumeToChannel runs a poll loop which waits for messages to consume
|
|
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *Message) error {
|
|
c.running = true
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
|
|
c.setConnected(false)
|
|
c.pollingError = errors.WithStack(e.Err)
|
|
return c.pollingError
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
c.trackOffset(r.Topic, r.Partition, r.Offset)
|
|
events <- recordToMessage(r)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConsumeOrRebalancedCatch runs a poll loop with rebalance notification
|
|
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error {
|
|
c.running = true
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
c.pollingError = nil
|
|
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Send()
|
|
c.setConnected(false)
|
|
c.pollingError = errors.WithStack(e.Err)
|
|
return c.pollingError
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
c.trackOffset(r.Topic, r.Partition, r.Offset)
|
|
events <- recordToMessage(r)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Seek sets the offset for a topic partition
|
|
func (c *BaseConsumer) Seek(topic string, offset int64) error {
|
|
offsets := map[string]map[int32]kgo.Offset{
|
|
topic: {0: kgo.NewOffset().At(offset)},
|
|
}
|
|
c.client.AddConsumePartitions(offsets)
|
|
return nil
|
|
}
|
|
|
|
// Commit commits the offset for a message
|
|
func (c *BaseConsumer) Commit(message *Message) ([]TopicPartition, error) {
|
|
if message == nil {
|
|
// Commit all uncommitted
|
|
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// Mark the record as consumed and commit
|
|
c.client.MarkCommitRecords(&kgo.Record{
|
|
Topic: message.Topic,
|
|
Partition: message.Partition,
|
|
Offset: message.Offset,
|
|
})
|
|
|
|
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return []TopicPartition{{
|
|
Topic: message.Topic,
|
|
Partition: message.Partition,
|
|
Offset: message.Offset + 1,
|
|
}}, nil
|
|
}
|
|
|
|
// CommitOffsets commits specific offsets
|
|
func (c *BaseConsumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
|
|
for _, o := range offsets {
|
|
c.client.MarkCommitRecords(&kgo.Record{
|
|
Topic: o.Topic,
|
|
Partition: o.Partition,
|
|
Offset: o.Offset - 1, // MarkCommitRecords expects the record offset, not next
|
|
})
|
|
}
|
|
|
|
if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return offsets, nil
|
|
}
|
|
|
|
// LastOffsetConsumed returns the last consumed offset for a partition
|
|
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (int64, error) {
|
|
c.offsetsMu.RLock()
|
|
defer c.offsetsMu.RUnlock()
|
|
|
|
if topicOffsets, ok := c.lastOffsets[topic]; ok {
|
|
if offset, ok := topicOffsets[partition]; ok {
|
|
return offset, nil
|
|
}
|
|
}
|
|
return -1, errors.Errorf("no subscription for topic %s partition %d", topic, partition)
|
|
}
|
|
|
|
// Stop stops the poll loop
|
|
func (c *BaseConsumer) Stop() {
|
|
c.running = false
|
|
c.cancel()
|
|
if c.client != nil {
|
|
c.client.Close()
|
|
}
|
|
}
|
|
|
|
// Check verifies consumer connectivity
|
|
func (c *BaseConsumer) Check(ctx context.Context) error {
|
|
if c.pollingError != nil {
|
|
return c.pollingError
|
|
}
|
|
if !c.isConnected() {
|
|
return errors.New("consumer isn't connected to Kafka")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *BaseConsumer) setConnected(cntd bool) {
|
|
c.connectedLock.Lock()
|
|
defer c.connectedLock.Unlock()
|
|
c.connected = cntd
|
|
}
|
|
|
|
func (c *BaseConsumer) isConnected() bool {
|
|
c.connectedLock.RLock()
|
|
defer c.connectedLock.RUnlock()
|
|
return c.connected
|
|
}
|
|
|
|
func (c *BaseConsumer) trackOffset(topic string, partition int32, offset int64) {
|
|
c.offsetsMu.Lock()
|
|
defer c.offsetsMu.Unlock()
|
|
|
|
if c.lastOffsets[topic] == nil {
|
|
c.lastOffsets[topic] = make(map[int32]int64)
|
|
}
|
|
c.lastOffsets[topic][partition] = offset
|
|
}
|