Files
cloud-services/pkg/kafka/async_producer.go

203 lines
4.5 KiB
Go

package kafka
import (
	"context"
	"encoding/json"
	"sync"
	"time"

	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/pkg/errors"
	"github.com/twmb/franz-go/pkg/kgo"
)
// AsyncProducer is an async-first producer implementation built on a
// franz-go client. Records are handed to the client together with a delivery
// callback, and a mutex-guarded counter tracks how many callbacks are still
// outstanding.
type AsyncProducer struct {
	// client is the underlying franz-go client used to produce and flush.
	client *kgo.Client

	// ctx is derived from the context given to NewAsyncProducer and is passed
	// to every produce/flush call; cancel cancels it and is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc

	// pendingMu guards pending.
	pendingMu sync.Mutex
	// pending is incremented before a record is handed to the client and
	// decremented when that record's delivery callback runs.
	pending int
}
// NewAsyncProducer serves as factory method for the async Kafka producer.
//
// The client options come from LoadConfig/buildClientOpts and are extended
// with throughput-oriented producer settings: uncompressed batches, automatic
// topic creation, and a 5ms linger so records can be batched together.
//
// The returned producer holds a context derived from ctx; Close cancels it.
// An error (with stack trace) is returned when the franz-go client cannot be
// created.
func NewAsyncProducer(ctx context.Context) (ProducerInterface, error) {
	cfg := LoadConfig()
	opts := buildClientOpts(cfg)

	// Async producer options - optimized for throughput.
	opts = append(opts,
		kgo.ProducerBatchCompression(kgo.NoCompression()),
		kgo.AllowAutoTopicCreation(),
		// ProducerLinger takes a time.Duration: a bare 5 would be 5ns, which
		// effectively disables lingering. Spell out the intended 5ms.
		kgo.ProducerLinger(5*time.Millisecond),
	)

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	logger.Info().Msgf("NewAsyncProducer hosts: %v", cfg.Brokers)

	pctx, cancel := context.WithCancel(ctx)
	return &AsyncProducer{
		client: client,
		ctx:    pctx,
		cancel: cancel,
	}, nil
}
// Len reports how many produced records are still waiting for their delivery
// callback. The value is a snapshot taken under the counter's mutex and may
// already be stale when the caller reads it.
func (p *AsyncProducer) Len() int {
	p.pendingMu.Lock()
	outstanding := p.pending
	p.pendingMu.Unlock()
	return outstanding
}
// Flush asks the client to flush all buffered records.
//
// When timeoutMs is positive the wait is bounded by that many milliseconds
// (converted via durationMs); otherwise only the producer's own context
// limits it. It returns 0 when the flush completes, or the current pending
// count (see Len) after logging a warning when the flush returns an error.
func (p *AsyncProducer) Flush(timeoutMs int) int {
	flushCtx := p.ctx
	if timeoutMs > 0 {
		bounded, stop := context.WithTimeout(p.ctx, durationMs(timeoutMs))
		defer stop()
		flushCtx = bounded
	}

	err := p.client.Flush(flushCtx)
	if err == nil {
		return 0
	}

	logger.Warn().Err(err).Msg("flush incomplete")
	return p.Len()
}
// Produce JSON-encodes payload and hands the result to ProduceBinary together
// with the given topic, key and headers. A marshalling failure is returned
// with a stack trace; otherwise the result of ProduceBinary is returned.
func (p *AsyncProducer) Produce(topic string, key string, payload interface{}, headers map[string][]byte) error {
	encoded, marshalErr := json.Marshal(payload)
	if marshalErr == nil {
		return p.ProduceBinary(topic, key, encoded, headers)
	}
	return errors.WithStack(marshalErr)
}
// ProduceBinary queues data as a record on topic with the given key and
// headers (converted via makeHeaders) and returns immediately with nil.
// Delivery failures are only reported through the error log, from the
// delivery callback.
func (p *AsyncProducer) ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error {
	rec := new(kgo.Record)
	rec.Topic = topic
	rec.Key = []byte(key)
	rec.Value = data
	rec.Headers = makeHeaders(headers)

	// Runs once the client has a result for the record: release the pending
	// slot first, then report a failure if there was one.
	onDelivery := func(_ *kgo.Record, deliveryErr error) {
		p.pendingMu.Lock()
		p.pending--
		p.pendingMu.Unlock()

		if deliveryErr == nil {
			return
		}
		logger.Error().Err(deliveryErr).Msgf("Async delivery failed to topic %s", topic)
	}

	// Count the record as pending before handing it to the client.
	p.pendingMu.Lock()
	p.pending++
	p.pendingMu.Unlock()

	p.client.Produce(p.ctx, rec, onDelivery)
	return nil
}
// ProduceToChannel JSON-encodes payload and queues it as a header-less record
// on topic with the given key. It returns a stack-annotated error when the
// payload cannot be marshalled and nil otherwise; delivery failures are only
// logged from the delivery callback.
func (p *AsyncProducer) ProduceToChannel(topic string, key string, payload interface{}) error {
	encoded, marshalErr := json.Marshal(payload)
	if marshalErr != nil {
		return errors.WithStack(marshalErr)
	}

	rec := new(kgo.Record)
	rec.Topic = topic
	rec.Key = []byte(key)
	rec.Value = encoded

	// Runs once the client has a result for the record: release the pending
	// slot first, then report a failure if there was one.
	onDelivery := func(_ *kgo.Record, deliveryErr error) {
		p.pendingMu.Lock()
		p.pending--
		p.pendingMu.Unlock()

		if deliveryErr == nil {
			return
		}
		logger.Error().Err(deliveryErr).Msgf("Async delivery failed to topic %s", topic)
	}

	// Count the record as pending before handing it to the client.
	p.pendingMu.Lock()
	p.pending++
	p.pendingMu.Unlock()

	p.client.Produce(p.ctx, rec, onDelivery)
	return nil
}
// ProduceBinaryToChannel queues payload, unmodified, as a header-less record
// on topic with the given key. It always returns nil; delivery failures are
// only logged from the delivery callback.
func (p *AsyncProducer) ProduceBinaryToChannel(topic string, key string, payload []byte) error {
	rec := new(kgo.Record)
	rec.Topic = topic
	rec.Key = []byte(key)
	rec.Value = payload

	// Runs once the client has a result for the record: release the pending
	// slot first, then report a failure if there was one.
	onDelivery := func(_ *kgo.Record, deliveryErr error) {
		p.pendingMu.Lock()
		p.pending--
		p.pendingMu.Unlock()

		if deliveryErr == nil {
			return
		}
		logger.Error().Err(deliveryErr).Msgf("Async delivery failed to topic %s", topic)
	}

	// Count the record as pending before handing it to the client.
	p.pendingMu.Lock()
	p.pending++
	p.pendingMu.Unlock()

	p.client.Produce(p.ctx, rec, onDelivery)
	return nil
}
// ProduceSignalBatch sends a batch of CAN signals (custom format for
// Clickhouse): payload is JSON-encoded and the enclosing array brackets are
// stripped, so the record value is the comma-separated list of elements.
//
// payload must marshal to a JSON array. A stack-annotated error is returned
// when marshalling fails or when the encoded payload is not an array (for
// example a nil slice, which encodes as null, or a scalar); nothing is
// produced in that case. Otherwise the record is queued and nil is returned;
// delivery failures are only logged from the delivery callback.
func (p *AsyncProducer) ProduceSignalBatch(topic string, key string, payload interface{}) error {
	v, err := json.Marshal(payload)
	if err != nil {
		return errors.WithStack(err)
	}

	// Blindly slicing v[1:len(v)-1] panics on a one-byte encoding and mangles
	// anything that is not an array (null would become "ul"), so make sure
	// the brackets are really there before stripping them.
	if len(v) < 2 || v[0] != '[' || v[len(v)-1] != ']' {
		return errors.Errorf("signal batch payload for topic %s must encode to a JSON array, got %T", topic, payload)
	}

	// Strip JSON array brackets for Clickhouse format.
	record := &kgo.Record{
		Topic: topic,
		Key:   []byte(key),
		Value: v[1 : len(v)-1],
	}

	// Count the record as pending before handing it to the client; the
	// delivery callback releases the slot again.
	p.pendingMu.Lock()
	p.pending++
	p.pendingMu.Unlock()

	p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) {
		p.pendingMu.Lock()
		p.pending--
		p.pendingMu.Unlock()
		if err != nil {
			logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic)
		}
	})
	return nil
}
// ReadEvents is intentionally a no-op. Delivery results are handled by the
// per-record callbacks passed to the franz-go client in the Produce* methods,
// so there is no event stream to drain; the method is kept only for interface
// compatibility.
func (p *AsyncProducer) ReadEvents() {
// franz-go uses callbacks, kept for interface compatibility
}
// Close shuts the producer down: it flushes buffered records for up to five
// seconds, cancels the producer's context, and then closes the underlying
// franz-go client. The result of the flush is not inspected.
func (p *AsyncProducer) Close() {
	// Upper bound, in milliseconds, for the final flush.
	const closeFlushTimeoutMs = 5000

	p.Flush(closeFlushTimeoutMs)

	// Cancel only after the flush attempt, since Flush derives its context
	// from the producer's context.
	p.cancel()
	p.client.Close()
}