Files
service-common/db/kafka/carrier.go

42 lines
1.1 KiB
Go

package kafka
import (
"context"
"github.com/IBM/sarama"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
type Carrier struct {
}
func NewCarrier() *Carrier {
return &Carrier{}
}
func (c *Carrier) Inject(ctx context.Context) []sarama.RecordHeader {
headers := make([]sarama.RecordHeader, 0)
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
for k, v := range carrier {
headers = append(headers, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
}
return headers
}
func (c *Carrier) ExtractConsumer(headers []*sarama.RecordHeader) context.Context {
carrier := propagation.MapCarrier{}
for _, header := range headers {
carrier[string(header.Key)] = string(header.Value)
}
return otel.GetTextMapPropagator().Extract(context.Background(), carrier)
}
func (c *Carrier) ExtractProducer(headers []sarama.RecordHeader) context.Context {
carrier := propagation.MapCarrier{}
for _, header := range headers {
carrier[string(header.Key)] = string(header.Value)
}
return otel.GetTextMapPropagator().Extract(context.Background(), carrier)
}