feat 初次提交

This commit is contained in:
2026-01-03 14:26:08 +08:00
parent ebeb244e1e
commit 887cb242e3
30 changed files with 1918 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
package server
import (
"common/log"
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
"gateway/internal/handler/ws_handler"
"google.golang.org/protobuf/proto"
"sync"
)
func (s *Server) ToClient(server grpc_pb.Gateway_ToClientServer) error {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
log.Errorf("Action panic: %v", err)
}
}()
for {
if args, err := server.Recv(); err != nil {
return
} else {
if args.USN == -1 {
//utils.WorkerPool(ws_handler.UserMgr.GetAllInterface(), func(task interface{}) {
// client := task.(*ws_handler.Client)
// client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
//})
data, err := proto.Marshal(&sc_pb.Message{
ID: sc_pb.MessageID(args.MessageID),
Payload: args.Payload,
})
if err != nil {
log.Errorf("ToClient proto.Marshal error: %v", err)
continue
}
for _, client := range ws_handler.UserMgr.GetAll() {
client.WriteBytesPreMarshal(data)
}
//for _, client := range ws_handler.UserMgr.GetAll() {
// client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
//}
} else {
if client := ws_handler.UserMgr.GetByUSN(args.USN); client != nil {
client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
}
}
}
}
}()
wg.Wait()
return server.SendAndClose(&grpc_pb.ToClientResp{})
}

View File

@@ -0,0 +1,34 @@
package server
import (
"common/discover/common"
"common/net/grpc/service"
"common/proto/ss/grpc_pb"
"gateway/internal/handler/ws_handler"
"google.golang.org/grpc"
)
type Server struct {
grpc_pb.UnimplementedGatewayServer
service.Base
}
func NewServer(ttl int64) *Server {
s := &Server{
Base: service.Base{
Target: common.KeyDiscoverGateway,
EtcdTTL: ttl,
},
}
s.Base.OnInit = s.OnInit
s.Base.OnClose = s.OnClose
return s
}
func (s *Server) OnInit(serve *grpc.Server) {
ws_handler.GatewaySID = s.SID
grpc_pb.RegisterGatewayServer(serve, s)
}
func (s *Server) OnClose() {
}

View File

