2.2 KiB
2.2 KiB
Kafka Package
Pure Go Kafka client using franz-go.
Why franz-go?
- Pure Go - no CGO, no librdkafka dependency
- Builds to static binaries, works with scratch/distroless images
- Well-maintained, performant, full Kafka protocol support
Usage
Producer
producer, err := kafka.NewProducer(ctx)
if err != nil {
return err
}
defer producer.Close()
// Sync produce
err = producer.Produce("my-topic", "key", myPayload, nil)
// Async produce
err = producer.ProduceToChannel("my-topic", "key", myPayload)
Consumer
consumer, err := kafka.NewConsumer("my-service")
if err != nil {
return err
}
defer consumer.Stop()
// With handler
consumer.Consume([]string{"topic1", "topic2"}, func(key, value []byte) error {
// process message
return nil
})
// Or with channel
events := make(chan *kafka.Message)
go consumer.ConsumeToChannel([]string{"topic1"}, events)
for msg := range events {
// process msg
}
Admin
// Ensure topics exist (creates if missing)
kafka.EnsureTopicsExist([]string{"topic1", "topic2"})
Configuration
All config via environment variables:
| Variable | Default | Description |
|---|---|---|
KAFKA_HOSTS |
localhost:9092 |
Broker addresses (comma-separated) |
KAFKA_SECURITY_PROTOCOL |
plaintext |
plaintext, ssl, sasl_plaintext, sasl_ssl |
KAFKA_SASL_MECHANISMS |
`` | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
KAFKA_SASL_USERNAME |
`` | SASL username |
KAFKA_SASL_PASSWORD |
`` | SASL password |
KAFKA_SESSION_TIMEOUT_MS |
45000 |
Consumer session timeout |
KAFKA_HEARTBEAT_INTERVAL_MS |
3000 |
Consumer heartbeat interval |
KAFKA_ENABLE_AUTO_COMMIT |
true |
Auto-commit offsets |
KAFKA_AUTO_COMMIT_INTERVAL_MS |
5000 |
Auto-commit interval |
KAFKA_TIMEOUT |
10000 |
Poll timeout (ms) |
KAFKA_TOPIC_PARTITIONS |
1 |
Default partitions for new topics |
KAFKA_TOPIC_REPLICATION_FACTOR |
1 |
Default replication factor |
Migration from confluent-kafka-go
The interfaces are compatible. Main changes:
*kafka.Message→*kafka.Message(our wrapper type)kafka.TopicPartition→kafka.TopicPartition(our type)- No more CGO build tags or librdkafka deps