diff --git a/db/redis/client.go b/db/redis/client.go index 8683ddc..5330187 100644 --- a/db/redis/client.go +++ b/db/redis/client.go @@ -72,3 +72,11 @@ func (c *Client) HDel(ctx context.Context, key string, fields ...string) *redis. func (c *Client) Pipeline() redis.Pipeliner { return c.cli.Pipeline() } + +func (c *Client) ScriptLoad(ctx context.Context, script string) *redis.StringCmd { + return c.cli.ScriptLoad(ctx, script) +} + +func (c *Client) EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd { + return c.cli.EvalSha(ctx, sha1, keys, args...) +} diff --git a/discover/listener.go b/discover/listener.go index 8016e97..7fdb518 100644 --- a/discover/listener.go +++ b/discover/listener.go @@ -41,7 +41,7 @@ func onCBByType(t common.ListenerType, data any) { } } -func Listen() { +func Listen(ready *sync.WaitGroup) { var stopCtx context.Context stopCtx, stopFunc = context.WithCancel(context.Background()) wg.Add(1) @@ -60,6 +60,8 @@ func Listen() { onInstanceChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value), nil) } chInstance := etcd.GetClient().Watch(common.KeyDiscoverScene, clientv3.WithPrefix(), clientv3.WithRev(instanceAll.Header.Revision+1), clientv3.WithPrevKV()) + // 准备好了 + ready.Done() for { select { case msg := <-chService: diff --git a/module/db.go b/module/db.go index d807114..345f98c 100644 --- a/module/db.go +++ b/module/db.go @@ -10,6 +10,7 @@ import ( // DB 数据库模块 type DB struct { + DefaultModule cfg *config.DBConfig } @@ -35,10 +36,6 @@ func (m *DB) Init() error { return nil } -func (m *DB) Start() error { - return nil -} - func (m *DB) Stop() error { if err := etcd.Close(); err != nil { log.Errorf("close etcd failed: %v", err) diff --git a/module/discover.go b/module/discover.go index ba476a5..263d291 100644 --- a/module/discover.go +++ b/module/discover.go @@ -1,17 +1,17 @@ package module -import "git.hlsq.asia/mmorpg/service-common/discover" +import ( + "git.hlsq.asia/mmorpg/service-common/discover" + "sync" +) // Discover 服务发现模块 type Discover struct { + DefaultModule } -func (m *Discover) Init() error { - return nil -} - -func (m *Discover) Start() error { - discover.Listen() +func (m *Discover) Start(ready *sync.WaitGroup) error { + discover.Listen(ready) return nil } @@ -19,7 +19,3 @@ func (m *Discover) Stop() error { discover.Close() return nil } - -func (m *Discover) Bind(_ ...any) Module { - return m -} diff --git a/module/grpc.go b/module/grpc.go index 641fb32..77f5389 100644 --- a/module/grpc.go +++ b/module/grpc.go @@ -2,19 +2,22 @@ package module import ( "git.hlsq.asia/mmorpg/service-common/net/grpc/service" + "sync" ) // Grpc Grpc模块 type Grpc struct { + DefaultModule server service.IService } -func (m *Grpc) Init() error { +func (m *Grpc) Start(ready *sync.WaitGroup) error { + m.server.Init(ready) return nil } -func (m *Grpc) Start() error { - m.server.Init() +func (m *Grpc) AfterStart() error { + m.server.SetReady() return nil } diff --git a/module/module.go b/module/module.go index 311402a..130c087 100644 --- a/module/module.go +++ b/module/module.go @@ -1,8 +1,37 @@ package module +import "sync" + +// 重点!!!每个模块需要保证同步执行 + type Module interface { - Init() error - Start() error - Stop() error - Bind(data ...any) Module + Init() error // 初始化 + Start(ready *sync.WaitGroup) error // 启动 + AfterStart() error // 启动之后 + Stop() error // 停止 + Bind(data ...any) Module // 绑定数据 +} + +type DefaultModule struct { +} + +func (m *DefaultModule) Init() error { + return nil +} + +func (m *DefaultModule) Start(ready *sync.WaitGroup) error { + ready.Done() + return nil +} + +func (m *DefaultModule) AfterStart() error { + return nil +} + +func (m *DefaultModule) Stop() error { + return nil +} + +func (m *DefaultModule) Bind(_ ...any) Module { + return m } diff --git a/module/prometheus.go b/module/prometheus.go index ec50cb2..76eebdc 100644 --- a/module/prometheus.go +++ b/module/prometheus.go @@ -13,6 +13,7 @@ import ( // Prometheus 普罗米修斯模块 type Prometheus struct { + DefaultModule wg *sync.WaitGroup server *http.Server metricCfg *config.MetricConfig @@ -23,7 +24,7 @@ func (m *Prometheus) Init() error { return nil } -func (m *Prometheus) Start() error { +func (m *Prometheus) Start(ready *sync.WaitGroup) error { m.wg.Add(1) go func() { defer m.wg.Done() @@ -31,6 +32,7 @@ func (m *Prometheus) Start() error { Addr: fmt.Sprintf("%v:%v", m.metricCfg.Prometheus.Address, m.metricCfg.Prometheus.Port), Handler: promhttp.Handler(), } + ready.Done() if err := m.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Errorf("prometheus server failed: %v", err.Error()) } diff --git a/module/tracer.go b/module/tracer.go index de4a15e..d220b1a 100644 --- a/module/tracer.go +++ b/module/tracer.go @@ -14,6 +14,7 @@ import ( // Tracer 链路追踪模块 type Tracer struct { + DefaultModule tp *sdktrace.TracerProvider metricCfg *config.MetricConfig serviceName string @@ -42,10 +43,6 @@ func (m *Tracer) Init() error { return nil } -func (m *Tracer) Start() error { - return nil -} - func (m *Tracer) Stop() error { if m.tp != nil { if err := m.tp.Shutdown(context.Background()); err != nil { diff --git a/net/grpc/resolver/conn.go b/net/grpc/resolver/conn.go index 8c35210..b2a2ed4 100644 --- a/net/grpc/resolver/conn.go +++ b/net/grpc/resolver/conn.go @@ -9,12 +9,23 @@ import ( "time" ) +const serviceConfig = `{ + "loadBalancingConfig": [ + { + "round_robin": {} + } + ], + "healthCheckConfig": { + "serviceName": "" + } +}` + func NewGrpcConnection(target string) (*grpc.ClientConn, error) { cc, err := grpc.NewClient( target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`), + grpc.WithDefaultServiceConfig(serviceConfig), grpc.WithKeepaliveParams( keepalive.ClientParameters{ Time: 30 * time.Second, // 保活探测包发送的时间间隔 diff --git a/net/grpc/service/service.go b/net/grpc/service/service.go index eba14b0..0541ee9 100644 --- a/net/grpc/service/service.go +++ b/net/grpc/service/service.go @@ -8,6 +8,8 @@ import ( "git.hlsq.asia/mmorpg/service-common/utils" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "net" "sync" @@ -15,7 +17,8 @@ import ( ) type IService interface { - Init() + Init(ready *sync.WaitGroup) + SetReady() Close() } @@ -29,10 +32,11 @@ type Base struct { OnInit func(serve *grpc.Server) OnClose func() - wg *sync.WaitGroup + wg *sync.WaitGroup + healthcheck *health.Server } -func (s *Base) Init() { +func (s *Base) Init(ready *sync.WaitGroup) { s.wg = &sync.WaitGroup{} s.wg.Add(1) s.SID = utils.SnowflakeInstance().Generate().String() @@ -63,11 +67,18 @@ func (s *Base) Init() { s.Serve = grpc.NewServer(options...) s.OnInit(s.Serve) + // 健康检查 + s.healthcheck = health.NewServer() + s.healthcheck.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + grpc_health_v1.RegisterHealthServer(s.Serve, s.healthcheck) + // 服务注册 if err = discover.RegisterGrpcServer(s.Target, s.SID, fmt.Sprintf("%v:%d", s.Cfg.Address, s.Cfg.Port), s.Cfg.TTL); err != nil { log.Errorf("%v RegisterGrpcServer err: %v", s.Target, err) return } + // 准备好了 + ready.Done() if err = s.Serve.Serve(lis); err != nil { log.Errorf("%v Serve err: %v", s.Target, err) return @@ -76,9 +87,18 @@ func (s *Base) Init() { }() } +func (s *Base) SetReady() { + if s.healthcheck != nil { + s.healthcheck.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + } +} + func (s *Base) Close() { + if s.healthcheck != nil { + s.healthcheck.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + } if s.Serve != nil { - s.Serve.Stop() + s.Serve.GracefulStop() s.wg.Wait() } }