356 lines
8.5 KiB
Go
356 lines
8.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/common"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
"github.com/pkg/errors"
|
|
"github.com/twmb/franz-go/pkg/kadm"
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
)
|
|
|
|
// Message is the package's own representation of a consumed Kafka record.
// It carries the fields copied out of a kgo.Record by recordToMessage so
// callers do not need to depend on the franz-go types directly.
type Message struct {
	// Topic is the topic the record was read from.
	Topic string
	// Partition is the partition the record was read from.
	Partition int32
	// Offset is the record's offset within its partition.
	Offset int64
	// Key is the raw record key.
	Key []byte
	// Value is the raw record payload.
	Value []byte
	// Headers maps header keys to their raw values.
	Headers map[string][]byte
}
|
|
|
|
// TopicPartition identifies one partition of a topic together with an
// offset. ConsumePartitionsToChannel uses Offset as the position to start
// consuming from.
type TopicPartition struct {
	// Topic is the topic name.
	Topic string
	// Partition is the partition number within Topic.
	Partition int32
	// Offset is the offset associated with the partition.
	Offset int64
}
|
|
|
|
// Metadata holds topic metadata
|
|
type Metadata struct {
|
|
Topics map[string]TopicMetadata
|
|
}
|
|
|
|
// TopicMetadata holds partition info for a topic
|
|
type TopicMetadata struct {
|
|
Partitions []PartitionMetadata
|
|
}
|
|
|
|
// PartitionMetadata describes a single partition of a topic.
type PartitionMetadata struct {
	// ID is the partition number.
	ID int32
	// Leader is the leader value reported for the partition by the
	// metadata lookup.
	Leader int32
}
|
|
|
|
// ConsumerInterface interface for Consumer utility
|
|
type ConsumerInterface interface {
|
|
Consume(topics []string, handler func([]byte, []byte) error) error
|
|
ConsumeToChannel(topics []string, events chan *Message) error
|
|
ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error
|
|
ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error
|
|
ConsumeOrRebalancedCatch(topics []string, events chan *Message, rebalance chan struct{}) error
|
|
Subscribe(topics []string)
|
|
GetMetadata(topic string) (*Metadata, error)
|
|
Check(ctx context.Context) error
|
|
Stop()
|
|
}
|
|
|
|
// Consumer utility to consume messages
|
|
type Consumer struct {
|
|
client *kgo.Client
|
|
running bool
|
|
timeout time.Duration
|
|
connected bool
|
|
connectedLock sync.RWMutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewConsumer serves as factory method for consumer to kafka
|
|
func NewConsumer(serviceName string) (ConsumerInterface, error) {
|
|
cfg := LoadConfig()
|
|
opts := buildClientOpts(cfg)
|
|
|
|
// Consumer-specific options
|
|
opts = append(opts,
|
|
kgo.ConsumerGroup(serviceName),
|
|
kgo.SessionTimeout(cfg.SessionTimeout),
|
|
kgo.HeartbeatInterval(cfg.HeartbeatInterval),
|
|
)
|
|
|
|
if envtool.GetEnvBool("KAFKA_ENABLE_AUTO_COMMIT", true) {
|
|
opts = append(opts, kgo.AutoCommitInterval(
|
|
time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond,
|
|
))
|
|
} else {
|
|
opts = append(opts, kgo.DisableAutoCommit())
|
|
}
|
|
|
|
client, err := kgo.NewClient(opts...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
logger.Info().Msgf("NewConsumer hosts: %v group: %s", cfg.Brokers, serviceName)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &Consumer{
|
|
client: client,
|
|
timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}, nil
|
|
}
|
|
|
|
// Consume runs a poll loop which waits for messages to consume
|
|
func (c *Consumer) Consume(topics []string, handler func([]byte, []byte) error) error {
|
|
c.client.AddConsumeTopics(topics...)
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
c.running = true
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
|
|
c.setConnected(false)
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
if err := handler(r.Key, r.Value); err != nil {
|
|
logger.Warn().Err(err).Send()
|
|
}
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConsumeToChannel runs a poll loop which waits for messages to consume
|
|
func (c *Consumer) ConsumeToChannel(topics []string, events chan *Message) error {
|
|
c.client.AddConsumeTopics(topics...)
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
c.running = true
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
|
|
c.setConnected(false)
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
events <- recordToMessage(r)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConsumeToChannelJson runs a poll loop for JSON events
|
|
func (c *Consumer) ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error {
|
|
c.client.AddConsumeTopics(topics...)
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
c.running = true
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Send()
|
|
c.setConnected(false)
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
events <- common.EventRawJSON{
|
|
Topic: r.Topic,
|
|
Key: string(r.Key),
|
|
Payload: r.Value,
|
|
}
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConsumePartitionsToChannel consumes from specific partitions
|
|
func (c *Consumer) ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error {
|
|
if len(partitions) == 0 {
|
|
return errors.New("no partitions provided")
|
|
}
|
|
|
|
// Build partition map for direct assignment
|
|
offsets := make(map[string]map[int32]kgo.Offset)
|
|
for _, p := range partitions {
|
|
if offsets[p.Topic] == nil {
|
|
offsets[p.Topic] = make(map[int32]kgo.Offset)
|
|
}
|
|
offsets[p.Topic][p.Partition] = kgo.NewOffset().At(p.Offset)
|
|
}
|
|
c.client.AddConsumePartitions(offsets)
|
|
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
c.running = true
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
logger.Error().Err(e.Err).Send()
|
|
c.setConnected(false)
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
events <- recordToMessage(r)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetMetadata returns topic metadata
|
|
func (c *Consumer) GetMetadata(topic string) (*Metadata, error) {
|
|
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
// Use kadm for metadata
|
|
adm := kadm.NewClient(c.client)
|
|
meta, err := adm.Metadata(ctx, topic)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
result := &Metadata{Topics: make(map[string]TopicMetadata)}
|
|
for _, t := range meta.Topics {
|
|
tm := TopicMetadata{}
|
|
for _, p := range t.Partitions {
|
|
tm.Partitions = append(tm.Partitions, PartitionMetadata{
|
|
ID: p.Partition,
|
|
Leader: p.Leader,
|
|
})
|
|
}
|
|
result.Topics[t.Topic] = tm
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Stop stops the poll loop
|
|
func (c *Consumer) Stop() {
|
|
c.running = false
|
|
c.cancel()
|
|
if c.client != nil {
|
|
c.client.Close()
|
|
}
|
|
}
|
|
|
|
// Subscribe adds topics to consume (for compatibility with old API)
|
|
func (c *Consumer) Subscribe(topics []string) {
|
|
c.client.AddConsumeTopics(topics...)
|
|
}
|
|
|
|
// ConsumeOrRebalancedCatch consumes messages and notifies on rebalance events
|
|
func (c *Consumer) ConsumeOrRebalancedCatch(topics []string, events chan *Message, rebalance chan struct{}) error {
|
|
c.client.AddConsumeTopics(topics...)
|
|
c.setConnected(true)
|
|
defer c.setConnected(false)
|
|
|
|
c.running = true
|
|
for c.running {
|
|
fetches := c.client.PollFetches(c.ctx)
|
|
if errs := fetches.Errors(); len(errs) > 0 {
|
|
for _, e := range errs {
|
|
if e.Err == context.Canceled {
|
|
return nil
|
|
}
|
|
// Check for rebalance-related errors
|
|
if e.Err.Error() == "REBALANCE_IN_PROGRESS" || e.Err.Error() == "NOT_COORDINATOR" {
|
|
select {
|
|
case rebalance <- struct{}{}:
|
|
default:
|
|
}
|
|
return e.Err
|
|
}
|
|
logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic)
|
|
c.setConnected(false)
|
|
}
|
|
continue
|
|
}
|
|
|
|
c.setConnected(true)
|
|
fetches.EachRecord(func(r *kgo.Record) {
|
|
events <- recordToMessage(r)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Check verifies consumer connectivity
|
|
func (c *Consumer) Check(ctx context.Context) error {
|
|
if !c.isConnected() {
|
|
return errors.New("consumer isn't connected to Kafka")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Consumer) setConnected(cntd bool) {
|
|
c.connectedLock.Lock()
|
|
defer c.connectedLock.Unlock()
|
|
c.connected = cntd
|
|
}
|
|
|
|
func (c *Consumer) isConnected() bool {
|
|
c.connectedLock.RLock()
|
|
defer c.connectedLock.RUnlock()
|
|
return c.connected
|
|
}
|
|
|
|
func recordToMessage(r *kgo.Record) *Message {
|
|
headers := make(map[string][]byte)
|
|
for _, h := range r.Headers {
|
|
headers[h.Key] = h.Value
|
|
}
|
|
return &Message{
|
|
Topic: r.Topic,
|
|
Partition: r.Partition,
|
|
Offset: r.Offset,
|
|
Key: r.Key,
|
|
Value: r.Value,
|
|
Headers: headers,
|
|
}
|
|
}
|