feat kafka 改版 1
This commit is contained in:
@@ -27,10 +27,16 @@ type Consumer struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) Consume(topic string, h Handler) {
|
func (c *Consumer) Consume(h map[Topic]Handler) {
|
||||||
for {
|
for {
|
||||||
err := client.consumer.Consume(c.ctx, []string{topic}, &handler{
|
topicArr := make([]string, 0)
|
||||||
handler: h,
|
handlerMap := make(map[string]Handler)
|
||||||
|
for t, h2 := range h {
|
||||||
|
topicArr = append(topicArr, t.Name())
|
||||||
|
handlerMap[t.Name()] = h2
|
||||||
|
}
|
||||||
|
err := client.consumer.Consume(c.ctx, topicArr, &handler{
|
||||||
|
handler: handlerMap,
|
||||||
})
|
})
|
||||||
if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
|
if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
|
||||||
return
|
return
|
||||||
@@ -44,11 +50,10 @@ func (c *Consumer) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler func(context.Context, *sarama.ConsumerMessage) error
|
type Handler func(context.Context, []byte) error
|
||||||
|
|
||||||
type handler struct {
|
type handler struct {
|
||||||
handler Handler
|
handler map[string]Handler
|
||||||
serviceName string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
||||||
@@ -64,7 +69,15 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
|
|||||||
for message := range claim.Messages() {
|
for message := range claim.Messages() {
|
||||||
ctx := NewCarrier().ExtractConsumer(message.Headers)
|
ctx := NewCarrier().ExtractConsumer(message.Headers)
|
||||||
_, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume")
|
_, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume")
|
||||||
if err := h.handler(ctx, message); err != nil {
|
|
||||||
|
cb := h.handler[message.Topic]
|
||||||
|
if cb == nil {
|
||||||
|
span.SetStatus(otelcodes.Error, "handler not found")
|
||||||
|
span.End()
|
||||||
|
return utils.ErrorsWrap(errors.New("handler not found"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cb(ctx, message.Value); err != nil {
|
||||||
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
||||||
span.AddEvent("Stack Trace", trace.WithAttributes(
|
span.AddEvent("Stack Trace", trace.WithAttributes(
|
||||||
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
||||||
@@ -74,6 +87,7 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
|
|||||||
span.End()
|
span.End()
|
||||||
return utils.ErrorsWrap(err, "kafka handler error")
|
return utils.ErrorsWrap(err, "kafka handler error")
|
||||||
}
|
}
|
||||||
|
|
||||||
sess.MarkMessage(message, "")
|
sess.MarkMessage(message, "")
|
||||||
span.End()
|
span.End()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package kafka
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
otelcodes "go.opentelemetry.io/otel/codes"
|
otelcodes "go.opentelemetry.io/otel/codes"
|
||||||
@@ -14,10 +15,11 @@ func NewProducer() *Producer {
|
|||||||
type Producer struct {
|
type Producer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Producer) Produce(ctx context.Context, topic, value string) {
|
func (c *Producer) Produce(ctx context.Context, data Topic) {
|
||||||
|
marshal, _ := json.Marshal(data)
|
||||||
msg := &sarama.ProducerMessage{
|
msg := &sarama.ProducerMessage{
|
||||||
Topic: topic,
|
Topic: data.Name(),
|
||||||
Value: sarama.StringEncoder(value),
|
Value: sarama.ByteEncoder(marshal),
|
||||||
Headers: NewCarrier().Inject(ctx),
|
Headers: NewCarrier().Inject(ctx),
|
||||||
}
|
}
|
||||||
client.producer.Input() <- msg
|
client.producer.Input() <- msg
|
||||||
|
|||||||
5
db/kafka/topic.go
Normal file
5
db/kafka/topic.go
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
type Topic interface {
|
||||||
|
Name() string
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user