package kafka import ( "context" "encoding/json" "sync" "github.com/fiskerinc/cloud-services/pkg/logger" "github.com/pkg/errors" "github.com/twmb/franz-go/pkg/kgo" ) // AsyncProducer is an async-first producer implementation type AsyncProducer struct { client *kgo.Client ctx context.Context cancel context.CancelFunc pending int pendingMu sync.Mutex } // NewAsyncProducer serves as factory method for async producer to kafka func NewAsyncProducer(ctx context.Context) (ProducerInterface, error) { cfg := LoadConfig() opts := buildClientOpts(cfg) // Async producer options - optimized for throughput opts = append(opts, kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.AllowAutoTopicCreation(), kgo.ProducerLinger(5), // 5ms linger for batching ) client, err := kgo.NewClient(opts...) if err != nil { return nil, errors.WithStack(err) } logger.Info().Msgf("NewAsyncProducer hosts: %v", cfg.Brokers) pctx, cancel := context.WithCancel(ctx) return &AsyncProducer{ client: client, ctx: pctx, cancel: cancel, }, nil } // Len returns approximate pending message count func (p *AsyncProducer) Len() int { p.pendingMu.Lock() defer p.pendingMu.Unlock() return p.pending } // Flush waits for all buffered records to be flushed func (p *AsyncProducer) Flush(timeoutMs int) int { ctx := p.ctx if timeoutMs > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(p.ctx, durationMs(timeoutMs)) defer cancel() } if err := p.client.Flush(ctx); err != nil { logger.Warn().Err(err).Msg("flush incomplete") return p.Len() } return 0 } // Produce sends a JSON-encoded message asynchronously func (p *AsyncProducer) Produce(topic string, key string, payload interface{}, headers map[string][]byte) error { v, err := json.Marshal(payload) if err != nil { return errors.WithStack(err) } return p.ProduceBinary(topic, key, v, headers) } // ProduceBinary sends a binary message asynchronously func (p *AsyncProducer) ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error { record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: data, Headers: makeHeaders(headers), } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ProduceToChannel sends a message asynchronously func (p *AsyncProducer) ProduceToChannel(topic string, key string, payload interface{}) error { v, err := json.Marshal(payload) if err != nil { return errors.WithStack(err) } record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: v, } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ProduceBinaryToChannel sends binary data asynchronously func (p *AsyncProducer) ProduceBinaryToChannel(topic string, key string, payload []byte) error { record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: payload, } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ProduceSignalBatch sends a batch of CAN signals (custom format for Clickhouse) func (p *AsyncProducer) ProduceSignalBatch(topic string, key string, payload interface{}) error { v, err := json.Marshal(payload) if err != nil { return errors.WithStack(err) } // Strip JSON array brackets for Clickhouse format record := &kgo.Record{ Topic: topic, Key: []byte(key), Value: v[1 : len(v)-1], } p.pendingMu.Lock() p.pending++ p.pendingMu.Unlock() p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) { p.pendingMu.Lock() p.pending-- p.pendingMu.Unlock() if err != nil { logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic) } }) return nil } // ReadEvents listens to producer events (callbacks handle this in franz-go) func (p *AsyncProducer) ReadEvents() { // franz-go uses callbacks, kept for interface compatibility } // Close the Kafka Producer func (p *AsyncProducer) Close() { p.Flush(5000) p.cancel() p.client.Close() }