feat kafka 改版 2
This commit is contained in:
@@ -2,6 +2,7 @@ package kafka
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.hlsq.asia/mmorpg/service-common/log"
|
"git.hlsq.asia/mmorpg/service-common/log"
|
||||||
@@ -27,13 +28,13 @@ type Consumer struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) Consume(h map[Topic]Handler) {
|
func (c *Consumer) Consume(t []Topic) {
|
||||||
for {
|
for {
|
||||||
topicArr := make([]string, 0)
|
topicArr := make([]string, 0)
|
||||||
handlerMap := make(map[string]Handler)
|
handlerMap := make(map[string]Topic)
|
||||||
for t, h2 := range h {
|
for _, v := range t {
|
||||||
topicArr = append(topicArr, t.Name())
|
topicArr = append(topicArr, v.Name())
|
||||||
handlerMap[t.Name()] = h2
|
handlerMap[v.Name()] = v
|
||||||
}
|
}
|
||||||
err := client.consumer.Consume(c.ctx, topicArr, &handler{
|
err := client.consumer.Consume(c.ctx, topicArr, &handler{
|
||||||
handler: handlerMap,
|
handler: handlerMap,
|
||||||
@@ -53,7 +54,7 @@ func (c *Consumer) Stop() error {
|
|||||||
type Handler func(context.Context, []byte) error
|
type Handler func(context.Context, []byte) error
|
||||||
|
|
||||||
type handler struct {
|
type handler struct {
|
||||||
handler map[string]Handler
|
handler map[string]Topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
||||||
@@ -77,7 +78,13 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
|
|||||||
return utils.ErrorsWrap(errors.New("handler not found"))
|
return utils.ErrorsWrap(errors.New("handler not found"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cb(ctx, message.Value); err != nil {
|
if err := json.Unmarshal(message.Value, cb); err != nil {
|
||||||
|
span.SetStatus(otelcodes.Error, "handler json.Unmarshal error")
|
||||||
|
span.End()
|
||||||
|
return utils.ErrorsWrap(err, "handler json.Unmarshal error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cb.OnMessage(ctx); err != nil {
|
||||||
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
||||||
span.AddEvent("Stack Trace", trace.WithAttributes(
|
span.AddEvent("Stack Trace", trace.WithAttributes(
|
||||||
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
type Topic interface {
|
type Topic interface {
|
||||||
Name() string
|
Name() string
|
||||||
|
OnMessage(context.Context) error
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,6 +49,10 @@ func (c *Client) Set(ctx context.Context, key string, value interface{}, expirat
|
|||||||
return c.cli.Set(ctx, key, value, expiration)
|
return c.cli.Set(ctx, key, value, expiration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd {
|
||||||
|
return c.cli.SetNX(ctx, key, value, expiration)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) Get(ctx context.Context, key string) *redis.StringCmd {
|
func (c *Client) Get(ctx context.Context, key string) *redis.StringCmd {
|
||||||
return c.cli.Get(ctx, key)
|
return c.cli.Get(ctx, key)
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user