@@ -0,0 +1,65 @@
package stream_client
import (
"common/log"
"common/net/grpc/service"
"context"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
var sceneServerM map[int64]map[SceneFun]grpc.ClientStream // map[sid]map[方法名]流连接
type SceneFun int
const (
FunAction SceneFun = iota
)
func init() {
sceneServerM = make(map[int64]map[SceneFun]grpc.ClientStream)
}
func findSceneBySID(sid int64, fun SceneFun) (grpc.ClientStream, error) {
g := sceneServerM[sid]
if g == nil {
g = make(map[SceneFun]grpc.ClientStream)
sceneServerM[sid] = g
}
sceneLink := g[fun]
if sceneLink == nil {
sceneClient, err := service.SceneNewClient(sid)
if err != nil {
log.Errorf("cannot find sceneClient: %v", err)
return nil, err
}
var link grpc.ClientStream
switch fun {
case FunAction:
link, err = sceneClient.Action(context.Background())
}
if err != nil {
log.Errorf("findSceneBySID %v err: %v, sid: %v", fun, err, sid)
return nil, err
}
g[fun] = link
sceneLink = link
}
return sceneLink, nil
}
func SendMessageToScene(sid int64, fun SceneFun, msg proto.Message, re ...bool) error {
stream, err := findSceneBySID(sid, fun)
if err != nil {
return err
}
if err = stream.SendMsg(msg); err != nil {
if re == nil || !re[0] {
_ = stream.CloseSend()
delete(sceneServerM[sid], fun)
return SendMessageToScene(sid, fun, msg, true)
}
return err
}
return nil
}

View File

@@ -0,0 +1,118 @@
package http_handler
import (
"common/db/redis"
"common/log"
"common/net/grpc/service"
"common/net/http/http_resp"
"common/proto/ss/grpc_pb"
"common/utils"
"context"
"fmt"
"gateway/config"
"github.com/gin-gonic/gin"
"time"
)
// 这个模块处理用户登录
type LoginReq struct {
Phone string `json:"phone" binding:"required,min=1"`
Code string `json:"code" binding:"required,min=1"`
}
type LoginResp struct {
USN int64 `json:"usn"`
Name string `json:"name"`
AccessToken string `json:"accessToken"`
RefreshToken string `json:"refreshToken"`
}
func Login(c *gin.Context) {
req := &LoginReq{}
if err := c.ShouldBindJSON(req); err != nil {
http_resp.JsonBadRequest(c)
return
}
client, err := service.UserNewClientLB()
if err != nil {
log.Errorf("Login UserNewClientLB error: %v", err)
http_resp.JsonOK(c, http_resp.Error(http_resp.Failed))
return
}
login, err := client.Login(c, &grpc_pb.LoginReq{
Phone: req.Phone,
Code: req.Code,
})
if err != nil {
log.Errorf("Login Login error: %v", err)
http_resp.JsonOK(c, http_resp.Error(http_resp.Failed))
return
}
at, rt, err := genToken(c, login.USN)
http_resp.JsonOK(c, http_resp.Success(&LoginResp{
USN: login.USN,
Name: login.Name,
AccessToken: at,
RefreshToken: rt,
}))
}
type RefreshTokenReq struct {
RefreshToken string `json:"refreshToken" binding:"required,min=1"`
}
type RefreshTokenResp struct {
AccessToken string `json:"accessToken"`
RefreshToken string `json:"refreshToken"`
}
func RefreshToken(c *gin.Context) {
req := &RefreshTokenReq{}
if err := c.ShouldBindJSON(req); err != nil {
http_resp.JsonBadRequest(c)
return
}
claims, err := utils.ParseToken(req.RefreshToken, config.Get().Auth.Secret)
if err != nil {
http_resp.JsonOK(c, http_resp.Error(http_resp.TokenInvalid))
return
}
if redis.GetClient().Get(c, fmt.Sprintf(config.KeyUserRefreshToken, claims.USN)).String() != req.RefreshToken {
http_resp.JsonOK(c, http_resp.Error(http_resp.TokenInvalid))
return
}
at, rt, err := genToken(c, claims.USN)
if err != nil {
log.Errorf("RefreshToken genToken error: %v, usn: %v", err, claims.USN)
http_resp.JsonOK(c, http_resp.Error(http_resp.Failed))
return
}
http_resp.JsonOK(c, http_resp.Success(&RefreshTokenResp{
AccessToken: at,
RefreshToken: rt,
}))
}
func genToken(ctx context.Context, usn int64) (string, string, error) {
at, err := genTokenOne(ctx, config.KeyUserAccessToken, usn, 2*time.Hour)
if err != nil {
return "", "", err
}
rt, err := genTokenOne(ctx, config.KeyUserRefreshToken, usn, 3*24*time.Hour)
if err != nil {
return "", "", err
}
return at, rt, nil
}
func genTokenOne(ctx context.Context, key string, usn int64, ttl time.Duration) (string, error) {
token, err := utils.GenToken(usn, config.Get().Auth.Secret, time.Duration(config.Get().Auth.Expire)*time.Second)
if err != nil {
return "", err
}
redis.GetClient().Set(ctx, fmt.Sprintf(key, usn), token, ttl)
return token, err
}

View File

@@ -0,0 +1,27 @@
package http_handler
import (
"common/net/http/http_resp"
"github.com/gin-gonic/gin"
)
// 这个模块处理用户登录
type TestReq struct {
}
type TestResp struct {
Info string `json:"info"`
}
func Test(c *gin.Context) {
req := &TestReq{}
if err := c.ShouldBindJSON(req); err != nil {
http_resp.JsonBadRequest(c)
return
}
http_resp.JsonOK(c, http_resp.Success(&TestResp{
Info: "成功了",
}))
}

View File

@@ -0,0 +1,106 @@
package ws_handler
import (
"common/log"
"common/net/socket"
"context"
"fmt"
"go.uber.org/zap"
"runtime/debug"
"sync"
"time"
)
var GatewaySID int64
type Client struct {
sync.WaitGroup
conn socket.ISocketConn // Socket
mailChan chan Event // 邮箱队列
logger *zap.SugaredLogger // 日志
ctx context.Context // 上下文
cancel context.CancelFunc // 取消上下文
heartBeat time.Time // 最后一次心跳
USN int64 // 用户ID
SceneSID int64 // 场景服ID
InstanceID int32 // 副本ID副本类型
UniqueNo int64 // 副本唯一编号
}
func NewClient(usn int64, conn socket.ISocketConn) *Client {
client := &Client{
USN: usn,
conn: conn,
logger: log.GetLogger().Named(fmt.Sprintf("usn:%v", usn)),
heartBeat: time.Now(),
mailChan: make(chan Event, 1024),
}
client.ctx, client.cancel = context.WithCancel(context.Background())
client.Add(1)
go client.Loop()
return client
}
func (c *Client) Loop() {
defer func() {
if err := recover(); err != nil {
c.logger.Errorf("Client Loop err: %v", err)
debug.PrintStack()
}
}()
defer c.onClose()
t := time.NewTicker(20 * time.Second)
defer t.Stop()
for {
select {
case <-c.ctx.Done():
return
case evt, ok := <-c.mailChan:
if ok {
c.handle(evt)
}
case <-t.C:
_ = c.conn.Ping()
if time.Now().Sub(c.heartBeat) > 60*time.Second {
return
}
}
}
}
func (c *Client) OnEvent(event Event) {
defer func() {
if err := recover(); err != nil {
c.logger.Warnf(fmt.Sprintf("send event chan error: %v", err))
}
}()
select {
case c.mailChan <- event:
default:
c.logger.Warnf("Client mailChan full")
}
}
// CloseClient 关闭客户端同步会等待onClose执行完成
func (c *Client) CloseClient() {
if c.cancel != nil {
c.cancel()
c.Wait()
}
}
// onClose 客户端关闭自动触发
func (c *Client) onClose() {
if c.conn != nil {
_ = c.conn.Close()
c.conn = nil
}
if c.mailChan != nil {
close(c.mailChan)
c.mailChan = nil
}
UserMgr.Delete(c.USN)
c.onLeave()
c.Done()
}

View File

@@ -0,0 +1,57 @@
package ws_handler
import (
"common/proto/sc/sc_pb"
"google.golang.org/protobuf/proto"
)
// WriteMessage 向客户端发送消息
func (c *Client) WriteMessage(id sc_pb.MessageID, data proto.Message) {
if c.conn == nil || c.conn.IsClose() {
return
}
d, err := proto.Marshal(data)
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: d,
})
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteMessage err: %v", err)
}
}
// WriteBytes 向客户端发送字节数据
func (c *Client) WriteBytes(id sc_pb.MessageID, data []byte) {
if c.conn == nil || c.conn.IsClose() {
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: data,
})
if err != nil {
c.logger.Errorf("WriteBytes proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteBytes err: %v", err)
}
}
// WriteBytesPreMarshal 向客户端发送字节数据(需要预先打包,适合广播相同数据)
func (c *Client) WriteBytesPreMarshal(data []byte) {
if c.conn == nil || c.conn.IsClose() {
return
}
if err := c.conn.Write(data); err != nil {
c.logger.Errorf("WriteBytes err: %v", err)
}
}

