加入网络层

This commit is contained in:
2025-06-26 23:57:54 +08:00
parent 53106465ed
commit 0f29dccec4
57 changed files with 1859 additions and 1274 deletions

View File

@@ -8,7 +8,7 @@ const (
)
var (
KeyDiscover = "discover"
KeyDiscover = "xh-discover"
KeyDiscoverService = KeyDiscover + "/service"
)

View File

@@ -1,41 +0,0 @@
package grpc_client
import (
"common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"time"
)
type GrpcConnection struct {
sid string
conn *grpc.ClientConn
}
func NewGrpcConnection(sid, address string) (*GrpcConnection, error) {
p := &GrpcConnection{
sid: sid,
}
conn, err := grpc.Dial(
address,
grpc.WithInsecure(),
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: 30 * time.Second, // 保活探测包发送的时间间隔
Timeout: 10 * time.Second, // 保活探测包的超时时间
PermitWithoutStream: true,
},
),
//grpc.WithStatsHandler(&StatsHandler{}),
)
if err != nil {
log.Errorf("create grpc err: %v, sid: %v, addr: %v", err, sid, address)
return nil, err
}
p.conn = conn
return p, nil
}
func (g *GrpcConnection) GetConnection() *grpc.ClientConn {
return g.conn
}

View File

@@ -1,62 +0,0 @@
package grpc_client
import (
"common/log"
"fmt"
"google.golang.org/grpc"
"math/rand"
)
type GrpcConnectionMgr struct {
poolM map[string]*GrpcConnection
poolS []*GrpcConnection
}
func NewGrpcConnectionMgr() *GrpcConnectionMgr {
return &GrpcConnectionMgr{
poolM: make(map[string]*GrpcConnection),
poolS: make([]*GrpcConnection, 0),
}
}
func (p *GrpcConnectionMgr) Store(sid, addr string) {
pool, err := NewGrpcConnection(sid, addr)
if err != nil {
log.Errorf("create grpc err: %v, sid: %v, addr: %v", err, sid, addr)
return
}
p.poolM[sid] = pool
p.poolS = append(p.poolS, pool)
}
func (p *GrpcConnectionMgr) Delete(sid string) int {
delete(p.poolM, sid)
for i, pool := range p.poolS {
if pool.sid == sid {
p.poolS = append(p.poolS[:i], p.poolS[i+1:]...)
break
}
}
return len(p.poolS)
}
func (p *GrpcConnectionMgr) Load(sid ...string) (*grpc.ClientConn, error) {
var pool *GrpcConnection
if len(sid) > 0 && len(sid[0]) > 0 {
pool = p.poolM[sid[0]]
} else {
pool = p.poolS[rand.Intn(len(p.poolS))]
}
if pool == nil {
return nil, fmt.Errorf("cannot find connection")
}
return pool.GetConnection(), nil
}
func (p *GrpcConnectionMgr) LoadAll() map[string]*grpc.ClientConn {
sidM := make(map[string]*grpc.ClientConn)
for sid, pool := range p.poolM {
sidM[sid] = pool.GetConnection()
}
return sidM
}

View File

@@ -1,45 +0,0 @@
package grpc_client
import (
"context"
"fmt"
"google.golang.org/grpc/stats"
)
// 1. 实现 stats.Handler 接口
type StatsHandler struct{}
func (h *StatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// 给 RPC 调用打标签(例如记录方法名)
return context.WithValue(ctx, "rpc_method", info.FullMethodName)
}
func (h *StatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
// 处理 RPC 统计信息
switch t := s.(type) {
case *stats.Begin:
fmt.Printf("RPC started: %s\n", ctx.Value("rpc_method"))
case *stats.End:
fmt.Printf("RPC finished: %s (duration: %v)\n",
ctx.Value("rpc_method"), t.EndTime.Sub(t.BeginTime))
case *stats.InPayload:
fmt.Printf("Received %d bytes\n", t.WireLength)
case *stats.OutPayload:
fmt.Printf("Sent %d bytes\n", t.WireLength)
}
}
func (h *StatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
// 给连接打标签
return ctx
}
func (h *StatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
// 处理连接事件
switch s.(type) {
case *stats.ConnBegin:
fmt.Println("Connection established")
case *stats.ConnEnd:
fmt.Println("Connection closed")
}
}

