diff --git a/config/config.go b/config/config.go index e49e9d2..e3ccb8e 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,6 @@ type RedisConfig struct { } type KafkaConfig struct { - GroupID string `yaml:"groupID"` Brokers []string `yaml:"brokers"` } diff --git a/db/kafka/client.go b/db/kafka/client.go index 1cc2809..cadee83 100644 --- a/db/kafka/client.go +++ b/db/kafka/client.go @@ -13,12 +13,12 @@ type Client struct { consumer sarama.ConsumerGroup } -func Init(cfg *config.KafkaConfig) error { +func Init(cfg *config.KafkaConfig, appName string) error { producer, err := getAsyncProducer(cfg) if err != nil { return err } - consumer, err := getConsumer(cfg) + consumer, err := getConsumer(cfg, appName) if err != nil { return err } @@ -40,13 +40,13 @@ func getAsyncProducer(cfg *config.KafkaConfig) (sarama.AsyncProducer, error) { return sarama.NewAsyncProducer(cfg.Brokers, conf) } -func getConsumer(cfg *config.KafkaConfig) (sarama.ConsumerGroup, error) { +func getConsumer(cfg *config.KafkaConfig, appName string) (sarama.ConsumerGroup, error) { conf := sarama.NewConfig() conf.Consumer.Return.Errors = true conf.Consumer.Group.Session.Timeout = 10 * time.Second conf.Consumer.Offsets.AutoCommit.Enable = false conf.Consumer.Offsets.Initial = sarama.OffsetOldest - return sarama.NewConsumerGroup(cfg.Brokers, cfg.GroupID, conf) + return sarama.NewConsumerGroup(cfg.Brokers, appName, conf) } func Close() error { diff --git a/module/db.go b/module/db.go index 98bd100..6a04891 100644 --- a/module/db.go +++ b/module/db.go @@ -12,7 +12,8 @@ import ( // DB 数据库模块 type DB struct { DefaultModule - cfg *config.DBConfig + cfg *config.DBConfig + appName string } func (m *DB) Init() error { @@ -36,7 +37,7 @@ func (m *DB) Init() error { } // KAFKA if m.cfg.Kafka != nil { - if err := kafka.Init(m.cfg.Kafka); err != nil { + if err := kafka.Init(m.cfg.Kafka, m.appName); err != nil { return err } } @@ -66,5 +67,8 @@ func (m *DB) Bind(data ...any) Module { if cfg, ok := data[0].(*config.DBConfig); ok { m.cfg = cfg } + if appName, ok := data[1].(string); ok { + m.appName = appName + } return m }