feat 初次提交

This commit is contained in:
2026-01-03 14:26:10 +08:00
parent f4cb8128c7
commit 506f7ed5ed
18 changed files with 885 additions and 0 deletions

View File

@@ -0,0 +1,74 @@
package server
import (
"common/log"
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
"context"
"google.golang.org/protobuf/proto"
instance2 "scene/internal/instance"
"sync"
)
func (s *Server) Enter(ctx context.Context, req *grpc_pb.EnterReq) (*grpc_pb.EnterResp, error) {
var i *instance2.Instance
if len(instance2.Mgr.GetAll()) == 0 {
i = instance2.NewScene(s.SID, req.InstanceID)
i.Start(s.EtcdTTL)
} else {
for _, v := range instance2.Mgr.GetAll() {
i = v
break
}
}
i.EventIn <- req
payload, _ := proto.Marshal(&sc_pb.S2C_EnterInstance{
Info: &sc_pb.PositionInfo{
USN: req.USN,
X: 1,
Y: 1,
},
})
return &grpc_pb.EnterResp{
SceneSID: s.SID,
UniqueNo: i.UniqueNo,
MessageID: int32(sc_pb.MessageID_MESSAGE_ID_ENTER_INSTANCE),
Payload: payload,
}, nil
}
func (s *Server) Leave(ctx context.Context, req *grpc_pb.LeaveReq) (*grpc_pb.LeaveResp, error) {
if i := instance2.Mgr.GetByUniqueNo(req.UniqueNo); i != nil {
i.EventIn <- req
}
return &grpc_pb.LeaveResp{}, nil
}
func (s *Server) Action(server grpc_pb.Scene_ActionServer) error {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
log.Errorf("Action panic: %v", err)
}
}()
for {
if args, err := server.Recv(); err != nil {
return
} else {
if ins := instance2.Mgr.GetByUniqueNo(args.UniqueNo); ins != nil {
select {
case ins.EventIn <- args:
default:
log.Warnf("instance event in full: %v, %v", ins.InstanceID, ins.UniqueNo)
}
}
}
}
}()
wg.Wait()
return server.SendAndClose(&grpc_pb.ActionResp{})
}

View File

@@ -0,0 +1,32 @@
package server
import (
"common/discover/common"
"common/net/grpc/service"
"common/proto/ss/grpc_pb"
"google.golang.org/grpc"
)
type Server struct {
grpc_pb.UnimplementedSceneServer
service.Base
}
func NewServer(ttl int64) *Server {
s := &Server{
Base: service.Base{
Target: common.KeyDiscoverScene,
EtcdTTL: ttl,
},
}
s.Base.OnInit = s.OnInit
s.Base.OnClose = s.OnClose
return s
}
func (s *Server) OnInit(serve *grpc.Server) {
grpc_pb.RegisterSceneServer(serve, s)
}
func (s *Server) OnClose() {
}

View File

