From b1e7d33940d7c6367cc5527bff9af5fd31ce24a3 Mon Sep 17 00:00:00 2001 From: "DESKTOP-V763RJ7\\Administrator" <835606593@qq.com> Date: Mon, 12 Jan 2026 16:22:58 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E5=A2=9E=E5=8A=A0grpc=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- net/grpc/service/client_qgdzs.go | 24 +++++++++ net/grpc/stream_client/gateway.go | 84 +++++++++++++++++++++++++++++++ net/grpc/stream_client/scene.go | 84 +++++++++++++++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 net/grpc/service/client_qgdzs.go create mode 100644 net/grpc/stream_client/gateway.go create mode 100644 net/grpc/stream_client/scene.go diff --git a/net/grpc/service/client_qgdzs.go b/net/grpc/service/client_qgdzs.go new file mode 100644 index 0000000..c99ad78 --- /dev/null +++ b/net/grpc/service/client_qgdzs.go @@ -0,0 +1,24 @@ +package service + +import ( + "git.hlsq.asia/mmorpg/service-common/discover" + "git.hlsq.asia/mmorpg/service-common/discover/common" + "git.hlsq.asia/mmorpg/service-common/net/grpc/resolver" + "git.hlsq.asia/mmorpg/service-common/proto/rs/grpc_pb" +) + +func QgdzsNewClient(sid ...string) (grpc_pb.QgdzsClient, error) { + c, err := discover.FindServer(common.KeyDiscoverQgdzs, sid...) + if err != nil { + return nil, err + } + return grpc_pb.NewQgdzsClient(c), nil +} + +func QgdzsNewClientLB() (grpc_pb.QgdzsClient, error) { + c, err := resolver.GetGrpcClientConn("etcd:///" + common.KeyDiscoverServiceNameQgdzs) + if err != nil { + return nil, err + } + return grpc_pb.NewQgdzsClient(c), nil +} diff --git a/net/grpc/stream_client/gateway.go b/net/grpc/stream_client/gateway.go new file mode 100644 index 0000000..c43e22b --- /dev/null +++ b/net/grpc/stream_client/gateway.go @@ -0,0 +1,84 @@ +package stream_client + +import ( + "context" + "git.hlsq.asia/mmorpg/service-common/log" + "git.hlsq.asia/mmorpg/service-common/net/grpc/service" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "strconv" + "sync" +) + +type GatewayFun int + +const ( + FunToClient GatewayFun = iota +) + +var gatewayServer sync.Map // map[string]*gatewayStream + +type gatewayStream struct { + mu sync.Mutex + stream grpc.ClientStream +} + +func findGatewayBySID(sid string, fun GatewayFun) (*gatewayStream, error) { + key := gatewayKey(sid, fun) + + if v, ok := gatewayServer.Load(key); ok { + return v.(*gatewayStream), nil + } + + client, err := service.GatewayNewClient(sid) + if err != nil { + log.Errorf("findGatewayBySID cannot find client: %v", err) + return nil, err + } + var stream grpc.ClientStream + switch fun { + case FunToClient: + stream, err = client.ToClient(context.Background()) + } + if err != nil { + log.Errorf("findGatewayBySID %v err: %v, sid: %v", fun, err, sid) + return nil, err + } + + ss := &gatewayStream{stream: stream} + if actual, loaded := gatewayServer.LoadOrStore(key, ss); loaded { + go func() { _ = stream.CloseSend() }() + return actual.(*gatewayStream), nil + } + + return ss, nil +} + +func SendMessageToGateway(sid string, fun GatewayFun, msg proto.Message, re ...bool) error { + ss, err := findGatewayBySID(sid, fun) + if err != nil { + return err + } + + ss.mu.Lock() + err = ss.stream.SendMsg(msg) + ss.mu.Unlock() + + if err != nil { + key := gatewayKey(sid, fun) + if v, ok := gatewayServer.Load(key); ok && v == ss { + gatewayServer.Delete(key) + _ = ss.stream.CloseSend() + } + // 如果没有标识本次是重试的,就重试一次(默认重试) + if re == nil || !re[0] { + return SendMessageToGateway(sid, fun, msg, true) + } + return err + } + return nil +} + +func gatewayKey(sid string, fun GatewayFun) string { + return sid + "-" + strconv.Itoa(int(fun)) +} diff --git a/net/grpc/stream_client/scene.go b/net/grpc/stream_client/scene.go new file mode 100644 index 0000000..85ff472 --- /dev/null +++ b/net/grpc/stream_client/scene.go @@ -0,0 +1,84 @@ +package stream_client + +import ( + "context" + "git.hlsq.asia/mmorpg/service-common/log" + "git.hlsq.asia/mmorpg/service-common/net/grpc/service" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "strconv" + "sync" +) + +type SceneFun int + +const ( + FunAction SceneFun = iota +) + +var sceneServer sync.Map // map[string]*sceneStream + +type sceneStream struct { + mu sync.Mutex + stream grpc.ClientStream +} + +func findSceneBySID(sid string, fun SceneFun) (*sceneStream, error) { + key := sceneKey(sid, fun) + + if v, ok := sceneServer.Load(key); ok { + return v.(*sceneStream), nil + } + + client, err := service.SceneNewClient(sid) + if err != nil { + log.Errorf("findSceneBySID cannot find client: %v", err) + return nil, err + } + var stream grpc.ClientStream + switch fun { + case FunAction: + stream, err = client.Action(context.Background()) + } + if err != nil { + log.Errorf("findSceneBySID %v err: %v, sid: %v", fun, err, sid) + return nil, err + } + + ss := &sceneStream{stream: stream} + if actual, loaded := sceneServer.LoadOrStore(key, ss); loaded { + go func() { _ = stream.CloseSend() }() + return actual.(*sceneStream), nil + } + + return ss, nil +} + +func SendMessageToScene(sid string, fun SceneFun, msg proto.Message, re ...bool) error { + ss, err := findSceneBySID(sid, fun) + if err != nil { + return err + } + + ss.mu.Lock() + err = ss.stream.SendMsg(msg) + ss.mu.Unlock() + + if err != nil { + key := sceneKey(sid, fun) + if v, ok := sceneServer.Load(key); ok && v == ss { + sceneServer.Delete(key) + _ = ss.stream.CloseSend() + } + // 如果没有标识本次是重试的,就重试一次(默认重试) + if re == nil || !re[0] { + return SendMessageToScene(sid, fun, msg, true) + } + return err + } + return nil +} + +func sceneKey(sid string, fun SceneFun) string { + return sid + "-" + strconv.Itoa(int(fun)) +}