diff --git a/db/kafka/client.go b/db/kafka/client.go index 73ccf04..1cc2809 100644 --- a/db/kafka/client.go +++ b/db/kafka/client.go @@ -9,12 +9,11 @@ import ( var client *Client type Client struct { - producer sarama.AsyncProducer - consumer sarama.ConsumerGroup - serverName string + producer sarama.AsyncProducer + consumer sarama.ConsumerGroup } -func Init(cfg *config.KafkaConfig, serverName string) error { +func Init(cfg *config.KafkaConfig) error { producer, err := getAsyncProducer(cfg) if err != nil { return err @@ -24,9 +23,8 @@ func Init(cfg *config.KafkaConfig, serverName string) error { return err } client = &Client{ - producer: producer, - consumer: consumer, - serverName: serverName, + producer: producer, + consumer: consumer, } go producerError() go consumerError() diff --git a/db/kafka/consumer.go b/db/kafka/consumer.go index 98c573d..9f7a133 100644 --- a/db/kafka/consumer.go +++ b/db/kafka/consumer.go @@ -63,7 +63,7 @@ func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error { func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { ctx := NewCarrier().ExtractConsumer(message.Headers) - _, span := otel.Tracer(client.serverName).Start(ctx, "kafka.consume") + _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume") if err := h.handler(ctx, message); err != nil { if stack, ok := err.(interface{ StackTrace() string }); ok { span.AddEvent("Stack Trace", trace.WithAttributes( diff --git a/db/kafka/producer.go b/db/kafka/producer.go index 96f0a39..a05d0ca 100644 --- a/db/kafka/producer.go +++ b/db/kafka/producer.go @@ -26,7 +26,7 @@ func (c *Producer) Produce(ctx context.Context, topic, value string) { func producerError() { for err := range client.producer.Errors() { ctx := NewCarrier().ExtractProducer(err.Msg.Headers) - _, span := otel.Tracer(client.serverName).Start(ctx, "kafka.producer") + _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.producer.error") span.SetStatus(otelcodes.Error, err.Error()) span.End() } diff --git a/module/db.go b/module/db.go index 29f4427..98bd100 100644 --- a/module/db.go +++ b/module/db.go @@ -12,8 +12,7 @@ import ( // DB 数据库模块 type DB struct { DefaultModule - cfg *config.DBConfig - serviceName string + cfg *config.DBConfig } func (m *DB) Init() error { @@ -37,7 +36,7 @@ func (m *DB) Init() error { } // KAFKA if m.cfg.Kafka != nil { - if err := kafka.Init(m.cfg.Kafka, m.serviceName); err != nil { + if err := kafka.Init(m.cfg.Kafka); err != nil { return err } } @@ -67,8 +66,5 @@ func (m *DB) Bind(data ...any) Module { if cfg, ok := data[0].(*config.DBConfig); ok { m.cfg = cfg } - if name, ok := data[1].(string); ok { - m.serviceName = name - } return m }