feat kafka

This commit is contained in:
2026-01-27 17:14:05 +08:00
parent 7271181501
commit a93b5fd1aa
3 changed files with 10 additions and 7 deletions

View File

@@ -54,7 +54,6 @@ type RedisConfig struct {
} }
type KafkaConfig struct { type KafkaConfig struct {
GroupID string `yaml:"groupID"`
Brokers []string `yaml:"brokers"` Brokers []string `yaml:"brokers"`
} }

View File

@@ -13,12 +13,12 @@ type Client struct {
consumer sarama.ConsumerGroup consumer sarama.ConsumerGroup
} }
func Init(cfg *config.KafkaConfig) error { func Init(cfg *config.KafkaConfig, appName string) error {
producer, err := getAsyncProducer(cfg) producer, err := getAsyncProducer(cfg)
if err != nil { if err != nil {
return err return err
} }
consumer, err := getConsumer(cfg) consumer, err := getConsumer(cfg, appName)
if err != nil { if err != nil {
return err return err
} }
@@ -40,13 +40,13 @@ func getAsyncProducer(cfg *config.KafkaConfig) (sarama.AsyncProducer, error) {
return sarama.NewAsyncProducer(cfg.Brokers, conf) return sarama.NewAsyncProducer(cfg.Brokers, conf)
} }
func getConsumer(cfg *config.KafkaConfig) (sarama.ConsumerGroup, error) { func getConsumer(cfg *config.KafkaConfig, appName string) (sarama.ConsumerGroup, error) {
conf := sarama.NewConfig() conf := sarama.NewConfig()
conf.Consumer.Return.Errors = true conf.Consumer.Return.Errors = true
conf.Consumer.Group.Session.Timeout = 10 * time.Second conf.Consumer.Group.Session.Timeout = 10 * time.Second
conf.Consumer.Offsets.AutoCommit.Enable = false conf.Consumer.Offsets.AutoCommit.Enable = false
conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Offsets.Initial = sarama.OffsetOldest
return sarama.NewConsumerGroup(cfg.Brokers, cfg.GroupID, conf) return sarama.NewConsumerGroup(cfg.Brokers, appName, conf)
} }
func Close() error { func Close() error {

View File

@@ -13,6 +13,7 @@ import (
type DB struct { type DB struct {
DefaultModule DefaultModule
cfg *config.DBConfig cfg *config.DBConfig
appName string
} }
func (m *DB) Init() error { func (m *DB) Init() error {
@@ -36,7 +37,7 @@ func (m *DB) Init() error {
} }
// KAFKA // KAFKA
if m.cfg.Kafka != nil { if m.cfg.Kafka != nil {
if err := kafka.Init(m.cfg.Kafka); err != nil { if err := kafka.Init(m.cfg.Kafka, m.appName); err != nil {
return err return err
} }
} }
@@ -66,5 +67,8 @@ func (m *DB) Bind(data ...any) Module {
if cfg, ok := data[0].(*config.DBConfig); ok { if cfg, ok := data[0].(*config.DBConfig); ok {
m.cfg = cfg m.cfg = cfg
} }
if appName, ok := data[1].(string); ok {
m.appName = appName
}
return m return m
} }