feat 结构调整

This commit is contained in:
2025-12-20 15:39:25 +08:00
parent 55c5d4cc18
commit ff1bd1d0b6
96 changed files with 4904 additions and 350 deletions

View File

@@ -3,6 +3,7 @@ package grpc_conn
import (
"common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"time"
)
@@ -18,7 +19,7 @@ func NewGrpcConnection(sid int64, address string) (*GrpcConnection, error) {
}
conn, err := grpc.NewClient(
address,
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: 30 * time.Second, // 保活探测包发送的时间间隔

View File

@@ -1,45 +0,0 @@
package grpc_conn
import (
"context"
"fmt"
"google.golang.org/grpc/stats"
)
// 1. 实现 stats.Handler 接口
type StatsHandler struct{}
func (h *StatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// 给 RPC 调用打标签(例如记录方法名)
return context.WithValue(ctx, "rpc_method", info.FullMethodName)
}
func (h *StatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
// 处理 RPC 统计信息
switch t := s.(type) {
case *stats.Begin:
fmt.Printf("RPC started: %s\n", ctx.Value("rpc_method"))
case *stats.End:
fmt.Printf("RPC finished: %s (duration: %v)\n",
ctx.Value("rpc_method"), t.EndTime.Sub(t.BeginTime))
case *stats.InPayload:
fmt.Printf("Received %d bytes\n", t.WireLength)
case *stats.OutPayload:
fmt.Printf("Sent %d bytes\n", t.WireLength)
}
}
func (h *StatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
// 给连接打标签
return ctx
}
func (h *StatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
// 处理连接事件
switch s.(type) {
case *stats.ConnBegin:
fmt.Println("Connection established")
case *stats.ConnEnd:
fmt.Println("Connection closed")
}
}

View File

@@ -0,0 +1,30 @@
package resolver
import (
"common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"time"
)
func NewGrpcConnection(target string) (*grpc.ClientConn, error) {
cc, err := grpc.NewClient(
target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`),
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: 30 * time.Second, // 保活探测包发送的时间间隔
Timeout: 10 * time.Second, // 保活探测包的超时时间
PermitWithoutStream: true,
},
),
//grpc.WithStatsHandler(&StatsHandler{}),
)
if err != nil {
log.Errorf("create grpc err: %v, target: %v", err, target)
return nil, err
}
return cc, nil
}

View File

@@ -0,0 +1,84 @@
package resolver
import (
"common/db/etcd"
"common/discover/common"
"common/log"
"context"
"go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"strings"
)
const etcdSchema = "etcd"
func init() {
resolver.Register(&etcdBuilder{})
}
type etcdBuilder struct{}
func (*etcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
serviceName := strings.TrimPrefix(target.URL.Path, "/")
r := &etcdResolver{
serviceName: serviceName,
cc: cc,
}
r.start()
return r, nil
}
func (*etcdBuilder) Scheme() string { return etcdSchema }
type etcdResolver struct {
serviceName string
cc resolver.ClientConn
ctx context.Context
cancel context.CancelFunc
}
func (r *etcdResolver) start() {
r.ctx, r.cancel = context.WithCancel(context.Background())
r.update()
go r.watch()
}
func (r *etcdResolver) getPrefix() string {
return common.KeyDiscoverService + "/" + r.serviceName
}
func (r *etcdResolver) update() {
resp, err := etcd.GetClient().Get(r.getPrefix(), clientv3.WithPrefix())
if err != nil {
log.Errorf("etcd resolver get error: %v", err)
return
}
var addrArray []resolver.Address
for _, kv := range resp.Kvs {
addr := string(kv.Value)
if addr != "" {
addrArray = append(addrArray, resolver.Address{Addr: addr})
}
}
_ = r.cc.UpdateState(resolver.State{Addresses: addrArray})
}
func (r *etcdResolver) watch() {
watchCh := etcd.GetClient().Watch(r.getPrefix(), clientv3.WithPrefix())
for w := range watchCh {
if w.Err() != nil {
continue
}
r.update()
}
}
func (r *etcdResolver) ResolveNow(resolver.ResolveNowOptions) {
r.update()
}
func (r *etcdResolver) Close() {
r.cancel()
}

View File

@@ -0,0 +1,35 @@
package resolver
import (
"google.golang.org/grpc"
"sync"
)
var (
mu = sync.RWMutex{}
conn = make(map[string]*grpc.ClientConn)
)
func GetGrpcClientConn(target string) (*grpc.ClientConn, error) {
mu.RLock()
if c, ok := conn[target]; ok && c != nil {
mu.RUnlock()
return c, nil
}
mu.RUnlock()
mu.Lock()
defer mu.Unlock()
if c, ok := conn[target]; ok && c != nil {
return c, nil
}
newConn, err := NewGrpcConnection(target)
if err != nil {
return nil, err
}
conn[target] = newConn
return newConn, nil
}

View File

@@ -3,6 +3,7 @@ package service
import (
"common/discover"
"common/discover/common"
"common/net/grpc/resolver"
"common/proto/ss/grpc_pb"
)
@@ -14,6 +15,14 @@ func SceneNewClient(sid ...int64) (grpc_pb.SceneClient, error) {
return grpc_pb.NewSceneClient(c), nil
}
func SceneNewClientLB() (grpc_pb.SceneClient, error) {
c, err := resolver.GetGrpcClientConn("etcd:///" + common.KeyDiscoverServiceNameScene)
if err != nil {
return nil, err
}
return grpc_pb.NewSceneClient(c), nil
}
func SceneNewBroadcastClient() map[int64]grpc_pb.SceneClient {
clientM := make(map[int64]grpc_pb.SceneClient)
connM := discover.FindServerAll(common.KeyDiscoverScene)

View File

@@ -0,0 +1,24 @@
package service
import (
"common/discover"
"common/discover/common"
"common/net/grpc/resolver"
"common/proto/ss/grpc_pb"
)
func UserNewClient(sid ...int64) (grpc_pb.UserClient, error) {
c, err := discover.FindServer(common.KeyDiscoverUser, sid...)
if err != nil {
return nil, err
}
return grpc_pb.NewUserClient(c), nil
}
func UserNewClientLB() (grpc_pb.UserClient, error) {
c, err := resolver.GetGrpcClientConn("etcd:///" + common.KeyDiscoverServiceNameUser)
if err != nil {
return nil, err
}
return grpc_pb.NewUserClient(c), nil
}

View File

@@ -7,7 +7,9 @@ import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"net"
"sync"
"time"
@@ -51,6 +53,7 @@ func (s *Base) Init(addr string, port int32) {
defer func() {
if r := recover(); r != nil {
log.Errorf("server Panic: %v", r)
err = status.Error(codes.Internal, fmt.Sprintf("%v", r))
}
}()
resp, err = handler(ctx, req)

View File

@@ -0,0 +1,46 @@
package http_resp
import (
"common/proto/ss/ss_common"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
OK = NewCode(0, "OK")
Failed = NewCode(1, "Failed")
ParamError = NewCode(1001, "参数错误")
NameEmpty = NewCode(1002, "名称不能为空")
NameDuplicate = NewCode(1003, "名称或编号不能重复")
ListEmpty = NewCode(1004, "列表不能为空")
RepeatCommit = NewCode(1005, "请勿重复提交")
)
type Code struct {
code int
error string
}
func NewCode(code int, error string) *Code {
return &Code{
code: code,
error: error,
}
}
func (c *Code) Code() int {
return c.code
}
func (c *Code) Error() string {
return c.error
}
func (c *Code) Wrap() error {
st := status.New(codes.Unknown, c.Error())
st, _ = st.WithDetails(&ss_common.ErrorInfo{
Code: int32(c.Code()),
Msg: c.Error(),
})
return st.Err()
}

View File

@@ -0,0 +1,22 @@
package http_resp
type RespJsonData struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data,omitempty"`
}
func Success(data interface{}) *RespJsonData {
return &RespJsonData{
Code: OK.Code(),
Msg: OK.Error(),
Data: data,
}
}
func Error(code int, message string) *RespJsonData {
return &RespJsonData{
Code: code,
Msg: message,
}
}