网络层

This commit is contained in:
2025-06-28 17:38:22 +08:00
parent 54dc7ba173
commit 605197345b
20 changed files with 482 additions and 376 deletions

View File

@@ -15,19 +15,33 @@ const (
Shutdown
)
type OpCode byte
const (
OpContinuation OpCode = 0x0
OpText OpCode = 0x1
OpBinary OpCode = 0x2
OpClose OpCode = 0x8
OpPing OpCode = 0x9
OpPong OpCode = 0xa
)
// ISocketServer 由应用层实现
type ISocketServer interface {
OnOpen(ISocketConn) ([]byte, Action) // 开启连接
OnHandShake(ISocketConn) // 开始握手
OnMessage(ISocketConn, []byte) Action // 收到消息
OnClose(ISocketConn, error) Action // 关闭连接
OnPong(ISocketConn)
OnClose(ISocketConn, error) Action // 关闭连接
OnTick() (time.Duration, Action)
}
// ISocketConn 由网络层实现
type ISocketConn interface {
GetParam(key string) string
SetParam(key string, values string)
GetParam(key string) interface{}
SetParam(key string, values interface{})
RemoteAddr() string
Ping() error // 需要隔一段时间调用一下建议20s
Write(data []byte) error
Close() error
}

View File

@@ -66,7 +66,8 @@ func (s *WSServer) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
curHeader: nil,
cachedBuf: bytes.Buffer{},
},
param: make(map[string]string),
param: make(map[string]interface{}),
remoteAddr: c.RemoteAddr().String(),
}
c.SetContext(ws)
s.unUpgradeConn.Store(c.RemoteAddr().String(), ws)
@@ -78,10 +79,10 @@ func (s *WSServer) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
// The parameter err is the last known connection error.
func (s *WSServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
s.unUpgradeConn.Delete(c.RemoteAddr().String())
tmp := c.Context()
ws, ok := tmp.(*WSConn)
ws, ok := c.Context().(*WSConn)
if ok {
s.logger.Warnf("connection close")
ws.isClose = true
ws.logger.Warnf("connection close: %v --------------------------------------------------", err)
return gnet.Action(s.i.OnClose(ws, err))
}
return
@@ -91,18 +92,14 @@ func (s *WSServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
func (s *WSServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
tmp := c.Context()
if tmp == nil {
if s.logger != nil {
s.logger.Errorf("OnTraffic context nil: %v", c)
}
s.logger.Errorf("OnTraffic context nil: %v", c)
action = gnet.Close
return
}
ws, ok := tmp.(*WSConn)
if !ok {
if s.logger != nil {
s.logger.Errorf("OnTraffic convert ws error: %v", tmp)
}
ws.logger.Errorf("OnTraffic convert ws error: %v", tmp)
action = gnet.Close
return
}
@@ -121,9 +118,7 @@ func (s *WSServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
if data != nil {
err := ws.Conn.AsyncWrite(data, nil)
if err != nil {
if ws.logger != nil {
ws.logger.Errorf("update ws write upgrade protocol error", err)
}
ws.logger.Errorf("update ws write upgrade protocol error", err)
action = gnet.Close
}
}
@@ -131,14 +126,22 @@ func (s *WSServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
} else {
msg, err := ws.readWsMessages()
if err != nil {
if ws.logger != nil {
ws.logger.Errorf("read ws messages errors", err)
}
ws.logger.Errorf("read ws messages errors", err)
return gnet.Close
}
if msg != nil {
for _, m := range msg {
if socket.OpCode(m.OpCode) == socket.OpPong {
s.i.OnPong(ws)
continue
}
if socket.OpCode(m.OpCode) == socket.OpClose {
return gnet.Close
}
if socket.OpCode(m.OpCode) == socket.OpPing {
continue
}
a := s.i.OnMessage(ws, m.Payload)
if gnet.Action(a) != gnet.None {
action = gnet.Action(a)
@@ -182,10 +185,7 @@ func (s *WSServer) OnTick() (delay time.Duration, action gnet.Action) {
continue
}
if err := v.Close(); err != nil {
if s.logger != nil {
s.logger.Errorf("upgrade ws time out close socket error: %v", err)
continue
}
v.logger.Errorf("upgrade ws time out close socket error: %v", err)
}
}
d, a := s.i.OnTick()

View File

@@ -15,12 +15,13 @@ import (
// WSConn 实现ISocketConn接口
type WSConn struct {
gnet.Conn
buf bytes.Buffer
logger logging.Logger
isUpgrade bool
isClose bool
param map[string]string
openTime int64
buf bytes.Buffer
logger logging.Logger
isUpgrade bool
isClose bool
param map[string]interface{}
openTime int64
remoteAddr string
wsMessageBuf
}
@@ -169,19 +170,24 @@ func (w *WSConn) OnRequest(u []byte) error {
return nil
}
func (w *WSConn) GetParam(key string) string {
func (w *WSConn) GetParam(key string) interface{} {
return w.param[key]
}
func (w *WSConn) SetParam(key string, values string) {
func (w *WSConn) SetParam(key string, values interface{}) {
w.param[key] = values
}
func (w *WSConn) RemoteAddr() string {
return w.remoteAddr
}
func (w *WSConn) Write(data []byte) error {
if w.isClose {
return errors.New("connection has close")
}
return w.write(data, ws.OpBinary)
return w.write(data, ws.OpText)
}
func (w *WSConn) Ping() (err error) {
return w.write(make([]byte, 0), ws.OpPing)
}
func (w *WSConn) Close() (err error) {
@@ -192,74 +198,12 @@ func (w *WSConn) Close() (err error) {
}
func (w *WSConn) write(data []byte, opCode ws.OpCode) error {
if w.isClose {
return errors.New("connection has close")
}
buf := bytes.Buffer{}
err := wsutil.WriteServerMessage(&buf, opCode, data)
if err != nil {
if err := wsutil.WriteServerMessage(&buf, opCode, data); err != nil {
return err
}
return w.Conn.AsyncWrite(buf.Bytes(), nil)
}
//func (w *WSConn) Pong(data []byte) (err error) {
// buf := bytes.Buffer{}
// if data == nil {
// err = wsutil.WriteServerMessage(&buf, ws.OpPong, emptyPayload)
// } else {
// err = wsutil.WriteServerMessage(&buf, ws.OpPong, data)
// }
// if w.isClose {
// return errors.New("connection has close")
// }
// if err != nil {
// return
// }
// return w.Conn.AsyncWrite(buf.Bytes(), nil)
//}
//func (w *WSConn) Ping(data []byte) (err error) {
// buf := bytes.Buffer{}
// if data == nil {
// err = wsutil.WriteServerMessage(&buf, ws.OpPing, emptyPayload)
// } else {
// err = wsutil.WriteServerMessage(&buf, ws.OpPing, data)
// }
// if w.isClose {
// return errors.New("connection has close")
// }
// if err != nil {
// return err
// }
//
// return w.Conn.AsyncWrite(buf.Bytes(), nil)
//}
//func (w *WSConn) GetProperty(key string) (interface{}, error) {
// if w.context[key] == nil {
// return nil, errors.New("not found this key")
// }
// return w.context[key], nil
//}
//
//func (w *WSConn) SetProperty(key string, ctx interface{}) {
// w.context[key] = ctx
//}
//
//func (w *WSConn) RemoveProperty(key string) {
// delete(w.context, key)
//}
//
//func (w *WSConn) GetConnID() uint64 {
// return w.connID
//}
//
//func (w *WSConn) Clear() {
// w.context = make(map[string]interface{})
//}
//
//func (w *WSConn) GetUrlParam() map[string][]string {
// return w.urlParma
//}
//
//func (w *WSConn) SetUrlParam(key string, values []string) {
// w.urlParma[key] = values
//}