feat kafka
This commit is contained in:
62
db/kafka/client.go
Normal file
62
db/kafka/client.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"git.hlsq.asia/mmorpg/service-common/config"
|
||||
"github.com/IBM/sarama"
|
||||
"time"
|
||||
)
|
||||
|
||||
var client *Client
|
||||
|
||||
type Client struct {
|
||||
producer sarama.AsyncProducer
|
||||
consumer sarama.ConsumerGroup
|
||||
serverName string
|
||||
}
|
||||
|
||||
func Init(cfg *config.KafkaConfig, serverName string) error {
|
||||
producer, err := getAsyncProducer(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
consumer, err := getConsumer(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client = &Client{
|
||||
producer: producer,
|
||||
consumer: consumer,
|
||||
serverName: serverName,
|
||||
}
|
||||
go producerError()
|
||||
go consumerError()
|
||||
return nil
|
||||
}
|
||||
|
||||
func getAsyncProducer(cfg *config.KafkaConfig) (sarama.AsyncProducer, error) {
|
||||
conf := sarama.NewConfig()
|
||||
conf.Producer.RequiredAcks = sarama.WaitForAll
|
||||
conf.Producer.Return.Errors = true
|
||||
conf.Producer.Retry.Max = 5
|
||||
conf.Producer.Retry.Backoff = 100 * time.Millisecond
|
||||
return sarama.NewAsyncProducer(cfg.Brokers, conf)
|
||||
}
|
||||
|
||||
func getConsumer(cfg *config.KafkaConfig) (sarama.ConsumerGroup, error) {
|
||||
conf := sarama.NewConfig()
|
||||
conf.Consumer.Return.Errors = true
|
||||
conf.Consumer.Group.Session.Timeout = 10 * time.Second
|
||||
conf.Consumer.Offsets.AutoCommit.Enable = false
|
||||
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
return sarama.NewConsumerGroup(cfg.Brokers, cfg.GroupID, conf)
|
||||
}
|
||||
|
||||
func Close() error {
|
||||
if client != nil && client.producer != nil {
|
||||
_ = client.producer.Close()
|
||||
}
|
||||
if client != nil && client.consumer != nil {
|
||||
_ = client.consumer.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
93
db/kafka/consumer.go
Normal file
93
db/kafka/consumer.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
"git.hlsq.asia/mmorpg/service-common/utils"
|
||||
"github.com/IBM/sarama"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
otelcodes "go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewConsumer() *Consumer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Consumer{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
type Consumer struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (c *Consumer) Consume(topic string, h Handler) {
|
||||
for {
|
||||
err := client.consumer.Consume(c.ctx, []string{topic}, &handler{
|
||||
handler: h,
|
||||
})
|
||||
if errors.Is(err, context.Canceled) || errors.Is(c.ctx.Err(), context.Canceled) {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) Stop() error {
|
||||
c.cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
type Handler func(context.Context, *sarama.ConsumerMessage) error
|
||||
|
||||
type handler struct {
|
||||
handler Handler
|
||||
serviceName string
|
||||
}
|
||||
|
||||
func (h *handler) Setup(_ sarama.ConsumerGroupSession) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) Cleanup(sess sarama.ConsumerGroupSession) error {
|
||||
sess.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
for message := range claim.Messages() {
|
||||
carrier := propagation.MapCarrier{}
|
||||
for _, header := range message.Headers {
|
||||
carrier[string(header.Key)] = string(header.Value)
|
||||
}
|
||||
ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
|
||||
_, span := otel.Tracer(client.serverName).Start(ctx, "kafka.consume")
|
||||
if err := h.handler(ctx, message); err != nil {
|
||||
if stack, ok := err.(interface{ StackTrace() string }); ok {
|
||||
span.AddEvent("Stack Trace", trace.WithAttributes(
|
||||
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
|
||||
))
|
||||
}
|
||||
span.SetStatus(otelcodes.Error, err.Error())
|
||||
span.End()
|
||||
return utils.ErrorsWrap(err, "kafka handler error")
|
||||
}
|
||||
sess.MarkMessage(message, "")
|
||||
span.End()
|
||||
}
|
||||
sess.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func consumerError() {
|
||||
for err := range client.consumer.Errors() {
|
||||
log.Errorf("kafka consumer error: %v", err)
|
||||
}
|
||||
}
|
||||
37
db/kafka/producer.go
Normal file
37
db/kafka/producer.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"git.hlsq.asia/mmorpg/service-common/log"
|
||||
"github.com/IBM/sarama"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
)
|
||||
|
||||
func NewProducer() *Producer {
|
||||
return &Producer{}
|
||||
}
|
||||
|
||||
type Producer struct {
|
||||
}
|
||||
|
||||
func (c *Producer) Produce(ctx context.Context, topic, value string) {
|
||||
msg := &sarama.ProducerMessage{
|
||||
Topic: topic,
|
||||
Value: sarama.StringEncoder(value),
|
||||
Headers: make([]sarama.RecordHeader, 0),
|
||||
}
|
||||
// 注入链路信息
|
||||
carrier := propagation.MapCarrier{}
|
||||
otel.GetTextMapPropagator().Inject(ctx, carrier)
|
||||
for k, v := range carrier {
|
||||
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
|
||||
}
|
||||
client.producer.Input() <- msg
|
||||
}
|
||||
|
||||
func producerError() {
|
||||
for err := range client.producer.Errors() {
|
||||
log.Errorf("kafka producer error: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user