package stream_client import ( "context" "git.hlsq.asia/mmorpg/service-common/log" "git.hlsq.asia/mmorpg/service-common/net/grpc/service" "google.golang.org/grpc" "google.golang.org/protobuf/proto" "strconv" "sync" ) type SceneFun int const ( FunAction SceneFun = iota ) var sceneServer sync.Map // map[string]*sceneStream type sceneStream struct { mu sync.Mutex stream grpc.ClientStream } func findSceneBySID(sid int64, fun SceneFun) (*sceneStream, error) { key := sceneKey(sid, fun) if v, ok := sceneServer.Load(key); ok { return v.(*sceneStream), nil } client, err := service.SceneNewClient(sid) if err != nil { log.Errorf("findSceneBySID cannot find client: %v", err) return nil, err } var stream grpc.ClientStream switch fun { case FunAction: stream, err = client.Action(context.Background()) } if err != nil { log.Errorf("findSceneBySID %v err: %v, sid: %v", fun, err, sid) return nil, err } ss := &sceneStream{stream: stream} if actual, loaded := sceneServer.LoadOrStore(key, ss); loaded { go func() { _ = stream.CloseSend() }() return actual.(*sceneStream), nil } return ss, nil } func SendMessageToScene(sid int64, fun SceneFun, msg proto.Message, re ...bool) error { ss, err := findSceneBySID(sid, fun) if err != nil { return err } ss.mu.Lock() err = ss.stream.SendMsg(msg) ss.mu.Unlock() if err != nil { key := sceneKey(sid, fun) if v, ok := sceneServer.Load(key); ok && v == ss { sceneServer.Delete(key) _ = ss.stream.CloseSend() } // 如果没有标识本次是重试的,就重试一次(默认重试) if re == nil || !re[0] { return SendMessageToScene(sid, fun, msg, true) } return err } return nil } func sceneKey(sid int64, fun SceneFun) string { return strconv.FormatInt(sid, 10) + "-" + strconv.Itoa(int(fun)) }