Compare commits

...

2 Commits

Author SHA1 Message Date
427fca7ed1 feat kafka 改版 2 2026-01-30 10:53:00 +08:00
a2251b9cb7 feat kafka 改版 1 2026-01-29 16:20:29 +08:00
5 changed files with 476 additions and 464 deletions

View File

@@ -2,6 +2,7 @@ package kafka
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/log"
@@ -27,10 +28,16 @@ type Consumer struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func (c *Consumer) Consume(topic string, h Handler) { func (c *Consumer) Consume(t []Topic) {
for { for {
err := client.consumer.Consume(c.ctx, []string{topic}, &handler{ topicArr := make([]string, 0)
handler: h, handlerMap := make(map[string]Topic)
for _, v := range t {
topicArr = append(topicArr, v.Name())
handlerMap[v.Name()] = v
}
err := client.consumer.Consume(c.ctx, topicArr, &handler{
handler: handlerMap,
}) })
if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) { if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
return return
@@ -44,11 +51,10 @@ func (c *Consumer) Stop() error {
return nil return nil
} }
type Handler func(context.Context, *sarama.ConsumerMessage) error type Handler func(context.Context, []byte) error
type handler struct { type handler struct {
handler Handler handler map[string]Topic
serviceName string
} }
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error { func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
@@ -64,7 +70,21 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
for message := range claim.Messages() { for message := range claim.Messages() {
ctx := NewCarrier().ExtractConsumer(message.Headers) ctx := NewCarrier().ExtractConsumer(message.Headers)
_, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume") _, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume")
if err := h.handler(ctx, message); err != nil {
cb := h.handler[message.Topic]
if cb == nil {
span.SetStatus(otelcodes.Error, "handler not found")
span.End()
return utils.ErrorsWrap(errors.New("handler not found"))
}
if err := json.Unmarshal(message.Value, cb); err != nil {
span.SetStatus(otelcodes.Error, "handler json.Unmarshal error")
span.End()
return utils.ErrorsWrap(err, "handler json.Unmarshal error")
}
if err := cb.OnMessage(ctx); err != nil {
if stack, ok := err.(interface{ StackTrace() string }); ok { if stack, ok := err.(interface{ StackTrace() string }); ok {
span.AddEvent("Stack Trace", trace.WithAttributes( span.AddEvent("Stack Trace", trace.WithAttributes(
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())), attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
@@ -74,6 +94,7 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
span.End() span.End()
return utils.ErrorsWrap(err, "kafka handler error") return utils.ErrorsWrap(err, "kafka handler error")
} }
sess.MarkMessage(message, "") sess.MarkMessage(message, "")
span.End() span.End()
} }

View File

@@ -2,6 +2,7 @@ package kafka
import ( import (
"context" "context"
"encoding/json"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes" otelcodes "go.opentelemetry.io/otel/codes"
@@ -14,10 +15,11 @@ func NewProducer() *Producer {
type Producer struct { type Producer struct {
} }
func (c *Producer) Produce(ctx context.Context, topic, value string) { func (c *Producer) Produce(ctx context.Context, data Topic) {
marshal, _ := json.Marshal(data)
msg := &sarama.ProducerMessage{ msg := &sarama.ProducerMessage{
Topic: topic, Topic: data.Name(),
Value: sarama.StringEncoder(value), Value: sarama.ByteEncoder(marshal),
Headers: NewCarrier().Inject(ctx), Headers: NewCarrier().Inject(ctx),
} }
client.producer.Input() <- msg client.producer.Input() <- msg

8
db/kafka/topic.go Normal file
View File

@@ -0,0 +1,8 @@
package kafka
import "context"
type Topic interface {
Name() string
OnMessage(context.Context) error
}

View File

@@ -49,6 +49,10 @@ func (c *Client) Set(ctx context.Context, key string, value interface{}, expirat
return c.cli.Set(ctx, key, value, expiration) return c.cli.Set(ctx, key, value, expiration)
} }
func (c *Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd {
return c.cli.SetNX(ctx, key, value, expiration)
}
func (c *Client) Get(ctx context.Context, key string) *redis.StringCmd { func (c *Client) Get(ctx context.Context, key string) *redis.StringCmd {
return c.cli.Get(ctx, key) return c.cli.Get(ctx, key)
} }

File diff suppressed because it is too large Load Diff