package kafka import ( "context" "errors" "fmt" "git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/utils" "github.com/IBM/sarama" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "time" ) func NewConsumer() *Consumer { ctx, cancel := context.WithCancel(context.Background()) return &Consumer{ ctx: ctx, cancel: cancel, } } type Consumer struct { ctx context.Context cancel context.CancelFunc } func (c *Consumer) Consume(topic string, h Handler) { for { err := client.consumer.Consume(c.ctx, []string{topic}, &handler{ handler: h, }) if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) { return } time.Sleep(time.Second) } } func (c *Consumer) Stop() error { c.cancel() return nil } type Handler func(context.Context, *sarama.ConsumerMessage) error type handler struct { handler Handler serviceName string } func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error { sess.Commit() return nil } func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { ctx := NewCarrier().ExtractConsumer(message.Headers) _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume") if err := h.handler(ctx, message); err != nil { if stack, ok := err.(interface{ StackTrace() string }); ok { span.AddEvent("Stack Trace", trace.WithAttributes( attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())), )) } span.SetStatus(otelcodes.Error, err.Error()) span.End() return utils.ErrorsWrap(err, "kafka handler error") } sess.MarkMessage(message, "") span.End() } sess.Commit() return nil } func consumerError() { for err := range client.consumer.Errors() { log.Errorf("kafka consumer error: %v", err) } }