View File

@@ -0,0 +1,13 @@
package ws_handler
type Event interface {
}
type ClientEvent struct {
Event
Msg []byte
}
type PongEvent struct {
Event
}

View File

@@ -0,0 +1,97 @@
package ws_handler
import (
"common/net/grpc/service"
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
"gateway/internal/grpc_server/stream_client"
"google.golang.org/protobuf/proto"
"time"
)
func (c *Client) handle(event Event) {
switch e := event.(type) {
case *ClientEvent:
msg := &sc_pb.Message{}
if err := proto.Unmarshal(e.Msg, msg); err != nil {
c.logger.Errorf("handle event proto.Unmarshal err: %v", err)
c.cancel()
return
}
switch msg.ID {
case sc_pb.MessageID_MESSAGE_ID_ENTER_INSTANCE:
m := &sc_pb.C2S_EnterInstance{}
if err := proto.Unmarshal(msg.Payload, m); err != nil {
c.logger.Errorf("handle event proto.Unmarshal err: %v", err)
c.cancel()
return
}
c.onEnter(m)
case sc_pb.MessageID_MESSAGE_ID_ACTION:
m := &sc_pb.C2S_Action{}
if err := proto.Unmarshal(msg.Payload, m); err != nil {
c.logger.Errorf("handle event proto.Unmarshal err: %v", err)
c.cancel()
return
}
c.onAction(m)
}
case *PongEvent:
c.heartBeat = time.Now()
}
}
func (c *Client) onEnter(msg *sc_pb.C2S_EnterInstance) {
client, err := service.SceneNewClientLB()
if err != nil {
c.logger.Errorf("SceneNewClient err: %v", err)
return
}
resp, err := client.Enter(c.ctx, &grpc_pb.EnterReq{
USN: c.USN,
GatewaySID: GatewaySID,
InstanceID: msg.InstanceID,
})
if err != nil {
c.logger.Errorf("enter err: %v", err)
return
}
c.SceneSID = resp.SceneSID
c.UniqueNo = resp.UniqueNo
c.InstanceID = msg.InstanceID
c.WriteBytes(sc_pb.MessageID(resp.MessageID), resp.Payload)
}
func (c *Client) onLeave() {
client, err := service.SceneNewClient(c.SceneSID)
if err != nil {
c.logger.Errorf("SceneNewClient err: %v", err)
return
}
_, err = client.Leave(c.ctx, &grpc_pb.LeaveReq{
USN: c.USN,
GatewaySID: GatewaySID,
InstanceID: c.InstanceID,
UniqueNo: c.UniqueNo,
})
if err != nil {
c.logger.Errorf("leave err: %v", err)
return
}
}
func (c *Client) onAction(msg *sc_pb.C2S_Action) {
if c.SceneSID == 0 {
return
}
if err := stream_client.SendMessageToScene(c.SceneSID, stream_client.FunAction, &grpc_pb.ActionReq{
UniqueNo: c.UniqueNo,
USN: c.USN,
Action: int32(msg.Action),
DirX: msg.DirX,
DirY: msg.DirY,
SkillID: msg.SkillID,
}); err != nil {
c.logger.Errorf("send action err: %v", err)
}
}

