Files
cloud-services/pkg/kafka/base_consumer.go

212 lines
5.3 KiB
Go

package kafka
import (
"context"
"sync"
"fiskerinc.com/modules/logger"
"fiskerinc.com/modules/utils/envtool"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pkg/errors"
)
const KafkaTimeout = 30000 //ms
// BaseConsumerInterface interface for Consumer utility
//
// NOTE: DOES NOT AUTO COMMIT OFFSETS
type BaseConsumerInterface interface {
Check(ctx context.Context) error
Commit(message *kafka.Message) ([]kafka.TopicPartition, error)
CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
ConsumeToChannel(topics []string, events chan *kafka.Message) error
ConsumeOrRebalancedCatch(topics []string, events chan *kafka.Message, reb chan struct{}) error
LastOffsetConsumed(topic string, partition int32) (kafka.Offset, error)
Seek(topic string, offset kafka.Offset) error
Stop()
Subscribe(topics []string) error
}
// NoncommitalConsumer utility to produce messages
type BaseConsumer struct {
consumer *kafka.Consumer
running bool
timeout int
connected bool
connectedLock sync.RWMutex
pollingError error
}
// NewConsumer serves as factory method for consumer to kafka
func NewBaseConsumer(serviceName string, minBufSize int, fetchMaxWaitTimems int, auto_commit bool) (BaseConsumerInterface, error) {
var consumer *kafka.Consumer
kafkaConfigMap := loadKafkaConsumerConfig(serviceName)
kafkaConfigMap.SetKey("enable.auto.commit", auto_commit)
if minBufSize != -1 {
kafkaConfigMap.SetKey("fetch.min.bytes", minBufSize)
}
consumer, err := kafka.NewConsumer(&kafkaConfigMap)
kafkaHosts := envtool.GetEnv("KAFKA_HOSTS", "localhost:9093")
logger.Info().Msgf("NewNoncommitalConsumer hosts: %s", kafkaHosts)
if err != nil {
return nil, errors.WithStack(err)
}
return &BaseConsumer{
consumer: consumer,
timeout: envtool.GetEnvInt("KAFKA_TIMEOUT", 10000),
}, nil
}
func (c *BaseConsumer) Subscribe(topics []string) error {
err := c.consumer.SubscribeTopics(topics, nil)
if err != nil {
return errors.WithStack(err)
}
return nil
}
// ConsumeToChannel runs a poll loop which waits for messages to consume
func (c *BaseConsumer) ConsumeToChannel(topics []string, events chan *kafka.Message) error {
c.running = true
c.setConnected(true)
defer c.setConnected(false)
for c.running {
e := c.consumer.Poll(c.timeout)
switch msg := e.(type) {
case *kafka.Message:
events <- msg
msg = nil
c.setConnected(true)
case kafka.Error:
c.setConnected(false)
logger.Error().Msg(e.String())
c.pollingError = errors.WithStack(errors.New(e.String()))
return c.pollingError
}
}
return nil
}
// ConsumeOrRebalancedCatch runs a poll loop which waits for messages to consume
func (c *BaseConsumer) ConsumeOrRebalancedCatch(topics []string, events chan *kafka.Message, reb chan struct{}) error {
c.running = true
c.setConnected(true)
defer c.setConnected(false)
c.pollingError = nil
for c.running {
e := c.consumer.Poll(c.timeout)
switch msg := e.(type) {
case *kafka.Message:
events <- msg
msg = nil
c.setConnected(true)
case kafka.AssignedPartitions:
reb <- struct{}{}
c.setConnected(true)
case kafka.Error:
c.setConnected(false)
c.pollingError = errors.WithStack(errors.New(e.String()))
logger.Error().Err(c.pollingError).Send()
return c.pollingError
}
}
return nil
}
// Seek provides a wrapper for private consumer method "Seek"
func (c *BaseConsumer) Seek(topic string, offset kafka.Offset) error {
err := c.consumer.Seek(kafka.TopicPartition{
Topic: &topic,
Offset: offset,
}, c.timeout)
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *BaseConsumer) commitWithoutMessage() ([]kafka.TopicPartition, error) {
partitions, err := c.consumer.Commit()
if err != nil {
return partitions, errors.WithStack(err)
}
return partitions, nil
}
func (c *BaseConsumer) Commit(message *kafka.Message) ([]kafka.TopicPartition, error) {
if message == nil {
return c.commitWithoutMessage()
}
partitions, err := c.consumer.CommitMessage(message)
if err != nil {
return partitions, errors.WithStack(err)
}
return partitions, nil
}
func (c *BaseConsumer) LastOffsetConsumed(topic string, partition int32) (kafka.Offset, error) {
partitions, err := c.consumer.Position([]kafka.TopicPartition{{
Topic: &topic,
Partition: partition,
}})
if err != nil {
return -1, errors.WithStack(err)
} else if len(partitions) != 1 {
return -1, errors.Errorf("no subscription for topic %s", topic)
}
return partitions[0].Offset, nil
}
// Stop stops the poll loop running
func (c *BaseConsumer) Stop() {
if c.consumer != nil {
c.running = false
if err := c.consumer.Close(); err != nil {
logger.Warn().Err(err).Send()
}
}
}
// very basic implementation for health_check
func (c *BaseConsumer) Check(ctx context.Context) error {
if c.pollingError != nil {
return c.pollingError
}
if !c.isConnected() {
return errors.New("consumer isn't connected to Kafka")
}
return nil
}
func (c *BaseConsumer) setConnected(cntd bool) {
c.connectedLock.Lock()
defer c.connectedLock.Unlock()
c.connected = cntd
}
func (c *BaseConsumer) isConnected() bool {
c.connectedLock.RLock()
defer c.connectedLock.RUnlock()
return c.connected
}
// passthrough function for CommitOffsets()
func (c *BaseConsumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
return c.consumer.CommitOffsets(offsets)
}