package grpc_client import ( "context" "fmt" "git.hlsq.asia/mmorpg/service-common/log" "google.golang.org/grpc" "google.golang.org/protobuf/proto" "sync" ) type GatewayFun int const ( FunToClient GatewayFun = iota ) var gatewayServer sync.Map // map[string]*gatewayStream type gatewayStream struct { mu sync.Mutex stream grpc.ClientStream } func findGatewayBySID(sid int64, fun GatewayFun) (*gatewayStream, error) { key := gatewayKey(sid, fun) if v, ok := gatewayServer.Load(key); ok { return v.(*gatewayStream), nil } client, err := GatewayNewClient(sid) if err != nil { log.Errorf("findGatewayBySID cannot find client: %v", err) return nil, err } var stream grpc.ClientStream switch fun { case FunToClient: stream, err = client.ToClient(context.Background()) } if err != nil { log.Errorf("findGatewayBySID %v err: %v, sid: %v", fun, err, sid) return nil, err } ss := &gatewayStream{stream: stream} if actual, loaded := gatewayServer.LoadOrStore(key, ss); loaded { go func() { _ = stream.CloseSend() }() return actual.(*gatewayStream), nil } return ss, nil } func SendMessageToGateway(sid int64, fun GatewayFun, msg proto.Message, re ...bool) error { ss, err := findGatewayBySID(sid, fun) if err != nil { return err } ss.mu.Lock() err = ss.stream.SendMsg(msg) ss.mu.Unlock() if err != nil { key := gatewayKey(sid, fun) if v, ok := gatewayServer.Load(key); ok && v == ss { gatewayServer.Delete(key) _ = ss.stream.CloseSend() } // 如果没有标识本次是重试的,就重试一次(默认重试) if re == nil || !re[0] { return SendMessageToGateway(sid, fun, msg, true) } return err } return nil } func gatewayKey(sid int64, fun GatewayFun) string { return fmt.Sprintf("%v-%v", sid, fun) }