View File

@@ -0,0 +1,58 @@
package ws_handler
import (
"sync"
)
var UserMgr *userManager
type userManager struct {
userMap map[int64]*Client
sync.RWMutex
}
func init() {
UserMgr = &userManager{
userMap: make(map[int64]*Client),
}
}
func (m *userManager) Add(usn int64, client *Client) {
m.Lock()
defer m.Unlock()
m.userMap[usn] = client
}
func (m *userManager) Delete(usn int64) {
m.Lock()
defer m.Unlock()
delete(m.userMap, usn)
}
func (m *userManager) GetAll() map[int64]*Client {
m.RLock()
defer m.RUnlock()
copyMap := make(map[int64]*Client, len(m.userMap))
for k, v := range m.userMap {
copyMap[k] = v
}
return copyMap
}
func (m *userManager) GetAllInterface() []interface{} {
m.RLock()
defer m.RUnlock()
r := make([]interface{}, 0)
for _, v := range m.userMap {
r = append(r, v)
}
return r
}
func (m *userManager) GetByUSN(usn int64) *Client {
m.RLock()
defer m.RUnlock()
return m.userMap[usn]
}

View File

@@ -0,0 +1,78 @@
package http_gateway
import (
"common/net/http/http_resp"
"common/utils"
"fmt"
"gateway/config"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"strconv"
"strings"
"time"
)
func corsConfig() cors.Config {
return cors.Config{
AllowMethods: []string{"GET", "POST", "OPTIONS"},
AllowHeaders: []string{"Content-Type", "Authorization"},
AllowCredentials: false,
AllowAllOrigins: true,
MaxAge: 12 * time.Hour,
}
}
func ginLogger(logger *zap.SugaredLogger) gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
path := c.Request.URL.Path
c.Next()
cost := time.Since(start)
logger.Infof(fmt.Sprintf(
"HTTP Method:%v Code:%v Time:%v IP:%v Path:%v",
c.Request.Method,
c.Writer.Status(),
cost,
c.ClientIP(),
path),
)
}
}
func authJwt() gin.HandlerFunc {
return func(c *gin.Context) {
// 如果是Public接口有Token就读没有就算了
public := false
for _, path := range config.PublicPaths {
if strings.HasPrefix(c.Request.URL.Path, path) {
public = true
break
}
}
token := strings.TrimPrefix(c.GetHeader("Authorization"), "Bearer ")
if token == "" {
if public {
c.Next()
return
}
http_resp.AbortUnauthorized(c)
return
}
claims, err := utils.ParseToken(token, config.Get().Auth.Secret)
if err != nil {
if public {
c.Next()
return
}
http_resp.AbortUnauthorized(c)
return
}
// 这里将Header写到请求中grpc-gateway框架会读取然后传给grpc服务
c.Request.Header.Set("X-Usn", strconv.Itoa(int(claims.USN)))
c.Next()
}
}

