// Package stream_client caches client-side gRPC streams to gateway servers
// and sends protobuf messages over them.
package stream_client

import (
	"common/log"
	"common/net/grpc/service"
	"context"
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

// gatewayServerM caches the open streams: map[sid]map[method]stream.
//
// NOTE(review): nothing in this file guards the map with a lock, so
// concurrent calls into this package would race on it. Confirm that callers
// serialize access, or add synchronization.
var gatewayServerM map[int64]map[GatewayFun]grpc.ClientStream

// GatewayFun identifies which streaming method of the gateway client a
// cached stream belongs to.
type GatewayFun int

const (
	// FunToClient selects the gateway client's ToClient stream.
	FunToClient GatewayFun = iota
)

func init() {
	gatewayServerM = make(map[int64]map[GatewayFun]grpc.ClientStream)
}

// FindGatewayBySID returns the cached stream for the given sid and method,
// opening and caching a new one when none is cached yet.
//
// It returns an error when the gateway client cannot be created, when the
// stream cannot be opened, or when fun is not a known GatewayFun. On error
// the returned stream is nil and nothing is added to the cache.
func FindGatewayBySID(sid int64, fun GatewayFun) (grpc.ClientStream, error) {
	// Indexing a nil inner map is safe and simply yields a nil stream.
	if cached := gatewayServerM[sid][fun]; cached != nil {
		return cached, nil
	}

	gatewayClient, err := service.GatewayNewClient(sid)
	if err != nil {
		log.Errorf("cannot find gatewayClient: %v", err)
		return nil, err
	}

	var link grpc.ClientStream
	switch fun {
	case FunToClient:
		link, err = gatewayClient.ToClient(context.Background())
	default:
		// Without this branch an unknown fun would leave both link and err
		// nil, caching a nil stream that the caller would then dereference.
		err = fmt.Errorf("unsupported gateway function %d", int(fun))
	}
	if err != nil {
		log.Errorf("FindGatewayBySID %v err: %v, sid: %v", fun, err, sid)
		return nil, err
	}

	// Create the per-sid map only once there is a stream to store.
	streams := gatewayServerM[sid]
	if streams == nil {
		streams = make(map[GatewayFun]grpc.ClientStream)
		gatewayServerM[sid] = streams
	}
	streams[fun] = link
	return link, nil
}

// SendMessageToGateway sends msg on the stream for the given sid and method.
//
// When the send fails, the failed stream is closed and evicted from the
// cache, and the send is retried once on a freshly opened stream. The
// optional re flag marks a call as already being that retry: when re[0] is
// true a failure is returned to the caller instead of retrying again.
// Omitting re, or passing an empty slice, is the same as passing false.
func SendMessageToGateway(sid int64, fun GatewayFun, msg proto.Message, re ...bool) error {
	stream, err := FindGatewayBySID(sid, fun)
	if err != nil {
		return err
	}

	err = stream.SendMsg(msg)
	if err == nil {
		return nil
	}

	// The stream has already failed, so CloseSend is best effort and its
	// error carries no extra information. Evict unconditionally so that a
	// broken stream is never handed out again by FindGatewayBySID.
	_ = stream.CloseSend()
	delete(gatewayServerM[sid], fun)

	// len check rather than a nil check: an empty non-nil slice must not be
	// indexed.
	if len(re) > 0 && re[0] {
		return err
	}
	return SendMessageToGateway(sid, fun, msg, true)
}