From 7c2c32a31ae24a91d249783431b82899897a7d3c Mon Sep 17 00:00:00 2001 From: "DESKTOP-V763RJ7\\Administrator" <835606593@qq.com> Date: Tue, 1 Jul 2025 00:08:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=9C=8D=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Server/Gateway/app/app.go | 4 +- Server/Gateway/config/config.dev.yaml | 6 +- Server/Gateway/config/config.go | 11 +- .../grpc_server/{ => server}/server.go | 2 +- .../grpc_server/{ => server}/server_init.go | 2 +- .../grpc_server/stream_client/scene.go | 44 +++ Server/Gateway/handler/ws_handler/client.go | 27 +- Server/Gateway/handler/ws_handler/handler.go | 71 ++++ Server/Gateway/handler/ws_handler/manager.go | 12 +- Server/Gateway/handler/ws_handler/temp.go | 17 +- Server/Gateway/net/ws_gateway/server.go | 4 +- Server/common/discover/common/define.go | 27 +- Server/common/discover/instance.go | 82 +++++ Server/common/discover/listener.go | 39 +- Server/common/discover/server.go | 6 +- Server/common/net/grpc/grpc_conn/conn.go | 6 +- Server/common/net/grpc/grpc_conn/conn_mgr.go | 16 +- .../common/net/grpc/service/client_gateway.go | 6 +- .../common/net/grpc/service/client_scene.go | 24 ++ Server/common/proto/cs/action.proto | 16 + Server/common/proto/cs/define.proto | 10 +- Server/common/proto/gen/grpc_pb/gateway.pb.go | 103 +++++- .../proto/gen/grpc_pb/gateway_grpc.pb.go | 86 +++-- Server/common/proto/gen/grpc_pb/scene.pb.go | 335 ++++++++++++++++++ .../common/proto/gen/grpc_pb/scene_grpc.pb.go | 177 +++++++++ Server/common/proto/grpc/gateway.proto | 7 +- Server/common/proto/grpc/scene.proto | 14 +- Server/common/utils/number.go | 10 + Server/scene/app/app.go | 18 +- Server/scene/config/config.dev.yaml | 8 +- Server/scene/config/config.go | 21 +- Server/scene/grpc_server/server.go | 39 +- Server/scene/grpc_server/server_init.go | 6 +- Server/scene/instance/define.go | 9 + Server/scene/instance/instance.go | 97 +++++ Server/scene/instance/manager.go | 42 +++ Server/scene/instance/player.go | 63 ++++ 37 files changed, 1307 insertions(+), 160 deletions(-) rename Server/Gateway/grpc_server/{ => server}/server.go (91%) rename Server/Gateway/grpc_server/{ => server}/server_init.go (96%) create mode 100644 Server/Gateway/grpc_server/stream_client/scene.go create mode 100644 Server/Gateway/handler/ws_handler/handler.go create mode 100644 Server/common/discover/instance.go create mode 100644 Server/common/net/grpc/service/client_scene.go create mode 100644 Server/common/proto/cs/action.proto create mode 100644 Server/common/proto/gen/grpc_pb/scene.pb.go create mode 100644 Server/common/proto/gen/grpc_pb/scene_grpc.pb.go create mode 100644 Server/scene/instance/define.go create mode 100644 Server/scene/instance/instance.go create mode 100644 Server/scene/instance/manager.go create mode 100644 Server/scene/instance/player.go diff --git a/Server/Gateway/app/app.go b/Server/Gateway/app/app.go index cef5995..ce92144 100644 --- a/Server/Gateway/app/app.go +++ b/Server/Gateway/app/app.go @@ -8,7 +8,7 @@ import ( "common/net/socket/websocket" "fmt" "gateway/config" - "gateway/grpc_server" + "gateway/grpc_server/server" "github.com/gin-gonic/gin" "github.com/judwhite/go-svc" "runtime/debug" @@ -51,7 +51,7 @@ func (p *Program) Start() error { }() discover.Listen() - p.server = grpc_server.NewServer(config.Get().Serve.Grpc.TTL) + p.server = server.NewServer(config.Get().Serve.Grpc.TTL) p.server.Init(config.Get().Serve.Grpc.Address, config.Get().Serve.Grpc.Port) go func() { cfg := config.Get() diff --git a/Server/Gateway/config/config.dev.yaml b/Server/Gateway/config/config.dev.yaml index b97a6cf..c5f7c04 100644 --- a/Server/Gateway/config/config.dev.yaml +++ b/Server/Gateway/config/config.dev.yaml @@ -4,9 +4,9 @@ app: log: debug: true level: "debug" - max_size: 100 - max_backups: 3 - max_age: 7 + maxSize: 100 + maxBackups: 3 + maxAge: 7 db: etcd: diff --git a/Server/Gateway/config/config.go b/Server/Gateway/config/config.go index 94a8c32..8713f0e 100644 --- a/Server/Gateway/config/config.go +++ b/Server/Gateway/config/config.go @@ -13,9 +13,9 @@ type AppConfig struct { type LogConfig struct { Debug bool `yaml:"debug"` - MaxSize int `yaml:"max_size"` - MaxBackups int `yaml:"max_backups"` - MaxAge int `yaml:"max_age"` + MaxSize int `yaml:"maxSize"` + MaxBackups int `yaml:"maxBackups"` + MaxAge int `yaml:"maxAge"` Level string `yaml:"level"` } @@ -27,8 +27,9 @@ type DBConfig struct { type ServeConfig struct { Grpc *struct { - AddressConfig - TTL int64 `yaml:"ttl"` + Address string `yaml:"address"` + Port int `yaml:"port"` + TTL int64 `yaml:"ttl"` } `yaml:"grpc"` Socket *struct { Web *AddressConfig `yaml:"web"` diff --git a/Server/Gateway/grpc_server/server.go b/Server/Gateway/grpc_server/server/server.go similarity index 91% rename from Server/Gateway/grpc_server/server.go rename to Server/Gateway/grpc_server/server/server.go index a3ca5b6..5142932 100644 --- a/Server/Gateway/grpc_server/server.go +++ b/Server/Gateway/grpc_server/server/server.go @@ -1,4 +1,4 @@ -package grpc_server +package server import ( "common/log" diff --git a/Server/Gateway/grpc_server/server_init.go b/Server/Gateway/grpc_server/server/server_init.go similarity index 96% rename from Server/Gateway/grpc_server/server_init.go rename to Server/Gateway/grpc_server/server/server_init.go index 8ca2161..aa0ff33 100644 --- a/Server/Gateway/grpc_server/server_init.go +++ b/Server/Gateway/grpc_server/server/server_init.go @@ -1,4 +1,4 @@ -package grpc_server +package server import ( "common/discover/common" diff --git a/Server/Gateway/grpc_server/stream_client/scene.go b/Server/Gateway/grpc_server/stream_client/scene.go new file mode 100644 index 0000000..28e3242 --- /dev/null +++ b/Server/Gateway/grpc_server/stream_client/scene.go @@ -0,0 +1,44 @@ +package stream_client + +import ( + "common/log" + "common/net/grpc/service" + "context" + "google.golang.org/grpc" +) + +var sceneServerM map[int64]map[SceneFun]grpc.ClientStream // map[sid]map[方法名]流连接 + +type SceneFun int + +const ( + FunAction SceneFun = iota +) + +func FindSceneBySID(sid int64, fun SceneFun) grpc.ClientStream { + g := sceneServerM[sid] + if g == nil { + g = make(map[SceneFun]grpc.ClientStream) + sceneServerM[sid] = g + } + sceneLink := g[fun] + if sceneLink == nil { + sceneClient, err := service.SceneNewClient(sid) + if err != nil { + log.Errorf("cannot find sceneClient: %v", err) + return nil + } + var link grpc.ClientStream + switch fun { + case FunAction: + link, err = sceneClient.Action(context.Background()) + } + if err != nil { + log.Errorf("FindSceneBySID %v err: %v, sid: %v", fun, err, sid) + return nil + } + g[fun] = link + sceneLink = link + } + return sceneLink +} diff --git a/Server/Gateway/handler/ws_handler/client.go b/Server/Gateway/handler/ws_handler/client.go index a47db79..4e43ccd 100644 --- a/Server/Gateway/handler/ws_handler/client.go +++ b/Server/Gateway/handler/ws_handler/client.go @@ -3,7 +3,6 @@ package ws_handler import ( "common/log" "common/net/socket" - "common/utils" "context" "fmt" "go.uber.org/zap" @@ -21,10 +20,11 @@ type Client struct { cancel context.CancelFunc // 取消上下文 heartBeat time.Time // 最后一次心跳 - UID int32 + UID int + SceneSID int64 // 场景服ID } -func NewClient(uid int32, conn socket.ISocketConn) *Client { +func NewClient(uid int, conn socket.ISocketConn) *Client { client := &Client{ UID: uid, conn: conn, @@ -78,27 +78,6 @@ func (c *Client) OnEvent(event Event) { } } -func (c *Client) handle(event Event) { - switch e := event.(type) { - case *ClientEvent: - m, err := parseMsg(e.Msg) - if err != nil { - c.logger.Errorf("handle event json.Unmarshal err: %v", err) - c.cancel() - } - c.logger.Infof("收到客户端消息:%+v", *m) - switch m.Type { - case "init": - _ = c.conn.Write(wapMsg(&msg{ - Type: "init", - Data: fmt.Sprintf("[%v,%v]", utils.RandInt(1, 100), utils.RandInt(1, 100)), - })) - } - case *PongEvent: - c.heartBeat = time.Now() - } -} - // CloseClient 关闭客户端(同步,会等待onClose执行完成) func (c *Client) CloseClient() { if c.cancel != nil { diff --git a/Server/Gateway/handler/ws_handler/handler.go b/Server/Gateway/handler/ws_handler/handler.go new file mode 100644 index 0000000..b691eb9 --- /dev/null +++ b/Server/Gateway/handler/ws_handler/handler.go @@ -0,0 +1,71 @@ +package ws_handler + +import ( + "common/net/grpc/service" + "common/proto/gen/grpc_pb" + "encoding/json" + "gateway/grpc_server/stream_client" + "time" +) + +func (c *Client) handle(event Event) { + switch e := event.(type) { + case *ClientEvent: + m, err := parseMsg(e.Msg) + if err != nil { + c.logger.Errorf("handle event json.Unmarshal err: %v", err) + c.cancel() + } + c.logger.Infof("收到客户端消息:%+v", *m) + switch m.Type { + case "enter": + c.onEnter(m) + case "action": + c.onAction(m) + } + case *PongEvent: + c.heartBeat = time.Now() + } +} + +func (c *Client) onEnter(msg *tempMsg) { + //_ = c.conn.Write(wapMsg(&tempMsg{ + // Type: "init", + // Data: fmt.Sprintf("[%v,%v]", utils.RandInt(1, 100), utils.RandInt(1, 100)), + //})) + client, err := service.SceneNewClient() + if err != nil { + c.logger.Errorf("SceneNewClient err: %v", err) + return + } + resp, err := client.Enter(c.ctx, &grpc_pb.EnterReq{ + UID: int32(c.UID), + SID: 0, + InstanceID: 1, + }) + if err != nil { + c.logger.Errorf("enter err: %v", err) + return + } + c.SceneSID = resp.SID +} + +func (c *Client) onAction(msg *tempMsg) { + if c.SceneSID == 0 { + return + } + d := &tempAction{} + if err := json.Unmarshal([]byte(msg.Data), d); err != nil { + return + } + m := &tempActionMove{} + if err := json.Unmarshal([]byte(d.Data), m); err != nil { + return + } + stream := stream_client.FindSceneBySID(c.SceneSID, stream_client.FunAction) + if err := stream.SendMsg(&grpc_pb.ActionReq{ + Action: int32(m.Move), + }); err != nil { + c.logger.Errorf("send action err: %v", err) + } +} diff --git a/Server/Gateway/handler/ws_handler/manager.go b/Server/Gateway/handler/ws_handler/manager.go index e186d2d..96b79d4 100644 --- a/Server/Gateway/handler/ws_handler/manager.go +++ b/Server/Gateway/handler/ws_handler/manager.go @@ -7,35 +7,35 @@ import ( var UserMgr *userManager type userManager struct { - userMap map[int32]*Client + userMap map[int]*Client sync.RWMutex } func init() { UserMgr = &userManager{ - userMap: make(map[int32]*Client), + userMap: make(map[int]*Client), } } -func (m *userManager) Add(uid int32, client *Client) { +func (m *userManager) Add(uid int, client *Client) { m.Lock() defer m.Unlock() m.userMap[uid] = client } -func (m *userManager) Delete(uid int32) { +func (m *userManager) Delete(uid int) { m.Lock() defer m.Unlock() delete(m.userMap, uid) } -func (m *userManager) GetAll() map[int32]*Client { +func (m *userManager) GetAll() map[int]*Client { m.RLock() defer m.RUnlock() return m.userMap } -func (m *userManager) GetByUID(uid int32) *Client { +func (m *userManager) GetByUID(uid int) *Client { m.RLock() defer m.RUnlock() return m.userMap[uid] diff --git a/Server/Gateway/handler/ws_handler/temp.go b/Server/Gateway/handler/ws_handler/temp.go index 7a2cdda..159220f 100644 --- a/Server/Gateway/handler/ws_handler/temp.go +++ b/Server/Gateway/handler/ws_handler/temp.go @@ -2,20 +2,29 @@ package ws_handler import "encoding/json" -type msg struct { +type tempMsg struct { Type string `json:"type"` Data string `json:"data"` } -func parseMsg(data []byte) (*msg, error) { - m := &msg{} +type tempAction struct { + Action int `json:"action"` + Data string `json:"data"` +} + +type tempActionMove struct { + Move int `json:"move"` +} + +func parseMsg(data []byte) (*tempMsg, error) { + m := &tempMsg{} if err := json.Unmarshal(data, m); err != nil { return nil, err } return m, nil } -func wapMsg(m *msg) []byte { +func wapMsg(m *tempMsg) []byte { data, _ := json.Marshal(m) return data } diff --git a/Server/Gateway/net/ws_gateway/server.go b/Server/Gateway/net/ws_gateway/server.go index f3b475b..6d1de8b 100644 --- a/Server/Gateway/net/ws_gateway/server.go +++ b/Server/Gateway/net/ws_gateway/server.go @@ -30,8 +30,8 @@ func (g *GatewayWsServer) OnHandShake(conn socket.ISocketConn) { if err != nil { _ = conn.Close() } - client := ws_handler.NewClient(int32(t), conn) - ws_handler.UserMgr.Add(int32(t), client) + client := ws_handler.NewClient(t, conn) + ws_handler.UserMgr.Add(t, client) conn.SetParam("client", client) } diff --git a/Server/common/discover/common/define.go b/Server/common/discover/common/define.go index 60675f2..54732e7 100644 --- a/Server/common/discover/common/define.go +++ b/Server/common/discover/common/define.go @@ -1,25 +1,36 @@ package common -type ListenerType int32 +type ListenerType int const ( - ListenerTypeNewServer = 1 // 服务启动 - ListenerTypeCloseServer = 2 // 服务关闭 + ListenerTypeNewServer = 1 // 服务启动 + ListenerTypeCloseServer = 2 // 服务关闭 + ListenerTypeNewInstance = 3 // 副本启动 + ListenerTypeCloseInstance = 4 // 副本关闭 ) var ( - KeyDiscover = "xh-discover" - KeyDiscoverService = KeyDiscover + "/service" + KeyDiscover = "xh-discover" + KeyDiscoverService = KeyDiscover + "/service" + KeyDiscoverInstance = KeyDiscover + "/instance" ) var ( - KeyDiscoverGateway = KeyDiscoverService + "/gateway" - KeyDiscoverDatabase = KeyDiscoverService + "/database" + KeyDiscoverGateway = KeyDiscoverService + "/gateway" // 网关服 + KeyDiscoverDatabase = KeyDiscoverService + "/database" // 数据服 + KeyDiscoverScene = KeyDiscoverService + "/scene" // 场景服 ) // ServiceProvider 服务提供者 type ServiceProvider struct { Target string - SID string + SID int64 Addr string } + +// InstanceProvider 副本提供者 +type InstanceProvider struct { + InstanceID int // 副本ID + UniqueNo int64 // 副本唯一编号 + SID string +} diff --git a/Server/common/discover/instance.go b/Server/common/discover/instance.go new file mode 100644 index 0000000..8586a6f --- /dev/null +++ b/Server/common/discover/instance.go @@ -0,0 +1,82 @@ +package discover + +import ( + "common/db/etcd" + "common/discover/common" + "common/log" + "context" + "fmt" + clientv3 "go.etcd.io/etcd/client/v3" + "strconv" + "sync" +) + +// 大量读少量写的情况下,读写锁比同步Map更高效 +var ( + instanceMU = sync.RWMutex{} + instanceM = make(map[int64]string) // [uniqueNo]sid + instanceLeaseM = make(map[int64]clientv3.LeaseID) // [uniqueNo] +) + +func init() { + RegisterListener(common.ListenerTypeNewInstance, onInstanceStart) + RegisterListener(common.ListenerTypeCloseInstance, onInstanceStop) +} + +// FindInstanceByUniqueNo 根据唯一标识查询副本 +func FindInstanceByUniqueNo(uniqueNO int64) (sid string) { + instanceMU.RLock() + defer instanceMU.RUnlock() + if c, ok := instanceM[uniqueNO]; ok { + return c + } + return +} + +// RegisterInstance 注册副本 +func RegisterInstance(sid int64, instanceID int, uniqueNo, ttl int64) error { + serverMU.Lock() + defer serverMU.Unlock() + leaseID, err := common.NewLeaseAndKeepAlive(ttl) + if err != nil { + return err + } + key := fmt.Sprintf("%v/%v/%v", common.KeyDiscoverInstance, instanceID, uniqueNo) + _, err = etcd.Client().Put(context.Background(), key, strconv.Itoa(int(sid)), clientv3.WithLease(leaseID)) + if err != nil { + return err + } + instanceLeaseM[uniqueNo] = leaseID + return nil +} + +// UnRegisterInstance 解注册副本 +func UnRegisterInstance(uniqueNo int64) { + serverMU.Lock() + defer serverMU.Unlock() + if leaseID, ok := instanceLeaseM[uniqueNo]; ok { + _, err := etcd.Client().Revoke(context.Background(), leaseID) + if err != nil { + log.Errorf("UnRegisterInstance err: %v", err) + } + delete(instanceLeaseM, uniqueNo) + } +} + +// 某个副本启动了 +func onInstanceStart(data any) { + if provider, ok := data.(*common.InstanceProvider); ok { + instanceMU.Lock() + defer instanceMU.Unlock() + instanceM[provider.UniqueNo] = provider.SID + } +} + +// 某个副本关闭了 +func onInstanceStop(data any) { + if provider, ok := data.(*common.InstanceProvider); ok { + instanceMU.Lock() + defer instanceMU.Unlock() + delete(instanceM, provider.UniqueNo) + } +} diff --git a/Server/common/discover/listener.go b/Server/common/discover/listener.go index a620698..51152f2 100644 --- a/Server/common/discover/listener.go +++ b/Server/common/discover/listener.go @@ -3,9 +3,11 @@ package discover import ( "common/db/etcd" "common/discover/common" + "common/utils" "context" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "strconv" "strings" "sync" ) @@ -50,12 +52,22 @@ func Listen() { onServerChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value)) } chService := etcd.Client().Watch(stopCtx, common.KeyDiscoverService, clientv3.WithPrefix(), clientv3.WithRev(serviceAll.Header.Revision+1)) + // 副本 + instanceAll, _ := etcd.Client().Get(stopCtx, common.KeyDiscoverInstance, clientv3.WithPrefix()) + for _, kv := range instanceAll.Kvs { + onInstanceChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value), nil) + } + chInstance := etcd.Client().Watch(stopCtx, common.KeyDiscoverScene, clientv3.WithPrefix(), clientv3.WithRev(instanceAll.Header.Revision+1), clientv3.WithPrevKV()) for { select { case msg := <-chService: for _, event := range msg.Events { onServerChange(event.Type, string(event.Kv.Key), string(event.Kv.Value)) } + case msg := <-chInstance: + for _, event := range msg.Events { + onInstanceChange(event.Type, string(event.Kv.Key), string(event.Kv.Value), event.PrevKv) + } case <-stopCtx.Done(): return } @@ -73,13 +85,36 @@ func onServerChange(t mvccpb.Event_EventType, key, value string) { case clientv3.EventTypePut: onCBByType(common.ListenerTypeNewServer, &common.ServiceProvider{ Target: common.KeyDiscoverService + "/" + split[2], - SID: split[3], + SID: utils.StringToInt64(split[3]), Addr: value, }) case clientv3.EventTypeDelete: onCBByType(common.ListenerTypeCloseServer, &common.ServiceProvider{ Target: common.KeyDiscoverService + "/" + split[2], - SID: split[3], + SID: utils.StringToInt64(split[3]), + }) + } +} + +// 副本发生变化 +func onInstanceChange(t mvccpb.Event_EventType, key, value string, preKv *mvccpb.KeyValue) { + split := strings.Split(key, "/") + if len(split) != 4 { + return + } + instanceID, _ := strconv.Atoi(split[2]) + switch t { + case clientv3.EventTypePut: + onCBByType(common.ListenerTypeNewInstance, &common.InstanceProvider{ + InstanceID: instanceID, + UniqueNo: utils.StringToInt64(split[3]), + SID: value, + }) + case clientv3.EventTypeDelete: + onCBByType(common.ListenerTypeCloseInstance, &common.InstanceProvider{ + InstanceID: instanceID, + UniqueNo: utils.StringToInt64(split[3]), + SID: string(preKv.Value), }) } } diff --git a/Server/common/discover/server.go b/Server/common/discover/server.go index 8fc8848..e8b2d37 100644 --- a/Server/common/discover/server.go +++ b/Server/common/discover/server.go @@ -25,7 +25,7 @@ func init() { } // FindServer 根据SID或随机查找服务 -func FindServer(target string, sid ...string) (*grpc.ClientConn, error) { +func FindServer(target string, sid ...int64) (*grpc.ClientConn, error) { serverMU.RLock() defer serverMU.RUnlock() if v, ok := conn[target]; ok { @@ -34,13 +34,13 @@ func FindServer(target string, sid ...string) (*grpc.ClientConn, error) { return nil, fmt.Errorf("cannot find server") } -func FindServerAll(target string) map[string]*grpc.ClientConn { +func FindServerAll(target string) map[int64]*grpc.ClientConn { serverMU.RLock() defer serverMU.RUnlock() if v, ok := conn[target]; ok { return v.LoadAll() } - return make(map[string]*grpc.ClientConn) + return make(map[int64]*grpc.ClientConn) } // RegisterGrpcServer 注册服务提供者 diff --git a/Server/common/net/grpc/grpc_conn/conn.go b/Server/common/net/grpc/grpc_conn/conn.go index b8468d1..7f0c701 100644 --- a/Server/common/net/grpc/grpc_conn/conn.go +++ b/Server/common/net/grpc/grpc_conn/conn.go @@ -8,15 +8,15 @@ import ( ) type GrpcConnection struct { - sid string + sid int64 conn *grpc.ClientConn } -func NewGrpcConnection(sid, address string) (*GrpcConnection, error) { +func NewGrpcConnection(sid int64, address string) (*GrpcConnection, error) { p := &GrpcConnection{ sid: sid, } - conn, err := grpc.Dial( + conn, err := grpc.NewClient( address, grpc.WithInsecure(), grpc.WithKeepaliveParams( diff --git a/Server/common/net/grpc/grpc_conn/conn_mgr.go b/Server/common/net/grpc/grpc_conn/conn_mgr.go index 745fbd0..2a154e4 100644 --- a/Server/common/net/grpc/grpc_conn/conn_mgr.go +++ b/Server/common/net/grpc/grpc_conn/conn_mgr.go @@ -8,18 +8,18 @@ import ( ) type GrpcConnectionMgr struct { - poolM map[string]*GrpcConnection + poolM map[int64]*GrpcConnection poolS []*GrpcConnection } func NewGrpcConnectionMgr() *GrpcConnectionMgr { return &GrpcConnectionMgr{ - poolM: make(map[string]*GrpcConnection), + poolM: make(map[int64]*GrpcConnection), poolS: make([]*GrpcConnection, 0), } } -func (p *GrpcConnectionMgr) Store(sid, addr string) { +func (p *GrpcConnectionMgr) Store(sid int64, addr string) { pool, err := NewGrpcConnection(sid, addr) if err != nil { log.Errorf("create grpc err: %v, sid: %v, addr: %v", err, sid, addr) @@ -29,7 +29,7 @@ func (p *GrpcConnectionMgr) Store(sid, addr string) { p.poolS = append(p.poolS, pool) } -func (p *GrpcConnectionMgr) Delete(sid string) int { +func (p *GrpcConnectionMgr) Delete(sid int64) int { delete(p.poolM, sid) for i, pool := range p.poolS { if pool.sid == sid { @@ -40,9 +40,9 @@ func (p *GrpcConnectionMgr) Delete(sid string) int { return len(p.poolS) } -func (p *GrpcConnectionMgr) Load(sid ...string) (*grpc.ClientConn, error) { +func (p *GrpcConnectionMgr) Load(sid ...int64) (*grpc.ClientConn, error) { var pool *GrpcConnection - if len(sid) > 0 && len(sid[0]) > 0 { + if len(sid) > 0 && sid[0] > 0 { pool = p.poolM[sid[0]] } else { pool = p.poolS[rand.Intn(len(p.poolS))] @@ -53,8 +53,8 @@ func (p *GrpcConnectionMgr) Load(sid ...string) (*grpc.ClientConn, error) { return pool.GetConnection(), nil } -func (p *GrpcConnectionMgr) LoadAll() map[string]*grpc.ClientConn { - sidM := make(map[string]*grpc.ClientConn) +func (p *GrpcConnectionMgr) LoadAll() map[int64]*grpc.ClientConn { + sidM := make(map[int64]*grpc.ClientConn) for sid, pool := range p.poolM { sidM[sid] = pool.GetConnection() } diff --git a/Server/common/net/grpc/service/client_gateway.go b/Server/common/net/grpc/service/client_gateway.go index ab7a8be..51073ba 100644 --- a/Server/common/net/grpc/service/client_gateway.go +++ b/Server/common/net/grpc/service/client_gateway.go @@ -6,7 +6,7 @@ import ( "common/proto/gen/grpc_pb" ) -func GatewayNewClient(sid ...string) (grpc_pb.GatewayClient, error) { +func GatewayNewClient(sid ...int64) (grpc_pb.GatewayClient, error) { c, err := discover.FindServer(common.KeyDiscoverGateway, sid...) if err != nil { return nil, err @@ -14,8 +14,8 @@ func GatewayNewClient(sid ...string) (grpc_pb.GatewayClient, error) { return grpc_pb.NewGatewayClient(c), nil } -func GatewayNewBroadcastClient() map[string]grpc_pb.GatewayClient { - clientM := make(map[string]grpc_pb.GatewayClient) +func GatewayNewBroadcastClient() map[int64]grpc_pb.GatewayClient { + clientM := make(map[int64]grpc_pb.GatewayClient) connM := discover.FindServerAll(common.KeyDiscoverGateway) for sid, conn := range connM { clientM[sid] = grpc_pb.NewGatewayClient(conn) diff --git a/Server/common/net/grpc/service/client_scene.go b/Server/common/net/grpc/service/client_scene.go new file mode 100644 index 0000000..97ed224 --- /dev/null +++ b/Server/common/net/grpc/service/client_scene.go @@ -0,0 +1,24 @@ +package service + +import ( + "common/discover" + "common/discover/common" + "common/proto/gen/grpc_pb" +) + +func SceneNewClient(sid ...int64) (grpc_pb.SceneClient, error) { + c, err := discover.FindServer(common.KeyDiscoverScene, sid...) + if err != nil { + return nil, err + } + return grpc_pb.NewSceneClient(c), nil +} + +func SceneNewBroadcastClient() map[int64]grpc_pb.SceneClient { + clientM := make(map[int64]grpc_pb.SceneClient) + connM := discover.FindServerAll(common.KeyDiscoverScene) + for sid, conn := range connM { + clientM[sid] = grpc_pb.NewSceneClient(conn) + } + return clientM +} diff --git a/Server/common/proto/cs/action.proto b/Server/common/proto/cs/action.proto new file mode 100644 index 0000000..33cf7f7 --- /dev/null +++ b/Server/common/proto/cs/action.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option go_package = "common/proto/gen/cs"; +import "common.proto"; + +enum ActionID { + ACTION_ID_INVALID = 0; + ACTION_ID_MOVE = 1; // 1-15都是移动指令 + ACTION_ID_ATTACK = 16; // 攻击指令 +} + +// 指令 +message C2S_Action { + ActionID Action = 1; // 指令ID + bytes Payload = 2; // 指令数据 +} diff --git a/Server/common/proto/cs/define.proto b/Server/common/proto/cs/define.proto index a76efc0..aec73a6 100644 --- a/Server/common/proto/cs/define.proto +++ b/Server/common/proto/cs/define.proto @@ -5,15 +5,7 @@ import "common.proto"; enum MessageID { MESSAGE_ID_INVALID = 0; - - // 移动指令 - MESSAGE_TYPE_PLAYER_MOVE = 3; - - // 聊天消息 - MESSAGE_TYPE_CHAT_MESSAGE = 4; - - // 心跳包 - MESSAGE_TYPE_HEARTBEAT = 5; + MESSAGE_ID_C2S_ACTION = 1; // 指令 } message Message { diff --git a/Server/common/proto/gen/grpc_pb/gateway.pb.go b/Server/common/proto/gen/grpc_pb/gateway.pb.go index 7df674c..aac8eea 100644 --- a/Server/common/proto/gen/grpc_pb/gateway.pb.go +++ b/Server/common/proto/gen/grpc_pb/gateway.pb.go @@ -11,6 +11,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" + sync "sync" ) const ( @@ -20,25 +21,98 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ToClientReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UID int32 `protobuf:"varint,1,opt,name=UID,proto3" json:"UID,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=Payload,proto3" json:"Payload,omitempty"` +} + +func (x *ToClientReq) Reset() { + *x = ToClientReq{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_gateway_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ToClientReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ToClientReq) ProtoMessage() {} + +func (x *ToClientReq) ProtoReflect() protoreflect.Message { + mi := &file_grpc_gateway_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ToClientReq.ProtoReflect.Descriptor instead. +func (*ToClientReq) Descriptor() ([]byte, []int) { + return file_grpc_gateway_proto_rawDescGZIP(), []int{0} +} + +func (x *ToClientReq) GetUID() int32 { + if x != nil { + return x.UID + } + return 0 +} + +func (x *ToClientReq) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + var File_grpc_gateway_proto protoreflect.FileDescriptor var file_grpc_gateway_proto_rawDesc = []byte{ 0x0a, 0x12, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x32, 0x2a, 0x0a, 0x07, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x1f, 0x0a, - 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x06, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1a, + 0x74, 0x6f, 0x22, 0x39, 0x0a, 0x0b, 0x54, 0x6f, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x71, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, + 0x55, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, 0x2f, 0x0a, + 0x07, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x24, 0x0a, 0x08, 0x54, 0x6f, 0x43, 0x6c, + 0x69, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x54, 0x6f, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, + 0x65, 0x71, 0x1a, 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x28, 0x01, 0x42, 0x1a, 0x5a, 0x18, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } +var ( + file_grpc_gateway_proto_rawDescOnce sync.Once + file_grpc_gateway_proto_rawDescData = file_grpc_gateway_proto_rawDesc +) + +func file_grpc_gateway_proto_rawDescGZIP() []byte { + file_grpc_gateway_proto_rawDescOnce.Do(func() { + file_grpc_gateway_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_gateway_proto_rawDescData) + }) + return file_grpc_gateway_proto_rawDescData +} + +var file_grpc_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_grpc_gateway_proto_goTypes = []interface{}{ - (*common.Empty)(nil), // 0: Empty + (*ToClientReq)(nil), // 0: ToClientReq + (*common.Empty)(nil), // 1: Empty } var file_grpc_gateway_proto_depIdxs = []int32{ - 0, // 0: Gateway.SendMessage:input_type -> Empty - 0, // 1: Gateway.SendMessage:output_type -> Empty + 0, // 0: Gateway.ToClient:input_type -> ToClientReq + 1, // 1: Gateway.ToClient:output_type -> Empty 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -51,18 +125,33 @@ func file_grpc_gateway_proto_init() { if File_grpc_gateway_proto != nil { return } + if !protoimpl.UnsafeEnabled { + file_grpc_gateway_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ToClientReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_grpc_gateway_proto_rawDesc, NumEnums: 0, - NumMessages: 0, + NumMessages: 1, NumExtensions: 0, NumServices: 1, }, GoTypes: file_grpc_gateway_proto_goTypes, DependencyIndexes: file_grpc_gateway_proto_depIdxs, + MessageInfos: file_grpc_gateway_proto_msgTypes, }.Build() File_grpc_gateway_proto = out.File file_grpc_gateway_proto_rawDesc = nil diff --git a/Server/common/proto/gen/grpc_pb/gateway_grpc.pb.go b/Server/common/proto/gen/grpc_pb/gateway_grpc.pb.go index 9a71d33..b0c5317 100644 --- a/Server/common/proto/gen/grpc_pb/gateway_grpc.pb.go +++ b/Server/common/proto/gen/grpc_pb/gateway_grpc.pb.go @@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type GatewayClient interface { - SendMessage(ctx context.Context, in *common.Empty, opts ...grpc.CallOption) (*common.Empty, error) + ToClient(ctx context.Context, opts ...grpc.CallOption) (Gateway_ToClientClient, error) } type gatewayClient struct { @@ -34,20 +34,45 @@ func NewGatewayClient(cc grpc.ClientConnInterface) GatewayClient { return &gatewayClient{cc} } -func (c *gatewayClient) SendMessage(ctx context.Context, in *common.Empty, opts ...grpc.CallOption) (*common.Empty, error) { - out := new(common.Empty) - err := c.cc.Invoke(ctx, "/Gateway/SendMessage", in, out, opts...) +func (c *gatewayClient) ToClient(ctx context.Context, opts ...grpc.CallOption) (Gateway_ToClientClient, error) { + stream, err := c.cc.NewStream(ctx, &Gateway_ServiceDesc.Streams[0], "/Gateway/ToClient", opts...) if err != nil { return nil, err } - return out, nil + x := &gatewayToClientClient{stream} + return x, nil +} + +type Gateway_ToClientClient interface { + Send(*ToClientReq) error + CloseAndRecv() (*common.Empty, error) + grpc.ClientStream +} + +type gatewayToClientClient struct { + grpc.ClientStream +} + +func (x *gatewayToClientClient) Send(m *ToClientReq) error { + return x.ClientStream.SendMsg(m) +} + +func (x *gatewayToClientClient) CloseAndRecv() (*common.Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(common.Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // GatewayServer is the server API for Gateway service. // All implementations must embed UnimplementedGatewayServer // for forward compatibility type GatewayServer interface { - SendMessage(context.Context, *common.Empty) (*common.Empty, error) + ToClient(Gateway_ToClientServer) error mustEmbedUnimplementedGatewayServer() } @@ -55,8 +80,8 @@ type GatewayServer interface { type UnimplementedGatewayServer struct { } -func (UnimplementedGatewayServer) SendMessage(context.Context, *common.Empty) (*common.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SendMessage not implemented") +func (UnimplementedGatewayServer) ToClient(Gateway_ToClientServer) error { + return status.Errorf(codes.Unimplemented, "method ToClient not implemented") } func (UnimplementedGatewayServer) mustEmbedUnimplementedGatewayServer() {} @@ -71,22 +96,30 @@ func RegisterGatewayServer(s grpc.ServiceRegistrar, srv GatewayServer) { s.RegisterService(&Gateway_ServiceDesc, srv) } -func _Gateway_SendMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(common.Empty) - if err := dec(in); err != nil { +func _Gateway_ToClient_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GatewayServer).ToClient(&gatewayToClientServer{stream}) +} + +type Gateway_ToClientServer interface { + SendAndClose(*common.Empty) error + Recv() (*ToClientReq, error) + grpc.ServerStream +} + +type gatewayToClientServer struct { + grpc.ServerStream +} + +func (x *gatewayToClientServer) SendAndClose(m *common.Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *gatewayToClientServer) Recv() (*ToClientReq, error) { + m := new(ToClientReq) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(GatewayServer).SendMessage(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Gateway/SendMessage", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(GatewayServer).SendMessage(ctx, req.(*common.Empty)) - } - return interceptor(ctx, in, info, handler) + return m, nil } // Gateway_ServiceDesc is the grpc.ServiceDesc for Gateway service. @@ -95,12 +128,13 @@ func _Gateway_SendMessage_Handler(srv interface{}, ctx context.Context, dec func var Gateway_ServiceDesc = grpc.ServiceDesc{ ServiceName: "Gateway", HandlerType: (*GatewayServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "SendMessage", - Handler: _Gateway_SendMessage_Handler, + StreamName: "ToClient", + Handler: _Gateway_ToClient_Handler, + ClientStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "grpc/gateway.proto", } diff --git a/Server/common/proto/gen/grpc_pb/scene.pb.go b/Server/common/proto/gen/grpc_pb/scene.pb.go new file mode 100644 index 0000000..e366258 --- /dev/null +++ b/Server/common/proto/gen/grpc_pb/scene.pb.go @@ -0,0 +1,335 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.25.1 +// source: grpc/scene.proto + +package grpc_pb + +import ( + common "common/proto/gen/common" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type EnterReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UID int32 `protobuf:"varint,1,opt,name=UID,proto3" json:"UID,omitempty"` // 用户ID + SID int64 `protobuf:"varint,2,opt,name=SID,proto3" json:"SID,omitempty"` // 服务ID + InstanceID int32 `protobuf:"varint,3,opt,name=InstanceID,proto3" json:"InstanceID,omitempty"` // 副本ID +} + +func (x *EnterReq) Reset() { + *x = EnterReq{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_scene_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EnterReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EnterReq) ProtoMessage() {} + +func (x *EnterReq) ProtoReflect() protoreflect.Message { + mi := &file_grpc_scene_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EnterReq.ProtoReflect.Descriptor instead. +func (*EnterReq) Descriptor() ([]byte, []int) { + return file_grpc_scene_proto_rawDescGZIP(), []int{0} +} + +func (x *EnterReq) GetUID() int32 { + if x != nil { + return x.UID + } + return 0 +} + +func (x *EnterReq) GetSID() int64 { + if x != nil { + return x.SID + } + return 0 +} + +func (x *EnterReq) GetInstanceID() int32 { + if x != nil { + return x.InstanceID + } + return 0 +} + +type EnterResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SID int64 `protobuf:"varint,1,opt,name=SID,proto3" json:"SID,omitempty"` // 服务ID + UniqueNo int64 `protobuf:"varint,2,opt,name=UniqueNo,proto3" json:"UniqueNo,omitempty"` // 副本唯一编号 +} + +func (x *EnterResp) Reset() { + *x = EnterResp{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_scene_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EnterResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EnterResp) ProtoMessage() {} + +func (x *EnterResp) ProtoReflect() protoreflect.Message { + mi := &file_grpc_scene_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EnterResp.ProtoReflect.Descriptor instead. +func (*EnterResp) Descriptor() ([]byte, []int) { + return file_grpc_scene_proto_rawDescGZIP(), []int{1} +} + +func (x *EnterResp) GetSID() int64 { + if x != nil { + return x.SID + } + return 0 +} + +func (x *EnterResp) GetUniqueNo() int64 { + if x != nil { + return x.UniqueNo + } + return 0 +} + +type ActionReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UniqueNo int64 `protobuf:"varint,1,opt,name=UniqueNo,proto3" json:"UniqueNo,omitempty"` // 副本唯一编号 + UID int32 `protobuf:"varint,2,opt,name=UID,proto3" json:"UID,omitempty"` // 用户ID + Action int32 `protobuf:"varint,3,opt,name=Action,proto3" json:"Action,omitempty"` // 动作ID + Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"` // 动作数据 +} + +func (x *ActionReq) Reset() { + *x = ActionReq{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_scene_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ActionReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ActionReq) ProtoMessage() {} + +func (x *ActionReq) ProtoReflect() protoreflect.Message { + mi := &file_grpc_scene_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ActionReq.ProtoReflect.Descriptor instead. +func (*ActionReq) Descriptor() ([]byte, []int) { + return file_grpc_scene_proto_rawDescGZIP(), []int{2} +} + +func (x *ActionReq) GetUniqueNo() int64 { + if x != nil { + return x.UniqueNo + } + return 0 +} + +func (x *ActionReq) GetUID() int32 { + if x != nil { + return x.UID + } + return 0 +} + +func (x *ActionReq) GetAction() int32 { + if x != nil { + return x.Action + } + return 0 +} + +func (x *ActionReq) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +var File_grpc_scene_proto protoreflect.FileDescriptor + +var file_grpc_scene_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x63, 0x65, 0x6e, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x1a, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x22, 0x4e, 0x0a, 0x08, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x10, 0x0a, 0x03, + 0x55, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x55, 0x49, 0x44, 0x12, 0x10, + 0x0a, 0x03, 0x53, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x53, 0x49, 0x44, + 0x12, 0x1e, 0x0a, 0x0a, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, + 0x22, 0x39, 0x0a, 0x09, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x12, 0x10, 0x0a, + 0x03, 0x53, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x53, 0x49, 0x44, 0x12, + 0x1a, 0x0a, 0x08, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4e, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x08, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4e, 0x6f, 0x22, 0x6b, 0x0a, 0x09, 0x41, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x55, 0x6e, 0x69, 0x71, + 0x75, 0x65, 0x4e, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x55, 0x6e, 0x69, 0x71, + 0x75, 0x65, 0x4e, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x03, 0x55, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, + 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, 0x4b, 0x0a, 0x05, 0x53, 0x63, 0x65, 0x6e, + 0x65, 0x12, 0x20, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x09, 0x2e, 0x45, 0x6e, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x0a, 0x2e, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, + 0x70, 0x22, 0x00, 0x12, 0x20, 0x0a, 0x06, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x2e, + 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x28, 0x01, 0x42, 0x1a, 0x5a, 0x18, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_grpc_scene_proto_rawDescOnce sync.Once + file_grpc_scene_proto_rawDescData = file_grpc_scene_proto_rawDesc +) + +func file_grpc_scene_proto_rawDescGZIP() []byte { + file_grpc_scene_proto_rawDescOnce.Do(func() { + file_grpc_scene_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_scene_proto_rawDescData) + }) + return file_grpc_scene_proto_rawDescData +} + +var file_grpc_scene_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_grpc_scene_proto_goTypes = []interface{}{ + (*EnterReq)(nil), // 0: EnterReq + (*EnterResp)(nil), // 1: EnterResp + (*ActionReq)(nil), // 2: ActionReq + (*common.Empty)(nil), // 3: Empty +} +var file_grpc_scene_proto_depIdxs = []int32{ + 0, // 0: Scene.Enter:input_type -> EnterReq + 2, // 1: Scene.Action:input_type -> ActionReq + 1, // 2: Scene.Enter:output_type -> EnterResp + 3, // 3: Scene.Action:output_type -> Empty + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_grpc_scene_proto_init() } +func file_grpc_scene_proto_init() { + if File_grpc_scene_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_grpc_scene_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EnterReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_scene_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EnterResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_scene_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ActionReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_grpc_scene_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_grpc_scene_proto_goTypes, + DependencyIndexes: file_grpc_scene_proto_depIdxs, + MessageInfos: file_grpc_scene_proto_msgTypes, + }.Build() + File_grpc_scene_proto = out.File + file_grpc_scene_proto_rawDesc = nil + file_grpc_scene_proto_goTypes = nil + file_grpc_scene_proto_depIdxs = nil +} diff --git a/Server/common/proto/gen/grpc_pb/scene_grpc.pb.go b/Server/common/proto/gen/grpc_pb/scene_grpc.pb.go new file mode 100644 index 0000000..8ad0280 --- /dev/null +++ b/Server/common/proto/gen/grpc_pb/scene_grpc.pb.go @@ -0,0 +1,177 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v4.25.1 +// source: grpc/scene.proto + +package grpc_pb + +import ( + common "common/proto/gen/common" + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SceneClient is the client API for Scene service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SceneClient interface { + Enter(ctx context.Context, in *EnterReq, opts ...grpc.CallOption) (*EnterResp, error) + Action(ctx context.Context, opts ...grpc.CallOption) (Scene_ActionClient, error) +} + +type sceneClient struct { + cc grpc.ClientConnInterface +} + +func NewSceneClient(cc grpc.ClientConnInterface) SceneClient { + return &sceneClient{cc} +} + +func (c *sceneClient) Enter(ctx context.Context, in *EnterReq, opts ...grpc.CallOption) (*EnterResp, error) { + out := new(EnterResp) + err := c.cc.Invoke(ctx, "/Scene/Enter", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *sceneClient) Action(ctx context.Context, opts ...grpc.CallOption) (Scene_ActionClient, error) { + stream, err := c.cc.NewStream(ctx, &Scene_ServiceDesc.Streams[0], "/Scene/Action", opts...) + if err != nil { + return nil, err + } + x := &sceneActionClient{stream} + return x, nil +} + +type Scene_ActionClient interface { + Send(*ActionReq) error + CloseAndRecv() (*common.Empty, error) + grpc.ClientStream +} + +type sceneActionClient struct { + grpc.ClientStream +} + +func (x *sceneActionClient) Send(m *ActionReq) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sceneActionClient) CloseAndRecv() (*common.Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(common.Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SceneServer is the server API for Scene service. +// All implementations must embed UnimplementedSceneServer +// for forward compatibility +type SceneServer interface { + Enter(context.Context, *EnterReq) (*EnterResp, error) + Action(Scene_ActionServer) error + mustEmbedUnimplementedSceneServer() +} + +// UnimplementedSceneServer must be embedded to have forward compatible implementations. +type UnimplementedSceneServer struct { +} + +func (UnimplementedSceneServer) Enter(context.Context, *EnterReq) (*EnterResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Enter not implemented") +} +func (UnimplementedSceneServer) Action(Scene_ActionServer) error { + return status.Errorf(codes.Unimplemented, "method Action not implemented") +} +func (UnimplementedSceneServer) mustEmbedUnimplementedSceneServer() {} + +// UnsafeSceneServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SceneServer will +// result in compilation errors. +type UnsafeSceneServer interface { + mustEmbedUnimplementedSceneServer() +} + +func RegisterSceneServer(s grpc.ServiceRegistrar, srv SceneServer) { + s.RegisterService(&Scene_ServiceDesc, srv) +} + +func _Scene_Enter_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EnterReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SceneServer).Enter(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Scene/Enter", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SceneServer).Enter(ctx, req.(*EnterReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Scene_Action_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SceneServer).Action(&sceneActionServer{stream}) +} + +type Scene_ActionServer interface { + SendAndClose(*common.Empty) error + Recv() (*ActionReq, error) + grpc.ServerStream +} + +type sceneActionServer struct { + grpc.ServerStream +} + +func (x *sceneActionServer) SendAndClose(m *common.Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sceneActionServer) Recv() (*ActionReq, error) { + m := new(ActionReq) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Scene_ServiceDesc is the grpc.ServiceDesc for Scene service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Scene_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Scene", + HandlerType: (*SceneServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Enter", + Handler: _Scene_Enter_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Action", + Handler: _Scene_Action_Handler, + ClientStreams: true, + }, + }, + Metadata: "grpc/scene.proto", +} diff --git a/Server/common/proto/grpc/gateway.proto b/Server/common/proto/grpc/gateway.proto index 1c87777..58c92d3 100644 --- a/Server/common/proto/grpc/gateway.proto +++ b/Server/common/proto/grpc/gateway.proto @@ -4,6 +4,11 @@ option go_package = "common/proto/gen/grpc_pb"; import "common.proto"; service Gateway { - rpc SendMessage(Empty) returns (Empty) {} + rpc ToClient(stream ToClientReq) returns (Empty) {} +} + +message ToClientReq { + int32 UID = 1; + bytes Payload = 2; } diff --git a/Server/common/proto/grpc/scene.proto b/Server/common/proto/grpc/scene.proto index 8e1755e..fbde58f 100644 --- a/Server/common/proto/grpc/scene.proto +++ b/Server/common/proto/grpc/scene.proto @@ -5,13 +5,23 @@ import "common.proto"; service Scene { rpc Enter(EnterReq) returns (EnterResp) {} + rpc Action(stream ActionReq) returns (Empty) {} } message EnterReq { - string name = 1; + int32 UID = 1; // 用户ID + int64 SID = 2; // 服务ID + int32 InstanceID = 3; // 副本ID } message EnterResp { - string name = 1; + int64 SID = 1; // 服务ID + int64 UniqueNo = 2; // 副本唯一编号 } +message ActionReq { + int64 UniqueNo = 1; // 副本唯一编号 + int32 UID = 2; // 用户ID + int32 Action = 3; // 动作ID + bytes Payload = 4; // 动作数据 +} diff --git a/Server/common/utils/number.go b/Server/common/utils/number.go index 1fd4f07..ce618ee 100644 --- a/Server/common/utils/number.go +++ b/Server/common/utils/number.go @@ -2,6 +2,7 @@ package utils import ( "math/rand" + "strconv" ) // RandInt 生成 [min, max] 范围内的随机整数 @@ -11,3 +12,12 @@ func RandInt(min, max int) int { } return rand.Intn(max-min+1) + min } + +// StringToInt64 converts a string to int64, returns 0 if failed +func StringToInt64(s string) int64 { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0 + } + return i +} diff --git a/Server/scene/app/app.go b/Server/scene/app/app.go index cf1c9dd..f08028b 100644 --- a/Server/scene/app/app.go +++ b/Server/scene/app/app.go @@ -5,9 +5,7 @@ import ( "common/discover" "common/log" "common/net/grpc/service" - "common/net/socket/websocket" "fmt" - "github.com/gin-gonic/gin" "github.com/judwhite/go-svc" "runtime/debug" "scene/config" @@ -16,11 +14,9 @@ import ( ) type Program struct { - wg *sync.WaitGroup - server service.IService // grpc服务 - webServer *gin.Engine // web服务 - wsServer *websocket.WSServer // websocket服务 - stop chan bool + wg *sync.WaitGroup + server service.IService // grpc服务 + stop chan bool } func (p *Program) Init(_ svc.Environment) error { @@ -47,14 +43,6 @@ func (p *Program) Start() error { discover.Listen() p.server = grpc_server.NewServer(config.Get().Serve.Grpc.TTL) p.server.Init(config.Get().Serve.Grpc.Address, config.Get().Serve.Grpc.Port) - go func() { - cfg := config.Get() - _ = p.wsServer.Run( - log.GetLogger().Named("gnet"), - fmt.Sprintf("tcp4://0.0.0.0:%v", cfg.Serve.Socket.Web.Port), - true, true, false, false, true, 8, - ) - }() return nil } diff --git a/Server/scene/config/config.dev.yaml b/Server/scene/config/config.dev.yaml index 545a53c..9460a4a 100644 --- a/Server/scene/config/config.dev.yaml +++ b/Server/scene/config/config.dev.yaml @@ -1,12 +1,12 @@ app: - name: "gateway-dev" + name: "scene-dev" log: debug: true level: "debug" - max_size: 100 - max_backups: 3 - max_age: 7 + maxSize: 100 + maxBackups: 3 + maxAge: 7 db: etcd: diff --git a/Server/scene/config/config.go b/Server/scene/config/config.go index 94a8c32..6ebc6ec 100644 --- a/Server/scene/config/config.go +++ b/Server/scene/config/config.go @@ -13,9 +13,9 @@ type AppConfig struct { type LogConfig struct { Debug bool `yaml:"debug"` - MaxSize int `yaml:"max_size"` - MaxBackups int `yaml:"max_backups"` - MaxAge int `yaml:"max_age"` + MaxSize int `yaml:"maxSize"` + MaxBackups int `yaml:"maxBackups"` + MaxAge int `yaml:"maxAge"` Level string `yaml:"level"` } @@ -27,17 +27,8 @@ type DBConfig struct { type ServeConfig struct { Grpc *struct { - AddressConfig - TTL int64 `yaml:"ttl"` + Address string `yaml:"address"` + Port int `yaml:"port"` + TTL int64 `yaml:"ttl"` } `yaml:"grpc"` - Socket *struct { - Web *AddressConfig `yaml:"web"` - Raw *AddressConfig `yaml:"raw"` - } `yaml:"socket"` - Http *AddressConfig `yaml:"http"` -} - -type AddressConfig struct { - Address string `yaml:"address"` - Port int `yaml:"port"` } diff --git a/Server/scene/grpc_server/server.go b/Server/scene/grpc_server/server.go index a3ca5b6..55b8af5 100644 --- a/Server/scene/grpc_server/server.go +++ b/Server/scene/grpc_server/server.go @@ -3,10 +3,43 @@ package grpc_server import ( "common/log" "common/proto/gen/common" + "common/proto/gen/grpc_pb" "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "scene/instance" + "sync" ) -func (s *Server) SendMessage(ctx context.Context, req *common.Empty) (*common.Empty, error) { - log.Infof("未实现函数") - return nil, nil +func (s *Server) Enter(ctx context.Context, req *grpc_pb.EnterReq) (*grpc_pb.EnterResp, error) { + log.Infof("enter 触发 %v", req.SID) + return nil, status.Errorf(codes.Unimplemented, "") +} + +func (s *Server) Action(server grpc_pb.Scene_ActionServer) error { + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + if err := recover(); err != nil { + log.Errorf("Action panic: %v", err) + } + }() + for { + if args, err := server.Recv(); err != nil { + return + } else { + if ins := instance.Mgr.GetByUniqueNo(args.UniqueNo); ins != nil { + select { + case ins.EventIn <- args: + default: + log.Warnf("instance event in full: %v, %v", ins.InstanceID, ins.UniqueNo) + } + } + } + } + }() + wg.Wait() + return server.SendAndClose(&common.Empty{}) } diff --git a/Server/scene/grpc_server/server_init.go b/Server/scene/grpc_server/server_init.go index 8ca2161..f56f4fe 100644 --- a/Server/scene/grpc_server/server_init.go +++ b/Server/scene/grpc_server/server_init.go @@ -8,14 +8,14 @@ import ( ) type Server struct { - grpc_pb.UnimplementedGatewayServer + grpc_pb.UnimplementedSceneServer service.Base } func NewServer(ttl int64) *Server { s := &Server{ Base: service.Base{ - Target: common.KeyDiscoverGateway, + Target: common.KeyDiscoverScene, EtcdTTL: ttl, }, } @@ -25,7 +25,7 @@ func NewServer(ttl int64) *Server { } func (s *Server) OnInit(serve *grpc.Server) { - grpc_pb.RegisterGatewayServer(serve, s) + grpc_pb.RegisterSceneServer(serve, s) } func (s *Server) OnClose() { diff --git a/Server/scene/instance/define.go b/Server/scene/instance/define.go new file mode 100644 index 0000000..725b311 --- /dev/null +++ b/Server/scene/instance/define.go @@ -0,0 +1,9 @@ +package instance + +import "google.golang.org/protobuf/proto" + +type Msg2Client struct { + SID string + UID int + Msg proto.Message +} diff --git a/Server/scene/instance/instance.go b/Server/scene/instance/instance.go new file mode 100644 index 0000000..8681f3b --- /dev/null +++ b/Server/scene/instance/instance.go @@ -0,0 +1,97 @@ +package instance + +import ( + "common/discover" + "common/log" + "common/proto/gen/grpc_pb" + "common/utils" + "context" + "google.golang.org/protobuf/proto" + "runtime/debug" + "sync" + "time" +) + +// Instance 场景类 +type Instance struct { + wg sync.WaitGroup + players map[int]*PlayerNode // 存储所有玩家节点 [uid] + ctx context.Context // 停止指令 + cancel context.CancelFunc // 停止函数 + + SID int64 // 服务ID + InstanceID int // 副本ID + UniqueNo int64 // 唯一编号 + EventIn chan proto.Message // 消息入口 + EventOut chan *Msg2Client // 消息出口 +} + +// NewScene 初始化场景 +func NewScene(sid int64, instanceID int) *Instance { + s := &Instance{ + players: make(map[int]*PlayerNode), + SID: sid, + InstanceID: instanceID, + UniqueNo: utils.SnowflakeInstance().Generate().Int64(), + EventIn: make(chan proto.Message), + EventOut: make(chan *Msg2Client), + } + s.ctx, s.cancel = context.WithCancel(context.Background()) + return s +} + +func (i *Instance) Start(ttl int64) { + i.wg.Add(1) + go func() { + defer i.wg.Done() + defer func() { + if err := recover(); err != nil { + log.Infof("instance err: %v, uniqueNo: %v", err, i.UniqueNo) + debug.PrintStack() + } + }() + Mgr.Add(i.UniqueNo, i) + defer func() { + log.Infof("instance destroy: %v, %v", i.InstanceID, i.UniqueNo) + discover.UnRegisterInstance(i.UniqueNo) + close(i.EventIn) + Mgr.Delete(i.UniqueNo) + }() + + if err := discover.RegisterInstance(i.SID, i.InstanceID, i.UniqueNo, ttl); err != nil { + log.Errorf("register discover error") + return + } + // 场景心跳 + tick := time.NewTicker(50 * time.Millisecond) + for { + select { + case <-i.ctx.Done(): + return + case <-tick.C: + i.onLogic() + case e := <-i.EventIn: + i.onEvent(e) + } + } + }() + log.Infof("new scene start: %v, %v", i.InstanceID, i.UniqueNo) +} + +// 网络帧 +func (i *Instance) onEvent(e proto.Message) { + switch v := e.(type) { + case *grpc_pb.ActionReq: + if node, ok := i.players[int(v.UID)]; ok { + node.addAction(v) + } + } +} + +// 逻辑帧 +func (i *Instance) onLogic() { + // 优先处理移动指令 + for _, node := range i.players { + node.logicMove() + } +} diff --git a/Server/scene/instance/manager.go b/Server/scene/instance/manager.go new file mode 100644 index 0000000..e515fdc --- /dev/null +++ b/Server/scene/instance/manager.go @@ -0,0 +1,42 @@ +package instance + +import ( + "sync" +) + +var Mgr *insManager + +type insManager struct { + sync.RWMutex + insMap map[int64]*Instance // [uniqueNo] +} + +func init() { + Mgr = &insManager{ + insMap: make(map[int64]*Instance), + } +} + +func (m *insManager) Add(uniqueNo int64, ins *Instance) { + m.Lock() + defer m.Unlock() + m.insMap[uniqueNo] = ins +} + +func (m *insManager) Delete(uniqueNo int64) { + m.Lock() + defer m.Unlock() + delete(m.insMap, uniqueNo) +} + +func (m *insManager) GetAll() map[int64]*Instance { + m.RLock() + defer m.RUnlock() + return m.insMap +} + +func (m *insManager) GetByUniqueNo(uniqueNo int64) *Instance { + m.RLock() + defer m.RUnlock() + return m.insMap[uniqueNo] +} diff --git a/Server/scene/instance/player.go b/Server/scene/instance/player.go new file mode 100644 index 0000000..bcccbb6 --- /dev/null +++ b/Server/scene/instance/player.go @@ -0,0 +1,63 @@ +package instance + +import ( + "common/proto/gen/grpc_pb" +) + +// PlayerNode 定义玩家节点结构体 +type PlayerNode struct { + ID int + Position [2]float64 // 二维坐标 [x, y] + MoveCross int8 // 移动十字,1 上 2 下 4 左 8 右 + Action []*grpc_pb.ActionReq // 其他操作 +} + +// 将指令存储到玩家身上 +func (p *PlayerNode) addAction(e *grpc_pb.ActionReq) { + if e.Action < 16 { + p.MoveCross = int8(e.Action) + } else { + p.Action = append(p.Action, e) + } +} + +const ( + DirUp int8 = 1 << iota // 1 (00000001) + DirDown // 2 (00000010) + DirLeft // 4 (00000100) + DirRight // 8 (00001000) +) + +// 逻辑帧-移动 +func (p *PlayerNode) logicMove() { + if p.MoveCross&DirUp != 0 && p.MoveCross&DirDown != 0 { + p.MoveCross &^= DirUp | DirDown + } + if p.MoveCross&DirLeft != 0 && p.MoveCross&DirRight != 0 { + p.MoveCross &^= DirLeft | DirRight + } + + var moveX, moveY float64 + if p.MoveCross&DirUp != 0 { + moveY += 1 + } + if p.MoveCross&DirDown != 0 { + moveY -= 1 + } + if p.MoveCross&DirLeft != 0 { + moveX -= 1 + } + if p.MoveCross&DirRight != 0 { + moveX += 1 + } + + if moveX != 0 && moveY != 0 { + const diagonalFactor = 0.7071 + moveX *= diagonalFactor + moveY *= diagonalFactor + } + + speed := 100.0 + p.Position[0] += moveX * speed + p.Position[1] += moveY * speed +}