View File

@@ -0,0 +1,93 @@
package http_gateway
import (
"common/log"
"common/net/grpc/service"
"common/net/http/http_resp"
"common/proto/ss/grpc_pb"
"context"
"gateway/internal/handler/http_handler"
"gateway/internal/net/http_gateway/wrapper"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/protobuf/encoding/protojson"
)
func InitServeMux() *runtime.ServeMux {
baseMarshaler := &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: false,
EmitUnpopulated: true,
},
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
}
unifiedMarshaler := wrapper.NewWrappedMarshaler(baseMarshaler)
mux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, unifiedMarshaler),
runtime.WithErrorHandler(wrapper.ErrorHandler),
runtime.WithIncomingHeaderMatcher(func(header string) (string, bool) {
if header == "X-Usn" {
return "X-Usn", true
}
return runtime.DefaultHeaderMatcher(header)
}),
)
return mux
}
func InitRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
r := gin.New()
r.Use(
gin.Recovery(),
ginLogger(log.GetLogger().Named("GIN")),
cors.New(corsConfig()),
)
r.HandleMethodNotAllowed = true
r.NoMethod(func(c *gin.Context) {
http_resp.JsonMethodNotAllowed(c)
})
r.NoRoute(func(c *gin.Context) {
http_resp.JsonNotFound(c)
})
initBaseRoute(r.Group("/"))
auth := r.Group("/")
auth.Use(authJwt())
// 用户中心
initUserPath(auth)
return r
}
func initBaseRoute(r *gin.RouterGroup) {
g := r.Group("/gw")
g.POST("/login", http_handler.Login)
g.POST("/refresh_token", http_handler.RefreshToken)
g.GET("/test", http_handler.Test)
}
func initUserPath(r *gin.RouterGroup) {
g := r.Group("/user")
client, err := service.UserNewClientLB()
if err != nil {
log.Errorf("get user conn failed: %v", err)
return
}
gwMux := InitServeMux()
if err = grpc_pb.RegisterUserHandlerClient(context.Background(), gwMux, client); err != nil {
log.Errorf("RegisterUserHandlerClient err: %v", err)
return
}
g.Any("/*path", gin.WrapH(gwMux))
}

View File

@@ -0,0 +1,57 @@
package wrapper
import (
"common/net/http/http_resp"
"common/proto/ss/ss_common"
"context"
"encoding/json"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http"
)
// ErrorHandler 将 gRPC 错误转为统一 JSON 格式
func ErrorHandler(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, _ *http.Request, err error) {
st, ok := status.FromError(err)
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(http_resp.Error(http_resp.Failed))
return
}
code, msg := 0, ""
for _, detail := range st.Details() {
if errorInfo, ok := detail.(*ss_common.ErrorInfo); ok {
code = int(errorInfo.Code)
msg = errorInfo.Msg
break
}
}
if code == 0 {
code = http_resp.Failed.Code()
msg = http_resp.Failed.Error()
}
if st.Code() == codes.Unknown ||
st.Code() == codes.Unimplemented ||
st.Code() == codes.NotFound {
msg = st.Message()
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(grpcCodeToHTTPCode(st.Code()))
_ = json.NewEncoder(w).Encode(http_resp.Error(http_resp.NewCode(code, msg)))
}
// 这里定义 Internal 属于业务错误,其他的属于 500 报错
func grpcCodeToHTTPCode(c codes.Code) int {
switch c {
case codes.OK, codes.Unknown:
return http.StatusOK
case codes.Unimplemented, codes.NotFound:
return http.StatusNotFound
default:
return http.StatusInternalServerError
}
}

