feat 链路追踪

This commit is contained in:
2026-01-16 22:01:35 +08:00
parent 7cb057fc91
commit e59d106700
11 changed files with 394 additions and 108 deletions

View File

@@ -2,6 +2,7 @@ package grpc_conn
import (
"git.hlsq.asia/mmorpg/service-common/log"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
@@ -20,6 +21,7 @@ func NewGrpcConnection(sid string, address string) (*GrpcConnection, error) {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: 30 * time.Second, // 保活探测包发送的时间间隔

View File

@@ -2,6 +2,7 @@ package resolver
import (
"git.hlsq.asia/mmorpg/service-common/log"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
@@ -12,6 +13,7 @@ func NewGrpcConnection(target string) (*grpc.ClientConn, error) {
cc, err := grpc.NewClient(
target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`),
grpc.WithKeepaliveParams(
keepalive.ClientParameters{

View File

@@ -0,0 +1,61 @@
package service
import (
"context"
"fmt"
"git.hlsq.asia/mmorpg/service-common/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"time"
)
func (s *Base) RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("service Panic: %v", r)
err = status.Error(codes.Internal, fmt.Sprintf("Internal server error: %v", r))
}
}()
return handler(ctx, req)
}
func (s *Base) LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
usn := ""
if md, ok := metadata.FromIncomingContext(ctx); ok {
usnArr := md.Get("X-Usn")
if len(usnArr) > 0 {
usn = usnArr[0]
}
}
tracer := otel.Tracer(s.ServiceName)
_, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
reqString := fmt.Sprintf("[usn:%v] method: %s, Request: %v", usn, info.FullMethod, req)
span.SetAttributes(attribute.String("rpc.req", reqString))
log.Infof(reqString)
start := time.Now()
resp, err = handler(ctx, req)
respString := fmt.Sprintf("[usn:%v] method: %s, Duration: %v, Response: %v, Error: %v", usn, info.FullMethod, time.Since(start), resp, err)
span.SetAttributes(attribute.String("rpc.resp", respString))
log.Infof(respString)
if err != nil {
if stack, ok := err.(interface{ StackTrace() string }); ok {
span.AddEvent("Stack Trace", trace.WithAttributes(
attribute.String("stack.trace", fmt.Sprintf("%v", stack.StackTrace())),
))
}
span.SetStatus(otelcodes.Error, err.Error())
}
return
}

View File

@@ -1,15 +1,13 @@
package service
import (
"context"
"fmt"
"git.hlsq.asia/mmorpg/service-common/discover"
"git.hlsq.asia/mmorpg/service-common/log"
"git.hlsq.asia/mmorpg/service-common/utils"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"net"
"sync"
"time"
@@ -21,12 +19,13 @@ type IService interface {
}
type Base struct {
Target string
SID string
Serve *grpc.Server
EtcdTTL int64
OnInit func(serve *grpc.Server)
OnClose func()
Target string
ServiceName string
SID string
Serve *grpc.Server
EtcdTTL int64
OnInit func(serve *grpc.Server)
OnClose func()
wg *sync.WaitGroup
}
@@ -48,17 +47,11 @@ func (s *Base) Init(addr string, port int32) {
}
s.Serve = grpc.NewServer(
grpc.UnaryInterceptor(
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("service Panic: %v", r)
err = status.Error(codes.Internal, fmt.Sprintf("%v", r))
}
}()
return handler(ctx, req)
},
grpc.ChainUnaryInterceptor(
s.RecoveryInterceptor,
s.LoggingInterceptor,
),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second,
PermitWithoutStream: true,