package kafka import ( "context" "encoding/json" "sync" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/pkg/errors" "github.com/twmb/franz-go/pkg/kgo" ) // ProducerInterface interface for Producer utility type ProducerInterface interface { Produce(topic string, key string, payload interface{}, headers map[string][]byte) error ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error ProduceBinaryToChannel(topic string, key string, payload []byte) error ProduceToChannel(string, string, interface{}) error ProduceSignalBatch(string, string, interface{}) error ReadEvents() Len() int Flush(timeoutMs int) int Close() } // Producer utility to produce messages type Producer struct { client *kgo.Client ctx context.Context cancel context.CancelFunc pending int pendingMu sync.Mutex } // NewProducer serves as factory method for producer to kafka func NewProducer(ctx context.Context) (ProducerInterface, error) { cfg := LoadConfig() opts := buildClientOpts(cfg) // Producer-specific options opts = append(opts, kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.AllowAutoTopicCreation(), ) client, err := kgo.NewClient(opts...) if err != nil { return nil, errors.WithStack(err) } logger.Info().Msgf("NewProducer hosts: %v", cfg.Brokers) pctx, cancel := context.WithCancel(ctx) return &Producer{ client: client, ctx: pctx, cancel: cancel, }, nil } // Len returns len of messages in queue (approximate) func (p *Producer) Len() int { p.pendingMu.Lock() defer p.pendingMu.Unlock() return p.pending } // Flush waits for all buffered records to be flushed func (p *Producer) Flush(timeoutMs int) int { ctx := p.ctx if timeoutMs > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(p.ctx, durationMs(timeoutMs)) defer cancel() } if err := p.client.Flush(ctx); err != nil { logger.Warn().Err(err).Msg("flush incomplete") return p.Len() } return 0 } // Produce sends a JSON-encoded message to Kafka func (p *Producer) Produce(topic string, key string, payload interface{}, headers map[string][]byte) error { v, err := json.Marshal(payload) if err != nil { return errors.WithStack(err) } return p.ProduceBinary(topic, key, v, headers) } func makeHeaders(headers map[string][]byte) []kgo.RecordHeader { if headers == nil { return nil } result := make([]kgo.RecordHeader, 0, len(headers)) for k, v := range headers { result = append(result, kgo.RecordHeader{Key: k, Value: v}) } return result } // ProduceBinary sends a binary message to Kafka synchronously func (p *Producer) ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error { record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: data, Headers: makeHeaders(headers), } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() result := p.client.ProduceSync(p.ctx, record) p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err := result.FirstErr(); err != nil { logger.Error().Err(err).Msgf("Delivery failed to topic %s", topic) return errors.WithStack(err) } r := result[0].Record logger.Info().Msgf("Delivered message to topic %s [%d] at offset %d", r.Topic, r.Partition, r.Offset) return nil } // ProduceToChannel sends a message asynchronously func (p *Producer) ProduceToChannel(topic string, key string, payload interface{}) error { v, err := json.Marshal(payload) if err != nil { return errors.WithStack(err) } record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: v, } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ProduceBinaryToChannel sends binary data asynchronously func (p *Producer) ProduceBinaryToChannel(topic string, key string, payload []byte) error { record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: payload, } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ProduceSignalBatch is a stub for batch signal production func (p *Producer) ProduceSignalBatch(topic string, key string, payload interface{}) error { // stub - implement if needed return nil } // ReadEvents is a no-op for franz-go (callbacks handle events) func (p *Producer) ReadEvents() { // franz-go uses callbacks, this is kept for interface compatibility } // Close the Kafka Producer func (p *Producer) Close() { p.Flush(5000) p.cancel() p.client.Close() }