feat 排队
This commit is contained in:
@@ -6,60 +6,79 @@ import (
|
||||
"context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var gatewayServerM map[int64]map[GatewayFun]grpc.ClientStream // map[sid]map[方法名]流连接
|
||||
|
||||
type GatewayFun int
|
||||
|
||||
const (
|
||||
FunToClient GatewayFun = iota
|
||||
)
|
||||
|
||||
func init() {
|
||||
gatewayServerM = make(map[int64]map[GatewayFun]grpc.ClientStream)
|
||||
var gatewayServer sync.Map // map[string]*gatewayStream
|
||||
|
||||
type gatewayStream struct {
|
||||
mu sync.Mutex
|
||||
stream grpc.ClientStream
|
||||
}
|
||||
|
||||
func FindGatewayBySID(sid int64, fun GatewayFun) (grpc.ClientStream, error) {
|
||||
g := gatewayServerM[sid]
|
||||
if g == nil {
|
||||
g = make(map[GatewayFun]grpc.ClientStream)
|
||||
gatewayServerM[sid] = g
|
||||
func findGatewayBySID(sid int64, fun GatewayFun) (*gatewayStream, error) {
|
||||
key := gatewayKey(sid, fun)
|
||||
|
||||
if v, ok := gatewayServer.Load(key); ok {
|
||||
return v.(*gatewayStream), nil
|
||||
}
|
||||
gatewayLink := g[fun]
|
||||
if gatewayLink == nil {
|
||||
gatewayClient, err := service.GatewayNewClient(sid)
|
||||
if err != nil {
|
||||
log.Errorf("cannot find gatewayClient: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
var link grpc.ClientStream
|
||||
switch fun {
|
||||
case FunToClient:
|
||||
link, err = gatewayClient.ToClient(context.Background())
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("FindGatewayBySID %v err: %v, sid: %v", fun, err, sid)
|
||||
return nil, err
|
||||
}
|
||||
g[fun] = link
|
||||
gatewayLink = link
|
||||
|
||||
client, err := service.GatewayNewClient(sid)
|
||||
if err != nil {
|
||||
log.Errorf("findGatewayBySID cannot find client: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return gatewayLink, nil
|
||||
var stream grpc.ClientStream
|
||||
switch fun {
|
||||
case FunToClient:
|
||||
stream, err = client.ToClient(context.Background())
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("findGatewayBySID %v err: %v, sid: %v", fun, err, sid)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ss := &gatewayStream{stream: stream}
|
||||
if actual, loaded := gatewayServer.LoadOrStore(key, ss); loaded {
|
||||
go func() { _ = stream.CloseSend() }()
|
||||
return actual.(*gatewayStream), nil
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
func SendMessageToGateway(sid int64, fun GatewayFun, msg proto.Message, re ...bool) error {
|
||||
stream, err := FindGatewayBySID(sid, fun)
|
||||
ss, err := findGatewayBySID(sid, fun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = stream.SendMsg(msg); err != nil {
|
||||
|
||||
ss.mu.Lock()
|
||||
err = ss.stream.SendMsg(msg)
|
||||
ss.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
key := gatewayKey(sid, fun)
|
||||
if v, ok := gatewayServer.Load(key); ok && v == ss {
|
||||
gatewayServer.Delete(key)
|
||||
_ = ss.stream.CloseSend()
|
||||
}
|
||||
// 如果没有标识本次是重试的,就重试一次(默认重试)
|
||||
if re == nil || !re[0] {
|
||||
_ = stream.CloseSend()
|
||||
delete(gatewayServerM[sid], fun)
|
||||
return SendMessageToGateway(sid, fun, msg, true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func gatewayKey(sid int64, fun GatewayFun) string {
|
||||
return strconv.FormatInt(sid, 10) + "-" + strconv.Itoa(int(fun))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user