package kafka

import (
	"context"
	stderrors "errors"
	"sync"
	"time"

	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
	"github.com/pkg/errors"
	"github.com/twmb/franz-go/pkg/kgo"
)

// KafkaTimeout is a timeout value expressed in milliseconds. It is not
// referenced in this file.
const KafkaTimeout = 30000 // ms

// BaseConsumerInterface is the contract for a consumer utility with manual
// commit control.
type BaseConsumerInterface interface {
	Check(ctx context.Context) error
	Commit(message *Message) ([]TopicPartition, error)
	CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
	ConsumeToChannel(topics []string, events chan *Message) error
	ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error
	LastOffsetConsumed(topic string, partition int32) (int64, error)
	Seek(topic string, offset int64) error
	Stop()
	Subscribe(topics []string) error
}

// Compile-time check that BaseConsumer satisfies BaseConsumerInterface.
var _ BaseConsumerInterface = (*BaseConsumer)(nil)

// BaseConsumer is a utility for consuming with manual offset control.
//
// The poll loop (ConsumeToChannel / ConsumeOrRebalancedCatch) is expected to
// run on one goroutine while Stop, Check and LastOffsetConsumed are called
// from others, so every field shared between them is guarded by a mutex.
type BaseConsumer struct {
	client  *kgo.Client
	running bool          // guarded by stateMu
	timeout time.Duration // read from KAFKA_TIMEOUT; not used in this file

	connected     bool // guarded by connectedLock
	connectedLock sync.RWMutex

	pollingError error // guarded by stateMu

	// ctx is cancelled by Stop and is passed to every blocking client call.
	ctx    context.Context
	cancel context.CancelFunc

	lastOffsets map[string]map[int32]int64 // guarded by offsetsMu
	offsetsMu   sync.RWMutex

	// stateMu guards running and pollingError.
	stateMu sync.RWMutex
}

// NewBaseConsumer is the factory for a group consumer.
//
// serviceName is used as the consumer group name. minBufSize (bytes) and
// fetchMaxWaitTimems (milliseconds) are applied only when positive. When
// autoCommit is true the auto-commit interval is read from
// KAFKA_AUTO_COMMIT_INTERVAL_MS (default 5000); otherwise auto-commit is
// disabled. An error is returned when the underlying client cannot be built.
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, autoCommit bool) (BaseConsumerInterface, error) {
	cfg := LoadConfig()
	opts := buildClientOpts(cfg)

	// Consumer-specific options.
	opts = append(opts,
		kgo.ConsumerGroup(serviceName),
		kgo.SessionTimeout(cfg.SessionTimeout),
		kgo.HeartbeatInterval(cfg.HeartbeatInterval),
	)

	if autoCommit {
		interval := time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000)) * time.Millisecond
		opts = append(opts, kgo.AutoCommitInterval(interval))
	} else {
		opts = append(opts, kgo.DisableAutoCommit())
	}

	if minBufSize > 0 {
		opts = append(opts, kgo.FetchMinBytes(int32(minBufSize)))
	}
	if fetchMaxWaitTimems > 0 {
		opts = append(opts, kgo.FetchMaxWait(time.Duration(fetchMaxWaitTimems)*time.Millisecond))
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	logger.Info().Msgf("NewBaseConsumer hosts: %v group: %s", cfg.Brokers, serviceName)

	ctx, cancel := context.WithCancel(context.Background())
	return &BaseConsumer{
		client:      client,
		timeout:     time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
		ctx:         ctx,
		cancel:      cancel,
		lastOffsets: make(map[string]map[int32]int64),
	}, nil
}

// Subscribe adds topics to the set consumed by the client. It always returns
// nil.
func (c *BaseConsumer) Subscribe(topics []string) error {
	c.client.AddConsumeTopics(topics...)
	return nil
}

// ConsumeToChannel runs a blocking poll loop that forwards every fetched
// record to events until Stop is called or a fetch error occurs.
//
// It returns nil on a clean shutdown and the (stack-wrapped) fetch error
// otherwise; that error is also reported by Check.
//
// NOTE(review): topics is not used here; subscriptions are presumably
// registered through Subscribe beforehand — confirm against callers.
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *Message) error {
	return c.pollLoop(events)
}

// ConsumeOrRebalancedCatch runs the same blocking poll loop as
// ConsumeToChannel.
//
// NOTE(review): neither topics nor reb is used; no rebalance notification is
// ever sent on reb. Wiring it up would need a rebalance callback registered
// when the client is created — confirm the intended behaviour.
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *Message, reb chan struct{}) error {
	return c.pollLoop(events)
}

// pollLoop is the shared implementation behind ConsumeToChannel and
// ConsumeOrRebalancedCatch.
func (c *BaseConsumer) pollLoop(events chan *Message) error {
	c.setRunning(true)
	// Clear any error left by a previous run so Check reflects this run only.
	c.setPollingError(nil)
	c.setConnected(true)
	defer c.setConnected(false)

	for c.isRunning() {
		fetches := c.client.PollFetches(c.ctx)

		if errs := fetches.Errors(); len(errs) > 0 {
			if c.isShutdown(errs) {
				return nil
			}
			first := errs[0]
			logger.Error().Err(first.Err).Msgf("fetch error on %s", first.Topic)
			c.setConnected(false)
			err := errors.WithStack(first.Err)
			c.setPollingError(err)
			return err
		}

		c.setConnected(true)

		iter := fetches.RecordIter()
		for !iter.Done() {
			r := iter.Next()
			c.trackOffset(r.Topic, r.Partition, r.Offset)
			// Do not block forever on a receiver that has gone away after Stop.
			select {
			case events <- recordToMessage(r):
			case <-c.ctx.Done():
				return nil
			}
		}
	}
	return nil
}

