Files
cloud-services/pkg/kafka/consumer.go

312 lines
7.3 KiB
Go

package kafka
import (
"context"
"sync"
"time"
"github.com/fiskerinc/cloud-services/pkg/common"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
"github.com/pkg/errors"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)
// Message wraps kgo.Record for compatibility
type Message struct {
Topic string
Partition int32
Offset int64
Key []byte
Value []byte
Headers map[string][]byte
}
// TopicPartition represents a topic-partition pair
type TopicPartition struct {
Topic string
Partition int32
Offset int64
}
// Metadata holds topic metadata
type Metadata struct {
Topics map[string]TopicMetadata
}
// TopicMetadata holds partition info for a topic
type TopicMetadata struct {
Partitions []PartitionMetadata
}
// PartitionMetadata holds partition details
type PartitionMetadata struct {
ID int32
Leader int32
}
// ConsumerInterface interface for Consumer utility
type ConsumerInterface interface {
Consume(topics []string, handler func([]byte, []byte) error) error
ConsumeToChannel(topics []string, events chan *Message) error
ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error
ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error
GetMetadata(topic string) (*Metadata, error)
Check(ctx context.Context) error
Stop()
}
// Consumer utility to consume messages
type Consumer struct {
client *kgo.Client
running bool
timeout time.Duration
connected bool
connectedLock sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
// NewConsumer serves as factory method for consumer to kafka
func NewConsumer(serviceName string) (ConsumerInterface, error) {
cfg := LoadConfig()
opts := buildClientOpts(cfg)
// Consumer-specific options
opts = append(opts,
kgo.ConsumerGroup(serviceName),
kgo.SessionTimeout(cfg.SessionTimeout),
kgo.HeartbeatInterval(cfg.HeartbeatInterval),
)
if envtool.GetEnvBool("KAFKA_ENABLE_AUTO_COMMIT", true) {
opts = append(opts, kgo.AutoCommitInterval(
time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond,
))
} else {
opts = append(opts, kgo.DisableAutoCommit())
}
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, errors.WithStack(err)
}
logger.Info().Msgf("NewConsumer hosts: %v group: %s", cfg.Brokers, serviceName)
ctx, cancel := context.WithCancel(context.Background())
return &Consumer{
client: client,
timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
ctx: ctx,
cancel: cancel,
}, nil
}
// Consume runs a poll loop which waits for messages to consume
func (c *Consumer) Consume(topics []string, handler func([]byte, []byte) error) error {
c.client.AddConsumeTopics(topics...)
c.setConnected(true)
defer c.setConnected(false)
c.running = true
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
c.setConnected(false)
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
if err := handler(r.Key, r.Value); err != nil {
logger.Warn().Err(err).Send()
}
})
}
return nil
}
// ConsumeToChannel runs a poll loop which waits for messages to consume
func (c *Consumer) ConsumeToChannel(topics []string, events chan *Message) error {
c.client.AddConsumeTopics(topics...)
c.setConnected(true)
defer c.setConnected(false)
c.running = true
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
c.setConnected(false)
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
events <- recordToMessage(r)
})
}
return nil
}
// ConsumeToChannelJson runs a poll loop for JSON events
func (c *Consumer) ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error {
c.client.AddConsumeTopics(topics...)
c.setConnected(true)
defer c.setConnected(false)
c.running = true
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Send()
c.setConnected(false)
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
events <- common.EventRawJSON{
Topic: r.Topic,
Key: string(r.Key),
Payload: r.Value,
}
})
}
return nil
}
// ConsumePartitionsToChannel consumes from specific partitions
func (c *Consumer) ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error {
if len(partitions) == 0 {
return errors.New("no partitions provided")
}
// Build partition map for direct assignment
offsets := make(map[string]map[int32]kgo.Offset)
for _, p := range partitions {
if offsets[p.Topic] == nil {
offsets[p.Topic] = make(map[int32]kgo.Offset)
}
offsets[p.Topic][p.Partition] = kgo.NewOffset().At(p.Offset)
}
c.client.AddConsumePartitions(offsets)
c.setConnected(true)
defer c.setConnected(false)
c.running = true
for c.running {
fetches := c.client.PollFetches(c.ctx)
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if e.Err == context.Canceled {
return nil
}
logger.Error().Err(e.Err).Send()
c.setConnected(false)
}
continue
}
c.setConnected(true)
fetches.EachRecord(func(r *kgo.Record) {
events <- recordToMessage(r)
})
}
return nil
}
// GetMetadata returns topic metadata
func (c *Consumer) GetMetadata(topic string) (*Metadata, error) {
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel()
// Use kadm for metadata
adm := kadm.NewClient(c.client)
meta, err := adm.Metadata(ctx, topic)
if err != nil {
return nil, errors.WithStack(err)
}
result := &Metadata{Topics: make(map[string]TopicMetadata)}
for _, t := range meta.Topics {
tm := TopicMetadata{}
for _, p := range t.Partitions {
tm.Partitions = append(tm.Partitions, PartitionMetadata{
ID: p.Partition,
Leader: p.Leader,
})
}
result.Topics[t.Topic] = tm
}
return result, nil
}
// Stop stops the poll loop
func (c *Consumer) Stop() {
c.running = false
c.cancel()
if c.client != nil {
c.client.Close()
}
}
// Check verifies consumer connectivity
func (c *Consumer) Check(ctx context.Context) error {
if !c.isConnected() {
return errors.New("consumer isn't connected to Kafka")
}
return nil
}
func (c *Consumer) setConnected(cntd bool) {
c.connectedLock.Lock()
defer c.connectedLock.Unlock()
c.connected = cntd
}
func (c *Consumer) isConnected() bool {
c.connectedLock.RLock()
defer c.connectedLock.RUnlock()
return c.connected
}
func recordToMessage(r *kgo.Record) *Message {
headers := make(map[string][]byte)
for _, h := range r.Headers {
headers[h.Key] = h.Value
}
return &Message{
Topic: r.Topic,
Partition: r.Partition,
Offset: r.Offset,
Key: r.Key,
Value: r.Value,
Headers: headers,
}
}