View File

@@ -66,7 +66,7 @@ func Listen() {
// 服务发生变化
func onServerChange(t mvccpb.Event_EventType, key, value string) {
split := strings.Split(key, "/")
if len(split) != 5 {
if len(split) != 4 {
return
}
switch t {

View File

@@ -3,8 +3,8 @@ package discover
import (
"common/db/etcd"
"common/discover/common"
"common/discover/grpc_client"
"common/log"
"common/net/grpc/grpc_conn"
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -15,7 +15,7 @@ import (
// 大量读少量写的情况下读写锁比同步Map更高效
var (
serverMU = sync.RWMutex{}
conn = make(map[string]*grpc_client.GrpcConnectionMgr)
conn = make(map[string]*grpc_conn.GrpcConnectionMgr)
serverLeaseM = make(map[string]clientv3.LeaseID)
)
@@ -80,7 +80,7 @@ func onServerStart(data any) {
if v, ok := conn[provider.Target]; ok {
v.Store(provider.SID, provider.Addr)
} else {
mgr := grpc_client.NewGrpcConnectionMgr()
mgr := grpc_conn.NewGrpcConnectionMgr()
mgr.Store(provider.SID, provider.Addr)
conn[provider.Target] = mgr
}

View File

@@ -1,24 +0,0 @@
package service
import (
"common/discover"
"common/discover/common"
"common/discover/service/game/game_pb"
)
func GatewayNewClient(sid ...string) (game_pb.GameClient, error) {
c, err := discover.FindServer(common.KeyDiscoverGateway, sid...)
if err != nil {
return nil, err
}
return game_pb.NewGameClient(c), nil
}
func GatewayNewBroadcastClient() map[string]game_pb.GameClient {
clientM := make(map[string]game_pb.GameClient)
connM := discover.FindServerAll(common.KeyDiscoverGateway)
for sid, conn := range connM {
clientM[sid] = game_pb.NewGameClient(conn)
}
return clientM
}

View File

@@ -1,94 +0,0 @@
package service
import (
"common/discover"
"common/discover/common"
"common/log"
"common/utils"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"os"
"strconv"
"sync"
"time"
)
type IService interface {
Init(addr string)
Close()
}
type ServiceBase struct {
Target string
SID string
Serve *grpc.Server
EtcdTTL int64
OnInit func(serve *grpc.Server)
OnClose func()
wg *sync.WaitGroup
}
func (s *ServiceBase) Init(addr string) {
s.wg = &sync.WaitGroup{}
s.wg.Add(1)
s.SID = utils.SnowflakeInstance().Generate().String()
go func() {
defer s.wg.Done()
defer s.OnClose()
defer discover.UnRegisterGrpcServer(s.SID)
pMin, _ := strconv.ParseInt(os.Getenv("P_MIN"), 10, 64)
pMax, _ := strconv.ParseInt(os.Getenv("P_MAX"), 10, 64)
if pMin == 0 || pMax == 0 || pMin > pMax {
log.Errorf(" %v init err: pMin or pMax is 0 or pMin > pMax", s.Target)
return
}
// 服务注册
lis, port, err := common.ListenRandomPort(pMin, pMax)
if lis == nil {
log.Errorf(" %v ListenRandomPort err: %v", s.Target, err)
return
}
s.Serve = grpc.NewServer(
grpc.UnaryInterceptor(
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("server Panic: %v", r)
}
}()
resp, err = handler(ctx, req)
return
},
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second,
PermitWithoutStream: true,
}),
)
s.OnInit(s.Serve)
// 服务注册
if err = discover.RegisterGrpcServer(s.Target, s.SID, fmt.Sprintf("%v:%d", addr, port), s.EtcdTTL); err != nil {
log.Errorf("%v RegisterGrpcServer err: %v", s.Target, err)
return
}
if err = s.Serve.Serve(lis); err != nil {
log.Errorf(" %v Serve err: %v", s.Target, err)
return
}
log.Infof("%v server stop.", s.Target)
}()
}
func (s *ServiceBase) Close() {
if s.Serve != nil {
s.Serve.Stop()
s.wg.Wait()
}
}