Compare commits
2 Commits
be7b922cbf
...
427fca7ed1
| Author | SHA1 | Date | |
|---|---|---|---|
| 427fca7ed1 | |||
| a2251b9cb7 |
@@ -2,6 +2,7 @@ package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
@@ -27,10 +28,16 @@ type Consumer struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (c *Consumer) Consume(topic string, h Handler) {
|
||||
func (c *Consumer) Consume(t []Topic) {
|
||||
for {
|
||||
err := client.consumer.Consume(c.ctx, []string{topic}, &handler{
|
||||
handler: h,
|
||||
topicArr := make([]string, 0)
|
||||
handlerMap := make(map[string]Topic)
|
||||
for _, v := range t {
|
||||
topicArr = append(topicArr, v.Name())
|
||||
handlerMap[v.Name()] = v
|
||||
}
|
||||
err := client.consumer.Consume(c.ctx, topicArr, &handler{
|
||||
handler: handlerMap,
|
||||
})
|
||||
if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
|
||||
return
|
||||
@@ -44,11 +51,10 @@ func (c *Consumer) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type Handler func(context.Context, *sarama.ConsumerMessage) error
|
||||
type Handler func(context.Context, []byte) error
|
||||
|
||||
type handler struct {
|
||||
handler Handler
|
||||
serviceName string
|
||||
handler map[string]Topic
|
||||
}
|
||||
|
||||
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
||||
@@ -64,7 +70,21 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
|
||||
for message := range claim.Messages() {
|
||||
ctx := NewCarrier().ExtractConsumer(message.Headers)
|
||||
_, span := otel.Tracer("common.db.kafka").Start(ctx, "kafka.consume")
|
||||
if err := h.handler(ctx, message); err != nil {
|
||||
|
||||
cb := h.handler[message.Topic]
|
||||
if cb == nil {
|
||||
span.SetStatus(otelcodes.Error, "handler not found")
|
||||
span.End()
|
||||
return utils.ErrorsWrap(errors.New("handler not found"))
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(message.Value, cb); err != nil {
|
||||
span.SetStatus(otelcodes.Error, "handler json.Unmarshal error")
|
||||
span.End()
|
||||
return utils.ErrorsWrap(err, "handler json.Unmarshal error")
|
||||
}
|
||||
|
||||
if err := cb.OnMessage(ctx); err != nil {
|
||||
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
||||
span.AddEvent("Stack Trace", trace.WithAttributes(
|
||||
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
||||
@@ -74,6 +94,7 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
|
||||
span.End()
|
||||
return utils.ErrorsWrap(err, "kafka handler error")
|
||||
}
|
||||
|
||||
sess.MarkMessage(message, "")
|
||||
span.End()
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/IBM/sarama"
|
||||
"go.opentelemetry.io/otel"
|
||||
otelcodes "go.opentelemetry.io/otel/codes"
|
||||
@@ -14,10 +15,11 @@ func NewProducer() *Producer {
|
||||
type Producer struct {
|
||||
}
|
||||
|
||||
func (c *Producer) Produce(ctx context.Context, topic, value string) {
|
||||
func (c *Producer) Produce(ctx context.Context, data Topic) {
|
||||
marshal, _ := json.Marshal(data)
|
||||
msg := &sarama.ProducerMessage{
|
||||
Topic: topic,
|
||||
Value: sarama.StringEncoder(value),
|
||||
Topic: data.Name(),
|
||||
Value: sarama.ByteEncoder(marshal),
|
||||
Headers: NewCarrier().Inject(ctx),
|
||||
}
|
||||
client.producer.Input() <- msg
|
||||
|
||||
8
db/kafka/topic.go
Normal file
8
db/kafka/topic.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package kafka
|
||||
|
||||
import "context"
|
||||
|
||||
type Topic interface {
|
||||
Name() string
|
||||
OnMessage(context.Context) error
|
||||
}
|
||||
@@ -49,6 +49,10 @@ func (c *Client) Set(ctx context.Context, key string, value interface{}, expirat
|
||||
return c.cli.Set(ctx, key, value, expiration)
|
||||
}
|
||||
|
||||
func (c *Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd {
|
||||
return c.cli.SetNX(ctx, key, value, expiration)
|
||||
}
|
||||
|
||||
func (c *Client) Get(ctx context.Context, key string) *redis.StringCmd {
|
||||
return c.cli.Get(ctx, key)
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user