// isShutdown reports whether the fetch errors are the result of the consumer
// being stopped (context cancelled or client closed) rather than a real
// failure.
func (c *BaseConsumer) isShutdown(errs []kgo.FetchError) bool {
	if c.ctx.Err() != nil {
		return true
	}
	for _, e := range errs {
		if stderrors.Is(e.Err, context.Canceled) || stderrors.Is(e.Err, kgo.ErrClientClosed) {
			return true
		}
	}
	return false
}

// Seek requests that partition 0 of topic be consumed starting at offset. It
// always returns nil.
//
// NOTE(review): the partition is hard-coded to 0. Also, the kgo documentation
// appears to state that AddConsumePartitions only works for direct (non-group)
// consumers; since NewBaseConsumer always configures a consumer group this
// call may be a no-op — confirm, and consider Client.SetOffsets instead.
func (c *BaseConsumer) Seek(topic string, offset int64) error {
	offsets := map[string]map[int32]kgo.Offset{
		topic: {0: kgo.NewOffset().At(offset)},
	}
	c.client.AddConsumePartitions(offsets)
	return nil
}

// Commit commits offsets synchronously.
//
// With a nil message all uncommitted offsets are committed and (nil, nil) is
// returned. Otherwise the message's record is marked, uncommitted offsets are
// committed, and the returned slice holds the message's topic/partition with
// Offset set to message.Offset+1.
//
// NOTE(review): kgo documents MarkCommitRecords as only useful with the
// AutoCommitMarks option, which this file does not set; if so, the commit
// covers everything polled so far rather than just this message — confirm.
func (c *BaseConsumer) Commit(message *Message) ([]TopicPartition, error) {
	if message != nil {
		c.client.MarkCommitRecords(&kgo.Record{
			Topic:     message.Topic,
			Partition: message.Partition,
			Offset:    message.Offset,
		})
	}

	if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
		return nil, errors.WithStack(err)
	}
	if message == nil {
		return nil, nil
	}

	return []TopicPartition{{
		Topic:     message.Topic,
		Partition: message.Partition,
		Offset:    message.Offset + 1,
	}}, nil
}

// CommitOffsets marks each given offset and then commits uncommitted offsets
// synchronously. On success the input slice is returned unchanged.
//
// NOTE(review): the same MarkCommitRecords caveat as in Commit applies —
// confirm that the requested offsets are what actually gets committed.
func (c *BaseConsumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
	for _, o := range offsets {
		c.client.MarkCommitRecords(&kgo.Record{
			Topic:     o.Topic,
			Partition: o.Partition,
			Offset:    o.Offset - 1, // MarkCommitRecords expects the record offset, not next
		})
	}

	if err := c.client.CommitUncommittedOffsets(c.ctx); err != nil {
		return nil, errors.WithStack(err)
	}
	return offsets, nil
}

// LastOffsetConsumed returns the offset of the most recent record the poll
// loop has seen for the given topic and partition. It returns -1 and an error
// when no record has been tracked for that pair.
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (int64, error) {
	c.offsetsMu.RLock()
	defer c.offsetsMu.RUnlock()

	if topicOffsets, ok := c.lastOffsets[topic]; ok {
		if offset, ok := topicOffsets[partition]; ok {
			return offset, nil
		}
	}
	return -1, errors.Errorf("no subscription for topic %s partition %d", topic, partition)
}

// Stop ends the poll loop: it clears the running flag, cancels the consumer
// context so a blocked poll or channel send returns, and closes the client.
func (c *BaseConsumer) Stop() {
	c.setRunning(false)
	if c.cancel != nil {
		c.cancel()
	}
	if c.client != nil {
		c.client.Close()
	}
}

// Check reports consumer health: the last polling error if there is one,
// otherwise an error when the poll loop is not marked connected. The ctx
// argument is not used.
func (c *BaseConsumer) Check(ctx context.Context) error {
	if err := c.lastPollingError(); err != nil {
		return err
	}
	if !c.isConnected() {
		return errors.New("consumer isn't connected to Kafka")
	}
	return nil
}

// setConnected records the connection state under connectedLock.
func (c *BaseConsumer) setConnected(cntd bool) {
	c.connectedLock.Lock()
	defer c.connectedLock.Unlock()
	c.connected = cntd
}

// isConnected returns the connection state under connectedLock.
func (c *BaseConsumer) isConnected() bool {
	c.connectedLock.RLock()
	defer c.connectedLock.RUnlock()
	return c.connected
}

// setRunning records whether the poll loop should keep going.
func (c *BaseConsumer) setRunning(running bool) {
	c.stateMu.Lock()
	defer c.stateMu.Unlock()
	c.running = running
}

// isRunning reports whether the poll loop should keep going.
func (c *BaseConsumer) isRunning() bool {
	c.stateMu.RLock()
	defer c.stateMu.RUnlock()
	return c.running
}

// setPollingError records the error that terminated the poll loop (or nil).
func (c *BaseConsumer) setPollingError(err error) {
	c.stateMu.Lock()
	defer c.stateMu.Unlock()
	c.pollingError = err
}

// lastPollingError returns the error that terminated the poll loop, if any.
func (c *BaseConsumer) lastPollingError() error {
	c.stateMu.RLock()
	defer c.stateMu.RUnlock()
	return c.pollingError
}

// trackOffset remembers offset as the latest one seen for topic/partition.
func (c *BaseConsumer) trackOffset(topic string, partition int32, offset int64) {
	c.offsetsMu.Lock()
	defer c.offsetsMu.Unlock()

	if c.lastOffsets[topic] == nil {
		c.lastOffsets[topic] = make(map[int32]int64)
	}
	c.lastOffsets[topic][partition] = offset
}