185 lines
4.4 KiB
Go
185 lines
4.4 KiB
Go
package login
|
|
|
|
import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"git.hlsq.asia/mmorpg/service-common/db/redis"
	"git.hlsq.asia/mmorpg/service-common/log"
	"git.hlsq.asia/mmorpg/service-common/net/grpc/grpc_client"
	"git.hlsq.asia/mmorpg/service-common/proto/rs/grpc_pb"
	"git.hlsq.asia/mmorpg/service-common/proto/ss/ss_pb"
	"git.hlsq.asia/mmorpg/service-common/utils"
	"git.hlsq.asia/mmorpg/service-gateway/internal/global"
	"git.hlsq.asia/mmorpg/service-gateway/internal/handler/ws_handler/client"
)
|
|
|
|
// loginQueue is the package-level Login instance. It is assigned by
// NewLoginQueue and handed out by GetLoginQueue.
var loginQueue *Login
|
|
|
|
// Login 登录队列结构
|
|
type Login struct {
|
|
queue chan *User // 用户队列
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type User struct {
|
|
Cli *client.Client
|
|
Token string
|
|
}
|
|
|
|
func NewLoginQueue(maxSize int) *Login {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
loginQueue = &Login{
|
|
queue: make(chan *User, maxSize),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
return loginQueue
|
|
}
|
|
|
|
func GetLoginQueue() *Login {
|
|
return loginQueue
|
|
}
|
|
|
|
// AddToLoginQueue 添加到登录队列
|
|
func (l *Login) AddToLoginQueue(user *User) bool {
|
|
select {
|
|
case l.queue <- user:
|
|
return true
|
|
default:
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (l *Login) Start(num int) {
|
|
for i := 0; i < num; i++ {
|
|
l.wg.Add(1)
|
|
go func() {
|
|
defer l.wg.Done()
|
|
for {
|
|
select {
|
|
case <-l.ctx.Done():
|
|
return
|
|
case user, ok := <-l.queue:
|
|
if ok {
|
|
l.StartLogin(user)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
// 定时从排队的队列中取用户
|
|
l.wg.Add(1)
|
|
go func() {
|
|
defer l.wg.Done()
|
|
tick := time.NewTicker(1 * time.Second)
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
for {
|
|
if client.UserMgr.GetSize() < global.MaxOnlineSize {
|
|
cli, err := GetQueueUp().Dequeue()
|
|
if err != nil {
|
|
log.Errorf("dequeue err: %v", err)
|
|
return
|
|
}
|
|
if cli == nil {
|
|
break
|
|
}
|
|
l.LoginSuccess(cli)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
case <-l.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (l *Login) Stop() {
|
|
l.cancel()
|
|
l.wg.Wait()
|
|
}
|
|
|
|
// StartLogin 开始登录流程
|
|
func (l *Login) StartLogin(user *User) {
|
|
if !l.CheckToken(user) {
|
|
user.Cli.WriteMessage(ss_pb.MessageID_MESSAGE_ID_KICK_OUT, &ss_pb.S2C_KickOut{
|
|
ID: ss_pb.KickOutID_KICK_OUT_ID_TOKEN_INVALID,
|
|
})
|
|
user.Cli.CloseClient()
|
|
return
|
|
}
|
|
if gatewaySID := l.CheckOnline(user); len(gatewaySID) > 0 {
|
|
// 如果在线就要踢,如果踢失败了就返回服务器繁忙,一般不应该走到这里
|
|
if !l.KickUser(user.Cli.SceneSID, user.Cli.USN) {
|
|
user.Cli.WriteMessage(ss_pb.MessageID_MESSAGE_ID_KICK_OUT, &ss_pb.S2C_KickOut{
|
|
ID: ss_pb.KickOutID_KICK_OUT_ID_SERVER_BUSY,
|
|
})
|
|
user.Cli.CloseClient()
|
|
return
|
|
}
|
|
}
|
|
if client.UserMgr.GetSize() >= global.MaxOnlineSize {
|
|
// 如果人数满了就排队
|
|
if err := GetQueueUp().Enqueue(user.Cli); err != nil {
|
|
user.Cli.WriteMessage(ss_pb.MessageID_MESSAGE_ID_KICK_OUT, &ss_pb.S2C_KickOut{
|
|
ID: ss_pb.KickOutID_KICK_OUT_ID_QUEUE_UP_FULL,
|
|
})
|
|
user.Cli.CloseClient()
|
|
return
|
|
}
|
|
// 告诉客户端正在排队
|
|
position, ok := GetQueueUp().GetPosition(user.Cli.USN)
|
|
if !ok {
|
|
user.Cli.WriteMessage(ss_pb.MessageID_MESSAGE_ID_KICK_OUT, &ss_pb.S2C_KickOut{
|
|
ID: ss_pb.KickOutID_KICK_OUT_ID_SERVER_BUSY,
|
|
})
|
|
user.Cli.CloseClient()
|
|
return
|
|
}
|
|
user.Cli.WriteMessage(ss_pb.MessageID_MESSAGE_ID_QUEUE_UP, &ss_pb.S2C_QueueUp{
|
|
QueueUpCount: int32(position),
|
|
})
|
|
} else {
|
|
l.LoginSuccess(user.Cli)
|
|
}
|
|
}
|
|
|
|
// CheckToken 校验Token是否有效
|
|
func (l *Login) CheckToken(user *User) bool {
|
|
usn, _ := redis.GetClient().HGet(context.Background(), global.KeyGatewayAccessToken+user.Token, (&utils.UserSession{}).GetUsnKey()).Int64()
|
|
user.Cli.SetUSN(usn)
|
|
return usn > 0
|
|
}
|
|
|
|
// CheckOnline 校验是否在线
|
|
func (l *Login) CheckOnline(user *User) string {
|
|
return redis.GetClient().HGet(l.ctx, fmt.Sprintf(global.KeyGatewayInfo, user.Cli.USN), global.HFieldInfoGatewaySID).Val()
|
|
}
|
|
|
|
// KickUser 把玩家踢下线
|
|
func (l *Login) KickUser(gatewaySID int64, usn int64) bool {
|
|
gc, err := grpc_client.GatewayNewClient(gatewaySID)
|
|
if err != nil {
|
|
log.Errorf("KickUser cannot find gateway client: %v, sid: %v", err, gatewaySID)
|
|
return false
|
|
}
|
|
_, err = gc.KickUser(l.ctx, &grpc_pb.KickUserReq{USN: usn})
|
|
if err != nil {
|
|
log.Errorf("KickUser err: %v, sid: %v, usn: %v", err, gatewaySID, usn)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// LoginSuccess 登录成功
|
|
func (l *Login) LoginSuccess(user *client.Client) {
|
|
user.OnEvent(&client.SystemLoginSuccessEvent{})
|
|
}
|