@@ -0,0 +1,65 @@
package stream_client
import (
"common/log"
"common/net/grpc/service"
"context"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
var gatewayServerM map[int64]map[GatewayFun]grpc.ClientStream // map[sid]map[方法名]流连接
type GatewayFun int
const (
FunToClient GatewayFun = iota
)
func init() {
gatewayServerM = make(map[int64]map[GatewayFun]grpc.ClientStream)
}
func FindGatewayBySID(sid int64, fun GatewayFun) (grpc.ClientStream, error) {
g := gatewayServerM[sid]
if g == nil {
g = make(map[GatewayFun]grpc.ClientStream)
gatewayServerM[sid] = g
}
gatewayLink := g[fun]
if gatewayLink == nil {
gatewayClient, err := service.GatewayNewClient(sid)
if err != nil {
log.Errorf("cannot find gatewayClient: %v", err)
return nil, err
}
var link grpc.ClientStream
switch fun {
case FunToClient:
link, err = gatewayClient.ToClient(context.Background())
}
if err != nil {
log.Errorf("FindGatewayBySID %v err: %v, sid: %v", fun, err, sid)
return nil, err
}
g[fun] = link
gatewayLink = link
}
return gatewayLink, nil
}
func SendMessageToGateway(sid int64, fun GatewayFun, msg proto.Message, re ...bool) error {
stream, err := FindGatewayBySID(sid, fun)
if err != nil {
return err
}
if err = stream.SendMsg(msg); err != nil {
if re == nil || !re[0] {
_ = stream.CloseSend()
delete(gatewayServerM[sid], fun)
return SendMessageToGateway(sid, fun, msg, true)
}
return err
}
return nil
}

View File

@@ -0,0 +1,131 @@
package instance
import (
"common/discover"
"common/log"
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
"common/utils"
"context"
"fmt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"runtime/debug"
"scene/internal/grpc_server/stream_client"
"scene/internal/npc"
"sync"
"time"
)
// Instance 场景类
type Instance struct {
wg sync.WaitGroup
players map[int64]*npc.PlayerNode // 存储所有玩家节点 [usn]
ctx context.Context // 停止指令
cancel context.CancelFunc // 停止函数
logger *zap.SugaredLogger // 日志
lastLogicTime int64 // 上次逻辑帧执行时间(毫秒时间戳)
SID int64 // 服务ID
InstanceID int32 // 副本ID
UniqueNo int64 // 唯一编号
EventIn chan proto.Message // 消息入口
}
// NewScene 初始化场景
func NewScene(sid int64, instanceID int32) *Instance {
s := &Instance{
players: make(map[int64]*npc.PlayerNode),
SID: sid,
InstanceID: instanceID,
UniqueNo: utils.SnowflakeInstance().Generate().Int64(),
EventIn: make(chan proto.Message, 1024),
}
s.logger = log.GetLogger().Named(fmt.Sprintf("instance %v:%v", s.InstanceID, s.UniqueNo))
s.ctx, s.cancel = context.WithCancel(context.Background())
return s
}
func (i *Instance) Start(ttl int64) {
i.wg.Add(1)
go func() {
defer i.wg.Done()
defer func() {
if err := recover(); err != nil {
i.logger.Infof("instance err: %v", err)
debug.PrintStack()
}
}()
Mgr.Add(i.UniqueNo, i)
defer func() {
i.logger.Infof("instance destroy")
discover.UnRegisterInstance(i.UniqueNo)
close(i.EventIn)
Mgr.Delete(i.UniqueNo)
}()
if err := discover.RegisterInstance(i.SID, i.InstanceID, i.UniqueNo, ttl); err != nil {
i.logger.Errorf("register discover error")
return
}
// 场景心跳
tick := time.NewTicker(50 * time.Millisecond)
i.lastLogicTime = time.Now().UnixMilli()
for {
select {
case <-i.ctx.Done():
return
case <-tick.C:
now := time.Now().UnixMilli()
i.onLogic(now - i.lastLogicTime)
i.lastLogicTime = now
case e := <-i.EventIn:
i.onEvent(e)
}
}
}()
i.logger.Infof("new scene start")
}
// 网络帧,将玩家的操作储存起来,到逻辑帧再处理玩家操作
func (i *Instance) onEvent(e proto.Message) {
switch v := e.(type) {
case *grpc_pb.EnterReq:
i.players[v.USN] = npc.NewPlayerNode(v.GatewaySID, v.USN)
case *grpc_pb.ActionReq:
if node, ok := i.players[v.USN]; ok {
node.AddAction(v)
}
case *grpc_pb.LeaveReq:
delete(i.players, v.USN)
}
}
// 逻辑帧
func (i *Instance) onLogic(delta int64) {
positionUpdate := make([]*sc_pb.PositionInfo, 0)
sid := int64(0)
// 处理玩家指令
for _, node := range i.players {
if node.LogicAction(delta) {
positionUpdate = append(positionUpdate, &sc_pb.PositionInfo{
USN: node.USN,
X: int32(node.Position[0] * 100),
Y: int32(node.Position[1] * 100),
})
sid = node.GatewaySID
}
}
if len(positionUpdate) > 0 {
payload, _ := proto.Marshal(&sc_pb.S2C_Position{
Info: positionUpdate,
})
if err := stream_client.SendMessageToGateway(sid, stream_client.FunToClient, &grpc_pb.ToClientReq{
USN: -1,
MessageID: int32(sc_pb.MessageID_MESSAGE_ID_POSITION),
Payload: payload,
}); err != nil {
i.logger.Errorf("send action err: %v", err)
}
}
}

View File

@@ -0,0 +1,42 @@
package instance
import (
"sync"
)
var Mgr *insManager
type insManager struct {
sync.RWMutex
insMap map[int64]*Instance // [uniqueNo]
}
func init() {
Mgr = &insManager{
insMap: make(map[int64]*Instance),
}
}
func (m *insManager) Add(uniqueNo int64, ins *Instance) {
m.Lock()
defer m.Unlock()
m.insMap[uniqueNo] = ins
}
func (m *insManager) Delete(uniqueNo int64) {
m.Lock()
defer m.Unlock()
delete(m.insMap, uniqueNo)
}
func (m *insManager) GetAll() map[int64]*Instance {
m.RLock()
defer m.RUnlock()
return m.insMap
}
func (m *insManager) GetByUniqueNo(uniqueNo int64) *Instance {
m.RLock()
defer m.RUnlock()
return m.insMap[uniqueNo]
}

25
internal/npc/npc.go Normal file
View File

@@ -0,0 +1,25 @@
package npc
import (
"common/proto/ss/grpc_pb"
)
// NPCNode 定义NPC节点
type NPCNode struct {
USN int64 // 用户ID
GatewaySID int64 // 网关服务ID
Position [2]float64 // 二维坐标 [x, y]
MoveCross int8 // 移动十字1 上 2 下 4 左 8 右
Action []*grpc_pb.ActionReq // 其他操作
}
func NewNPCNode(gatewaySID int64, usn int64) *NPCNode {
return &NPCNode{
USN: usn,
GatewaySID: gatewaySID,
Position: [2]float64{0, 0},
MoveCross: 0,
Action: make([]*grpc_pb.ActionReq, 0),
}
}

64
internal/npc/player.go Normal file
View File

@@ -0,0 +1,64 @@
package npc
import (
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
)
// PlayerNode 定义玩家节点结构体
type PlayerNode struct {
USN int64 // 用户ID
GatewaySID int64 // 网关服务ID
MoveSpeed float32 // 移动速度
Position [2]float32 // 二维坐标 [x, y]
MoveDirection [2]int8 // 移动方向 [x, y]
MoveCount int8 // 移动指令使用计数
Action []*grpc_pb.ActionReq // 其他操作
}
func NewPlayerNode(gatewaySID int64, usn int64) *PlayerNode {
return &PlayerNode{
USN: usn,
GatewaySID: gatewaySID,
MoveSpeed: 5 * 0.00001,
Position: [2]float32{0, 0},
Action: make([]*grpc_pb.ActionReq, 0),
}
}
// AddAction 将指令存储到玩家身上
func (p *PlayerNode) AddAction(e *grpc_pb.ActionReq) {
p.Action = append(p.Action, e)
}
// LogicAction 处理玩家操作指令
// return 是否更新玩家状态
func (p *PlayerNode) LogicAction(delta int64) bool {
for _, a := range p.Action {
switch sc_pb.ActionID(a.Action) {
case sc_pb.ActionID_ACTION_ID_MOVE:
p.MoveDirection = [2]int8{int8(a.DirX), int8(a.DirY)}
p.MoveCount = 0
case sc_pb.ActionID_ACTION_ID_ATTACK:
}
}
p.Action = make([]*grpc_pb.ActionReq, 0)
return p.move(delta)
}
func (p *PlayerNode) move(delta int64) bool {
if p.MoveDirection[0] == 0 && p.MoveDirection[1] == 0 {
return false
}
// 移动指令只能用2秒客户端需要定期续上
if p.MoveCount >= 40 {
p.MoveDirection = [2]int8{0, 0}
return false
}
p.Position[0] += float32(p.MoveDirection[0]) * float32(delta) * p.MoveSpeed
p.Position[1] += float32(p.MoveDirection[1]) * float32(delta) * p.MoveSpeed
p.MoveCount++
return true
}