diff --git a/db/kafka/consumer.go b/db/kafka/consumer.go index 9f7a133..9763b4d 100644 --- a/db/kafka/consumer.go +++ b/db/kafka/consumer.go @@ -27,10 +27,16 @@ type Consumer struct { cancel context.CancelFunc } -func (c *Consumer) Consume(topic string, h Handler) { +func (c *Consumer) Consume(h map[Topic]Handler) { for { - err := client.consumer.Consume(c.ctx, []string{topic}, &handler{ - handler: h, + topicArr := make([]string, 0) + handlerMap := make(map[string]Handler) + for t, h2 := range h { + topicArr = append(topicArr, t.Name()) + handlerMap[t.Name()] = h2 + } + err := client.consumer.Consume(c.ctx, topicArr, &handler{ + handler: handlerMap, }) if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) { return @@ -44,11 +50,10 @@ func (c *Consumer) Stop() error { return nil } -type Handler func(context.Context, *sarama.ConsumerMessage) error +type Handler func(context.Context, []byte) error type handler struct { - handler Handler - serviceName string + handler map[string]Handler } func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { @@ -64,7 +69,15 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co for message := range claim.Messages() { ctx := NewCarrier().ExtractConsumer(message.Headers) _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume") - if err := h.handler(ctx, message); err != nil { + + cb := h.handler[message.Topic] + if cb == nil { + span.SetStatus(otelcodes.Error, "handler not found") + span.End() + return utils.ErrorsWrap(errors.New("handler not found")) + } + + if err := cb(ctx, message.Value); err != nil { if stack, ok := err.(interface{ StackTrace() string }); ok { span.AddEvent("Stack Trace", trace.WithAttributes( attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())), @@ -74,6 +87,7 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co span.End() return utils.ErrorsWrap(err, "kafka handler error") } + sess.MarkMessage(message, "") span.End() } diff --git a/db/kafka/producer.go b/db/kafka/producer.go index a05d0ca..0ab37bc 100644 --- a/db/kafka/producer.go +++ b/db/kafka/producer.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "encoding/json" "github.com/IBM/sarama" "go.opentelemetry.io/otel" otelcodes "go.opentelemetry.io/otel/codes" @@ -14,10 +15,11 @@ func NewProducer() *Producer { type Producer struct { } -func (c *Producer) Produce(ctx context.Context, topic, value string) { +func (c *Producer) Produce(ctx context.Context, data Topic) { + marshal, _ := json.Marshal(data) msg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.StringEncoder(value), + Topic: data.Name(), + Value: sarama.ByteEncoder(marshal), Headers: NewCarrier().Inject(ctx), } client.producer.Input() <- msg diff --git a/db/kafka/topic.go b/db/kafka/topic.go new file mode 100644 index 0000000..c6a3567 --- /dev/null +++ b/db/kafka/topic.go @@ -0,0 +1,5 @@ +package kafka + +type Topic interface { + Name() string +}