package kafka import ( "context" "sync" "time" "github.com/fiskerinc/cloud-services/pkg/common" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/fiskerinc/cloud-services/pkg/utils/envtool" "github.com/pkg/errors" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" ) // Message wraps kgo.Record for compatibility type Message struct { Topic string Partition int32 Offset int64 Key []byte Value []byte Headers map[string][]byte } // TopicPartition represents a topic-partition pair type TopicPartition struct { Topic string Partition int32 Offset int64 } // Metadata holds topic metadata type Metadata struct { Topics map[string]TopicMetadata } // TopicMetadata holds partition info for a topic type TopicMetadata struct { Partitions []PartitionMetadata } // PartitionMetadata holds partition details type PartitionMetadata struct { ID int32 Leader int32 } // ConsumerInterface interface for Consumer utility type ConsumerInterface interface { Consume(topics []string, handler func([]byte, []byte) error) error ConsumeToChannel(topics []string, events chan *Message) error ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error GetMetadata(topic string) (*Metadata, error) Check(ctx context.Context) error Stop() } // Consumer utility to consume messages type Consumer struct { client *kgo.Client running bool timeout time.Duration connected bool connectedLock sync.RWMutex ctx context.Context cancel context.CancelFunc } // NewConsumer serves as factory method for consumer to kafka func NewConsumer(serviceName string) (ConsumerInterface, error) { cfg := LoadConfig() opts := buildClientOpts(cfg) // Consumer-specific options opts = append(opts, kgo.ConsumerGroup(serviceName), kgo.SessionTimeout(cfg.SessionTimeout), kgo.HeartbeatInterval(cfg.HeartbeatInterval), ) if envtool.GetEnvBool("KAFKA_ENABLE_AUTO_COMMIT", true) { opts = append(opts, kgo.AutoCommitInterval( time.Duration(envtool.GetEnvInt("KAFKA_AUTO_COMMIT_INTERVAL_MS", 5000))*time.Millisecond, )) } else { opts = append(opts, kgo.DisableAutoCommit()) } client, err := kgo.NewClient(opts...) if err != nil { return nil, errors.WithStack(err) } logger.Info().Msgf("NewConsumer hosts: %v group: %s", cfg.Brokers, serviceName) ctx, cancel := context.WithCancel(context.Background()) return &Consumer{ client: client, timeout: time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 10000)) * time.Millisecond, ctx: ctx, cancel: cancel, }, nil } // Consume runs a poll loop which waits for messages to consume func (c *Consumer) Consume(topics []string, handler func([]byte, []byte) error) error { c.client.AddConsumeTopics(topics...) c.setConnected(true) defer c.setConnected(false) c.running = true for c.running { fetches := c.client.PollFetches(c.ctx) if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { if e.Err == context.Canceled { return nil } logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic) c.setConnected(false) } continue } c.setConnected(true) fetches.EachRecord(func(r *kgo.Record) { if err := handler(r.Key, r.Value); err != nil { logger.Warn().Err(err).Send() } }) } return nil } // ConsumeToChannel runs a poll loop which waits for messages to consume func (c *Consumer) ConsumeToChannel(topics []string, events chan *Message) error { c.client.AddConsumeTopics(topics...) c.setConnected(true) defer c.setConnected(false) c.running = true for c.running { fetches := c.client.PollFetches(c.ctx) if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { if e.Err == context.Canceled { return nil } logger.Error().Err(e.Err).Msgf("fetch error on %s", e.Topic) c.setConnected(false) } continue } c.setConnected(true) fetches.EachRecord(func(r *kgo.Record) { events <- recordToMessage(r) }) } return nil } // ConsumeToChannelJson runs a poll loop for JSON events func (c *Consumer) ConsumeToChannelJson(topics []string, events chan common.EventRawJSON) error { c.client.AddConsumeTopics(topics...) c.setConnected(true) defer c.setConnected(false) c.running = true for c.running { fetches := c.client.PollFetches(c.ctx) if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { if e.Err == context.Canceled { return nil } logger.Error().Err(e.Err).Send() c.setConnected(false) } continue } c.setConnected(true) fetches.EachRecord(func(r *kgo.Record) { events <- common.EventRawJSON{ Topic: r.Topic, Key: string(r.Key), Payload: r.Value, } }) } return nil } // ConsumePartitionsToChannel consumes from specific partitions func (c *Consumer) ConsumePartitionsToChannel(partitions []TopicPartition, events chan *Message) error { if len(partitions) == 0 { return errors.New("no partitions provided") } // Build partition map for direct assignment offsets := make(map[string]map[int32]kgo.Offset) for _, p := range partitions { if offsets[p.Topic] == nil { offsets[p.Topic] = make(map[int32]kgo.Offset) } offsets[p.Topic][p.Partition] = kgo.NewOffset().At(p.Offset) } c.client.AddConsumePartitions(offsets) c.setConnected(true) defer c.setConnected(false) c.running = true for c.running { fetches := c.client.PollFetches(c.ctx) if errs := fetches.Errors(); len(errs) > 0 { for _, e := range errs { if e.Err == context.Canceled { return nil } logger.Error().Err(e.Err).Send() c.setConnected(false) } continue } c.setConnected(true) fetches.EachRecord(func(r *kgo.Record) { events <- recordToMessage(r) }) } return nil } // GetMetadata returns topic metadata func (c *Consumer) GetMetadata(topic string) (*Metadata, error) { ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() // Use kadm for metadata adm := kadm.NewClient(c.client) meta, err := adm.Metadata(ctx, topic) if err != nil { return nil, errors.WithStack(err) } result := &Metadata{Topics: make(map[string]TopicMetadata)} for _, t := range meta.Topics { tm := TopicMetadata{} for _, p := range t.Partitions { tm.Partitions = append(tm.Partitions, PartitionMetadata{ ID: p.Partition, Leader: p.Leader, }) } result.Topics[t.Topic] = tm } return result, nil } // Stop stops the poll loop func (c *Consumer) Stop() { c.running = false c.cancel() if c.client != nil { c.client.Close() } } // Check verifies consumer connectivity func (c *Consumer) Check(ctx context.Context) error { if !c.isConnected() { return errors.New("consumer isn't connected to Kafka") } return nil } func (c *Consumer) setConnected(cntd bool) { c.connectedLock.Lock() defer c.connectedLock.Unlock() c.connected = cntd } func (c *Consumer) isConnected() bool { c.connectedLock.RLock() defer c.connectedLock.RUnlock() return c.connected } func recordToMessage(r *kgo.Record) *Message { headers := make(map[string][]byte) for _, h := range r.Headers { headers[h.Key] = h.Value } return &Message{ Topic: r.Topic, Partition: r.Partition, Offset: r.Offset, Key: r.Key, Value: r.Value, Headers: headers, } }