Files
service-common/db/kafka/consumer.go

89 lines
2.0 KiB
Go

package kafka
import (
"context"
"errors"
"fmt"
"git.hlsq.asia/mmorpg/service-common/log"
"git.hlsq.asia/mmorpg/service-common/utils"
"github.com/IBM/sarama"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"time"
)
// NewConsumer returns a Consumer that owns a fresh cancellable context
// derived from context.Background. Calling the stored cancel function
// (see Stop) cancels that context.
func NewConsumer() *Consumer {
	consumer := new(Consumer)
	consumer.ctx, consumer.cancel = context.WithCancel(context.Background())
	return consumer
}
// Consumer bundles the cancellable context that bounds a consume loop
// together with the function that cancels it.
type Consumer struct {
	// ctx is passed to the underlying consumer on every Consume call.
	ctx context.Context
	// cancel cancels ctx; it is invoked by Stop.
	cancel context.CancelFunc
}
// Consume repeatedly calls the package-level client's consumer for the
// single given topic, dispatching messages to h, and blocks until the
// Consumer's context is cancelled (see Stop).
//
// Every return of the underlying Consume call that is not attributable to
// cancellation, including a nil error, is followed by a one-second pause and
// another attempt. The returned error itself is not reported here.
//
// The pause is interruptible: if the context is cancelled while waiting,
// Consume returns immediately instead of finishing the pause and issuing one
// more call on an already-cancelled context.
func (c *Consumer) Consume(topic string, h Handler) {
	topics := []string{topic}
	for {
		err := client.consumer.Consume(c.ctx, topics, &handler{handler: h})
		if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
			return
		}

		// Back off before retrying, but wake up as soon as Stop is called
		// so shutdown is not delayed by the pause.
		backoff := time.NewTimer(time.Second)
		select {
		case <-c.ctx.Done():
			backoff.Stop()
			return
		case <-backoff.C:
		}
	}
}
// Stop cancels the Consumer's context, which is the signal a running
// Consume loop checks in order to exit. It always returns nil.
func (c *Consumer) Stop() error {
	c.cancel()

	return nil
}
// Handler processes a single consumed Kafka message. It receives the context
// extracted from the message headers and the message itself. Returning a
// non-nil error makes ConsumeClaim stop and return without marking the
// message.
type Handler func(context.Context, *sarama.ConsumerMessage) error
// handler wraps a user-supplied Handler and provides the Setup, Cleanup and
// ConsumeClaim methods that are passed to the consumer group.
type handler struct {
	// handler is the callback invoked for every message of a claim.
	handler Handler
	// serviceName is not read by any code visible in this file.
	serviceName string
}
// Setup is the session-start hook. It performs no work, ignores the session
// and always returns nil.
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook. It calls Commit on the session and always
// returns nil.
func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error {
	sess.Commit()

	return nil
}
// ConsumeClaim processes the messages of one claim in order.
//
// Each message is passed to the wrapped Handler inside a "kafka.consume"
// span. A message is marked on the session only after the Handler returns
// nil. The first Handler error stops processing: the message is left
// unmarked and the wrapped error is returned.
//
// The loop ends when the claim's message channel is closed or when the
// session context is done, whichever comes first. In both cases the session
// is committed and nil is returned. Watching the session context lets the
// claim exit promptly when the session is torn down instead of waiting for
// the message channel to close.
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	msgs := claim.Messages()
	done := sess.Context().Done()

consume:
	for {
		select {
		case <-done:
			break consume
		case message, ok := <-msgs:
			if !ok {
				break consume
			}
			if err := h.consumeMessage(message); err != nil {
				return err
			}
			sess.MarkMessage(message, "")
		}
	}

	sess.Commit()
	return nil
}

// consumeMessage runs the wrapped Handler for a single message inside a
// "kafka.consume" span and returns the Handler's error wrapped with context,
// or nil on success.
func (h *handler) consumeMessage(message *sarama.ConsumerMessage) error {
	parent := NewCarrier().ExtractConsumer(message.Headers)
	// Hand the span-carrying context to the Handler so that spans it starts
	// become children of "kafka.consume".
	spanCtx, span := otel.Tracer(client.serverName).Start(parent, "kafka.consume")
	defer span.End()

	err := h.handler(spanCtx, message)
	if err == nil {
		return nil
	}

	// Errors that expose a stack trace get it attached as a span event.
	if stack, ok := err.(interface{ StackTrace() string }); ok {
		span.AddEvent("Stack Trace", trace.WithAttributes(
			attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
		))
	}
	span.SetStatus(otelcodes.Error, err.Error())
	return utils.ErrorsWrap(err, "kafka handler error")
}
// consumerError logs every error received from the package-level client's
// consumer error channel. It returns once that channel is closed.
func consumerError() {
	errs := client.consumer.Errors()
	for {
		err, open := <-errs
		if !open {
			return
		}
		log.Errorf("kafka consumer error: %v", err)
	}
}