View File

@@ -0,0 +1,42 @@
package wrapper
import (
"common/net/http/http_resp"
"encoding/json"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"io"
)
// WrappedMarshaler 自动包装响应为 { code, message, data }
type WrappedMarshaler struct {
inner runtime.Marshaler
}
func NewWrappedMarshaler(inner runtime.Marshaler) *WrappedMarshaler {
return &WrappedMarshaler{inner: inner}
}
// Marshal 将 gRPC 响应包装成统一格式
func (w *WrappedMarshaler) Marshal(v interface{}) ([]byte, error) {
dataBytes, err := w.inner.Marshal(v)
if err != nil {
return json.Marshal(http_resp.Error(http_resp.Failed))
}
return json.Marshal(http_resp.Success(json.RawMessage(dataBytes)))
}
func (w *WrappedMarshaler) Unmarshal(data []byte, v interface{}) error {
return w.inner.Unmarshal(data, v)
}
func (w *WrappedMarshaler) NewDecoder(r io.Reader) runtime.Decoder {
return w.inner.NewDecoder(r)
}
func (w *WrappedMarshaler) NewEncoder(wr io.Writer) runtime.Encoder {
return w.inner.NewEncoder(wr)
}
func (w *WrappedMarshaler) ContentType(v interface{}) string {
return "application/json"
}

View File

@@ -0,0 +1,74 @@
package ws_gateway
import (
"common/log"
"common/net/socket"
"common/utils"
"fmt"
"gateway/internal/handler/ws_handler"
"go.uber.org/zap"
"strconv"
"time"
)
type GatewayWsServer struct {
logger *zap.SugaredLogger
}
func (g *GatewayWsServer) OnOpen(conn socket.ISocketConn) ([]byte, socket.Action) {
g.logger = log.GetLogger().Named(fmt.Sprintf("addr:%v", conn.RemoteAddr()))
return nil, socket.None
}
func (g *GatewayWsServer) OnHandShake(conn socket.ISocketConn, bytes []byte, callback func(conn socket.ISocketConn, bytes []byte)) socket.Action {
token, ok := conn.GetParam("token").(string)
if !ok {
g.logger.Warnf("token is not string")
return socket.Close
}
//claims, err := utils.ParseToken(token, config.Get().Auth.Secret)
//if err != nil {
// g.logger.Warnf("token is invalid")
// return socket.Close
//}
t, _ := strconv.Atoi(token)
claims := utils.Claims{
USN: int64(t),
}
go func(shResp []byte) {
if oldClient := ws_handler.UserMgr.GetByUSN(claims.USN); oldClient != nil {
oldClient.CloseClient()
}
client := ws_handler.NewClient(claims.USN, conn)
ws_handler.UserMgr.Add(claims.USN, client)
conn.SetParam("client", client)
callback(conn, shResp)
}(bytes)
return socket.None
}
func (g *GatewayWsServer) OnMessage(conn socket.ISocketConn, bytes []byte) socket.Action {
client, ok := conn.GetParam("client").(*ws_handler.Client)
if !ok || client.USN == 0 {
return socket.Close
}
client.OnEvent(&ws_handler.ClientEvent{Msg: bytes})
return socket.None
}
func (g *GatewayWsServer) OnPong(conn socket.ISocketConn) {
client, ok := conn.GetParam("client").(*ws_handler.Client)
if !ok || client.USN == 0 {
return
}
client.OnEvent(&ws_handler.PongEvent{})
}
func (g *GatewayWsServer) OnClose(_ socket.ISocketConn, _ error) socket.Action {
return socket.Close
}
func (g *GatewayWsServer) OnTick() (time.Duration, socket.Action) {
return 5 * time.Second, socket.None
}