package kafka

import (
	"context"
	"time"

	"github.com/fiskerinc/cloud-services/pkg/logger"
	"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
	"github.com/pkg/errors"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

// adminClient is the package-wide Kafka admin client. It is assigned once in
// init and stays nil when the underlying kgo client could not be created.
var adminClient *kadm.Client

// init builds the shared admin client from the brokers returned by
// LoadConfig. A construction failure is logged as a warning instead of
// aborting start-up; EnsureTopicsExist reports the missing client later.
func init() {
	cfg := LoadConfig()

	client, err := kgo.NewClient(kgo.SeedBrokers(cfg.Brokers...))
	if err != nil {
		logger.Warn().Err(err).Msg("failed to create kafka admin client")
		return
	}

	adminClient = kadm.NewClient(client)
}

// EnsureTopicsExist asks Kafka to create every topic in topics. Topics that
// already exist are silently accepted.
//
// Settings are read from the environment on each call:
//   - KAFKA_TIMEOUT: request timeout in seconds (default 30)
//   - KAFKA_TOPIC_PARTITIONS: partitions per new topic (default 1)
//   - KAFKA_TOPIC_REPLICATION_FACTOR: replication factor (default 1)
//
// It returns an error when the admin client was never initialized or when
// the CreateTopics request itself fails. A failure reported for an individual
// topic is only logged as a warning; the function still returns nil in that
// case.
func EnsureTopicsExist(topics []string) error {
	if adminClient == nil {
		return errors.New("kafka admin client not initialized")
	}

	timeout := time.Duration(envtool.GetEnvInt("KAFKA_TIMEOUT", 30)) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	partitions := int32(envtool.GetEnvInt("KAFKA_TOPIC_PARTITIONS", 1))
	replicationFactor := int16(envtool.GetEnvInt("KAFKA_TOPIC_REPLICATION_FACTOR", 1))

	responses, err := adminClient.CreateTopics(ctx, partitions, replicationFactor, nil, topics...)
	if err != nil {
		return errors.WithStack(err)
	}

	for _, r := range responses {
		// Per-topic errors are *kerr.Error values whose Error() text is
		// "<MESSAGE>: <description>", so comparing the text against the bare
		// code name never matches. Compare against the sentinel instead.
		if r.Err == nil || errors.Is(r.Err, kerr.TopicAlreadyExists) {
			continue
		}
		logger.Warn().Err(r.Err).Msgf("failed to create topic %s", r.Topic)
	}

	return nil
}