204 lines
4.8 KiB
Go
204 lines
4.8 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/pkg/errors"
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
)
|
|
|
|
// ProducerInterface interface for Producer utility
|
|
type ProducerInterface interface {
|
|
Produce(topic string, key string, payload interface{}, headers map[string][]byte) error
|
|
ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error
|
|
ProduceBinaryToChannel(topic string, key string, payload []byte) error
|
|
ProduceToChannel(string, string, interface{}) error
|
|
ProduceSignalBatch(string, string, interface{}) error
|
|
ReadEvents()
|
|
Len() int
|
|
Flush(timeoutMs int) int
|
|
Close()
|
|
}
|
|
|
|
// Producer utility to produce messages
|
|
type Producer struct {
|
|
client *kgo.Client
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
pending int
|
|
pendingMu sync.Mutex
|
|
}
|
|
|
|
// NewProducer serves as factory method for producer to kafka
|
|
func NewProducer(ctx context.Context) (ProducerInterface, error) {
|
|
cfg := LoadConfig()
|
|
opts := buildClientOpts(cfg)
|
|
|
|
// Producer-specific options
|
|
opts = append(opts,
|
|
kgo.ProducerBatchCompression(kgo.NoCompression()),
|
|
kgo.AllowAutoTopicCreation(),
|
|
)
|
|
|
|
client, err := kgo.NewClient(opts...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
logger.Info().Msgf("NewProducer hosts: %v", cfg.Brokers)
|
|
|
|
pctx, cancel := context.WithCancel(ctx)
|
|
return &Producer{
|
|
client: client,
|
|
ctx: pctx,
|
|
cancel: cancel,
|
|
}, nil
|
|
}
|
|
|
|
// Len returns len of messages in queue (approximate)
|
|
func (p *Producer) Len() int {
|
|
p.pendingMu.Lock()
|
|
defer p.pendingMu.Unlock()
|
|
return p.pending
|
|
}
|
|
|
|
// Flush waits for all buffered records to be flushed
|
|
func (p *Producer) Flush(timeoutMs int) int {
|
|
ctx := p.ctx
|
|
if timeoutMs > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(p.ctx, durationMs(timeoutMs))
|
|
defer cancel()
|
|
}
|
|
if err := p.client.Flush(ctx); err != nil {
|
|
logger.Warn().Err(err).Msg("flush incomplete")
|
|
return p.Len()
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// Produce sends a JSON-encoded message to Kafka
|
|
func (p *Producer) Produce(topic string, key string, payload interface{}, headers map[string][]byte) error {
|
|
v, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return p.ProduceBinary(topic, key, v, headers)
|
|
}
|
|
|
|
func makeHeaders(headers map[string][]byte) []kgo.RecordHeader {
|
|
if headers == nil {
|
|
return nil
|
|
}
|
|
result := make([]kgo.RecordHeader, 0, len(headers))
|
|
for k, v := range headers {
|
|
result = append(result, kgo.RecordHeader{Key: k, Value: v})
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ProduceBinary sends a binary message to Kafka synchronously
|
|
func (p *Producer) ProduceBinary(topic string, key string, data []byte, headers map[string][]byte) error {
|
|
record := &kgo.Record{
|
|
Topic: topic,
|
|
Key: []byte(key),
|
|
Value: data,
|
|
Headers: makeHeaders(headers),
|
|
}
|
|
|
|
p.pendingMu.Lock()
|
|
p.pending++
|
|
p.pendingMu.Unlock()
|
|
|
|
result := p.client.ProduceSync(p.ctx, record)
|
|
|
|
p.pendingMu.Lock()
|
|
p.pending--
|
|
p.pendingMu.Unlock()
|
|
|
|
if err := result.FirstErr(); err != nil {
|
|
logger.Error().Err(err).Msgf("Delivery failed to topic %s", topic)
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
r := result[0].Record
|
|
logger.Info().Msgf("Delivered message to topic %s [%d] at offset %d", r.Topic, r.Partition, r.Offset)
|
|
return nil
|
|
}
|
|
|
|
// ProduceToChannel sends a message asynchronously
|
|
func (p *Producer) ProduceToChannel(topic string, key string, payload interface{}) error {
|
|
v, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
record := &kgo.Record{
|
|
Topic: topic,
|
|
Key: []byte(key),
|
|
Value: v,
|
|
}
|
|
|
|
p.pendingMu.Lock()
|
|
p.pending++
|
|
p.pendingMu.Unlock()
|
|
|
|
p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) {
|
|
p.pendingMu.Lock()
|
|
p.pending--
|
|
p.pendingMu.Unlock()
|
|
|
|
if err != nil {
|
|
logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic)
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProduceBinaryToChannel sends binary data asynchronously
|
|
func (p *Producer) ProduceBinaryToChannel(topic string, key string, payload []byte) error {
|
|
record := &kgo.Record{
|
|
Topic: topic,
|
|
Key: []byte(key),
|
|
Value: payload,
|
|
}
|
|
|
|
p.pendingMu.Lock()
|
|
p.pending++
|
|
p.pendingMu.Unlock()
|
|
|
|
p.client.Produce(p.ctx, record, func(r *kgo.Record, err error) {
|
|
p.pendingMu.Lock()
|
|
p.pending--
|
|
p.pendingMu.Unlock()
|
|
|
|
if err != nil {
|
|
logger.Error().Err(err).Msgf("Async delivery failed to topic %s", topic)
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProduceSignalBatch is a stub for batch signal production
|
|
func (p *Producer) ProduceSignalBatch(topic string, key string, payload interface{}) error {
|
|
// stub - implement if needed
|
|
return nil
|
|
}
|
|
|
|
// ReadEvents is a no-op for franz-go (callbacks handle events)
|
|
func (p *Producer) ReadEvents() {
|
|
// franz-go uses callbacks, this is kept for interface compatibility
|
|
}
|
|
|
|
// Close the Kafka Producer
|
|
func (p *Producer) Close() {
|
|
p.Flush(5000)
|
|
p.cancel()
|
|
p.client.Close()
|
|
}
|