// Files
// cloud-services/pkg/kafka/config.go
//
// 46 lines
// 1.2 KiB
// Go

package kafka
import (
"strings"
"time"
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
"github.com/twmb/franz-go/pkg/kgo"
)
// Config describes how this package connects to a Kafka cluster.
//
// LoadConfig populates every field from environment variables via envtool,
// supplying a default value for each one.
type Config struct {
	// Brokers lists the broker addresses; buildClientOpts passes them to
	// the client as seed brokers. LoadConfig reads them from KAFKA_HOSTS
	// as a comma-separated list.
	Brokers []string

	// SecurityProtocol carries the KAFKA_SECURITY_PROTOCOL setting
	// (LoadConfig's default is "plaintext").
	// NOTE(review): buildClientOpts does not consult this field — confirm
	// whether anything else in the package does.
	SecurityProtocol string

	// SessionTimeout is KAFKA_SESSION_TIMEOUT_MS expressed as a Duration
	// (LoadConfig's default is 45000 ms).
	SessionTimeout time.Duration

	// HeartbeatInterval is KAFKA_HEARTBEAT_INTERVAL_MS expressed as a
	// Duration (LoadConfig's default is 3000 ms).
	HeartbeatInterval time.Duration
}
// LoadConfig builds a Config from environment variables, read through
// envtool with the defaults listed below.
//
//   - KAFKA_HOSTS: comma-separated broker addresses (default "localhost:9092").
//     Each entry is trimmed of surrounding whitespace and blank entries are
//     dropped, so input such as "a:9092, b:9092," yields two brokers.
//   - KAFKA_SECURITY_PROTOCOL: stored verbatim (default "plaintext").
//   - KAFKA_SESSION_TIMEOUT_MS: milliseconds (default 45000).
//   - KAFKA_HEARTBEAT_INTERVAL_MS: milliseconds (default 3000).
//
// The returned Config always has at least one broker: if KAFKA_HOSTS holds
// only separators and whitespace, the default address is used instead.
func LoadConfig() Config {
	const defaultHosts = "localhost:9092"

	hosts := envtool.GetEnv("KAFKA_HOSTS", defaultHosts)

	// strings.Split keeps empty elements (trailing or doubled commas), and an
	// empty string is not a usable broker address, so filter while trimming.
	parts := strings.Split(hosts, ",")
	brokers := make([]string, 0, len(parts))
	for _, part := range parts {
		if addr := strings.TrimSpace(part); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	if len(brokers) == 0 {
		// Nothing usable was configured; behave as if the variable were unset.
		brokers = append(brokers, defaultHosts)
	}

	return Config{
		Brokers:           brokers,
		SecurityProtocol:  envtool.GetEnv("KAFKA_SECURITY_PROTOCOL", "plaintext"),
		SessionTimeout:    time.Duration(envtool.GetEnvInt("KAFKA_SESSION_TIMEOUT_MS", 45000)) * time.Millisecond,
		HeartbeatInterval: time.Duration(envtool.GetEnvInt("KAFKA_HEARTBEAT_INTERVAL_MS", 3000)) * time.Millisecond,
	}
}
// buildClientOpts returns the kgo options derived from cfg.
//
// At present the result holds a single option: the seed brokers taken from
// cfg.Brokers. No other field of cfg is read here.
func buildClientOpts(cfg Config) []kgo.Opt {
	seedBrokers := kgo.SeedBrokers(cfg.Brokers...)

	// No SASL/auth options are added: the mini cluster is reached over
	// PLAINTEXT. Append them to this slice if that changes in the future.
	return []kgo.Opt{seedBrokers}
}