package kafka import ( "context" "encoding/json" "errors" "fmt" "git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/utils" "github.com/IBM/sarama" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "time" ) func NewConsumer() *Consumer { ctx, cancel := context.WithCancel(context.Background()) return &Consumer{ ctx: ctx, cancel: cancel, } } type Consumer struct { ctx context.Context cancel context.CancelFunc } func (c *Consumer) Consume(t []Topic) { for { topicArr := make([]string, 0) handlerMap := make(map[string]Topic) for _, v := range t { topicArr = append(topicArr, v.Name()) handlerMap[v.Name()] = v } err := client.consumer.Consume(c.ctx, topicArr, &handler{ handler: handlerMap, }) if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) { return } time.Sleep(time.Second) } } func (c *Consumer) Stop() error { c.cancel() return nil } type Handler func(context.Context, []byte) error type handler struct { handler map[string]Topic } func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error { sess.Commit() return nil } func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { ctx := NewCarrier().ExtractConsumer(message.Headers) _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume") cb := h.handler[message.Topic] if cb == nil { span.SetStatus(otelcodes.Error, "handler not found") span.End() return utils.ErrorsWrap(errors.New("handler not found")) } if err := json.Unmarshal(message.Value, cb); err != nil { span.SetStatus(otelcodes.Error, "handler json.Unmarshal error") span.End() return utils.ErrorsWrap(err, "handler json.Unmarshal error") } if err := cb.OnMessage(ctx); err != nil { if stack, ok := err.(interface{ StackTrace() string }); ok { span.AddEvent("Stack Trace", trace.WithAttributes( attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())), )) } span.SetStatus(otelcodes.Error, err.Error()) span.End() return utils.ErrorsWrap(err, "kafka handler error") } sess.MarkMessage(message, "") span.End() } sess.Commit() return nil } func consumerError() { for err := range client.consumer.Errors() { log.Errorf("kafka consumer error: %v", err) } }