feat 接入Prometheus

This commit is contained in:
2025-12-17 21:05:12 +08:00
parent efa9f50d3e
commit da91cff056
24 changed files with 289 additions and 60 deletions

View File

@@ -12,6 +12,13 @@ type LogConfig struct {
Level string `yaml:"level"`
}
type MonitorConfig struct {
Prometheus *struct {
Address string `yaml:"address"`
Port int `yaml:"port"`
} `yaml:"prometheus"`
}
type DBConfig struct {
Etcd *struct {
Address []string `yaml:"address"`

View File

@@ -0,0 +1,33 @@
package utils
import (
"gorm.io/driver/mysql"
"gorm.io/gen"
"gorm.io/gorm"
"testing"
)
func TestGenGormStruct(t *testing.T) {
dsn := "user:password@tcp(47.108.184.184:3306)/point?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn))
if err != nil {
panic(err)
}
g := gen.NewGenerator(gen.Config{
OutPath: "./model", // 生成的 model 文件路径
})
g.UseDB(db)
// 生成所有表的 struct
g.ApplyBasic(g.GenerateAllTable()...)
// 或者指定表
// g.ApplyBasic(
// g.GenerateModel("users"),
// g.GenerateModel("orders"),
// )
g.Execute()
}

View File

@@ -0,0 +1,41 @@
package utils
import (
"sync"
)
// DefaultMaxWorkers 默认并发数
const DefaultMaxWorkers = 32
// WorkerPool 并发执行一批任务。
func WorkerPool(tasks []interface{}, taskFunc func(interface{}), maxWorkers ...int) {
if len(tasks) == 0 {
return
}
workers := DefaultMaxWorkers
if len(maxWorkers) > 0 && maxWorkers[0] > 0 {
workers = maxWorkers[0]
}
if workers > len(tasks) {
workers = len(tasks)
}
jobs := make(chan interface{}, len(tasks))
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range jobs {
taskFunc(task)
}
}()
}
for _, task := range tasks {
jobs <- task
}
close(jobs)
wg.Wait()
}

View File

@@ -21,6 +21,7 @@ type Module interface {
func (p *Program) Init(_ svc.Environment) error {
p.moduleList = append(p.moduleList, &ModuleBase{})
p.moduleList = append(p.moduleList, &ModuleDB{})
p.moduleList = append(p.moduleList, &ModulePrometheus{})
p.moduleList = append(p.moduleList, &ModuleWebServer{})
p.moduleList = append(p.moduleList, &ModuleWebsocketServer{})
p.moduleList = append(p.moduleList, &ModuleGrpcServer{})

View File

@@ -0,0 +1,47 @@
package app
import (
"common/log"
"context"
"errors"
"fmt"
"gateway/config"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"sync"
)
// ModulePrometheus 普罗米修斯模块
type ModulePrometheus struct {
wg *sync.WaitGroup
server *http.Server
}
func (m *ModulePrometheus) Init() error {
m.wg = &sync.WaitGroup{}
return nil
}
func (m *ModulePrometheus) Start() error {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.server = &http.Server{
Addr: fmt.Sprintf("%v:%v", config.Get().Monitor.Prometheus.Address, config.Get().Monitor.Prometheus.Port),
Handler: promhttp.Handler(),
}
if err := m.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("prometheus server failed: %v", err.Error())
}
log.Infof("prometheus server stop.")
}()
return nil
}
func (m *ModulePrometheus) Stop() error {
if err := m.server.Shutdown(context.Background()); err != nil {
log.Errorf("stop prometheus server failed: %v", err)
}
m.wg.Wait()
return nil
}

View File

@@ -2,6 +2,7 @@ package app
import (
"common/log"
"context"
"errors"
"fmt"
"gateway/config"
@@ -41,7 +42,7 @@ func (m *ModuleWebServer) Start() error {
}
func (m *ModuleWebServer) Stop() error {
if err := m.server.Close(); err != nil {
if err := m.server.Shutdown(context.Background()); err != nil {
log.Errorf("stop http server failed: %v", err)
}
m.wg.Wait()

View File

@@ -8,6 +8,11 @@ log:
maxBackups: 100
maxAge: 7
monitor:
prometheus:
address: "0.0.0.0"
port: 8504
db:
etcd:
address: [ "10.0.40.9:2379" ]

View File

@@ -5,10 +5,11 @@ import "common/config"
const path = "./config"
type Config struct {
App *config.AppConfig `yaml:"app"`
Log *config.LogConfig `yaml:"log"`
DB *config.DBConfig `yaml:"db"`
Serve *config.ServeConfig `yaml:"serve"`
App *config.AppConfig `yaml:"app"`
Log *config.LogConfig `yaml:"log"`
Monitor *config.MonitorConfig `yaml:"monitor"`
DB *config.DBConfig `yaml:"db"`
Serve *config.ServeConfig `yaml:"serve"`
}
var cfg *Config

View File

@@ -8,6 +8,11 @@ log:
maxBackups: 100
maxAge: 7
monitor:
prometheus:
address: "0.0.0.0"
port: 8504
db:
etcd:
address: [ "172.18.28.0:2379" ]

View File

@@ -5,6 +5,7 @@ import (
"common/proto/sc/sc_pb"
"common/proto/ss/grpc_pb"
"gateway/handler/ws_handler"
"google.golang.org/protobuf/proto"
"sync"
)
@@ -23,9 +24,27 @@ func (s *Server) ToClient(server grpc_pb.Gateway_ToClientServer) error {
return
} else {
if args.UID == -1 {
for _, client := range ws_handler.UserMgr.GetAll() {
client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
//utils.WorkerPool(ws_handler.UserMgr.GetAllInterface(), func(task interface{}) {
// client := task.(*ws_handler.Client)
// client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
//})
data, err := proto.Marshal(&sc_pb.Message{
ID: sc_pb.MessageID(args.MessageID),
Payload: args.Payload,
})
if err != nil {
log.Errorf("ToClient proto.Marshal error: %v", err)
continue
}
for _, client := range ws_handler.UserMgr.GetAll() {
client.WriteBytesPreMarshal(data)
}
//for _, client := range ws_handler.UserMgr.GetAll() {
// client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)
//}
} else {
if client := ws_handler.UserMgr.GetByUID(args.UID); client != nil {
client.WriteBytes(sc_pb.MessageID(args.MessageID), args.Payload)

View File

@@ -3,11 +3,9 @@ package ws_handler
import (
"common/log"
"common/net/socket"
"common/proto/sc/sc_pb"
"context"
"fmt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"runtime/debug"
"sync"
"time"
@@ -84,47 +82,6 @@ func (c *Client) OnEvent(event Event) {
}
}
// WriteMessage 向客户端发送消息
func (c *Client) WriteMessage(id sc_pb.MessageID, data proto.Message) {
if c.conn.IsClose() {
return
}
d, err := proto.Marshal(data)
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: d,
})
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteMessage err: %v", err)
}
}
// WriteBytes 向客户端发送字节数据
func (c *Client) WriteBytes(id sc_pb.MessageID, data []byte) {
if c.conn.IsClose() {
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: data,
})
if err != nil {
c.logger.Errorf("WriteBytes proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteBytes err: %v", err)
}
}
// CloseClient 关闭客户端同步会等待onClose执行完成
func (c *Client) CloseClient() {
if c.cancel != nil {

View File

@@ -0,0 +1,57 @@
package ws_handler
import (
"common/proto/sc/sc_pb"
"google.golang.org/protobuf/proto"
)
// WriteMessage 向客户端发送消息
func (c *Client) WriteMessage(id sc_pb.MessageID, data proto.Message) {
if c.conn == nil || c.conn.IsClose() {
return
}
d, err := proto.Marshal(data)
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: d,
})
if err != nil {
c.logger.Errorf("WriteMessage proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteMessage err: %v", err)
}
}
// WriteBytes 向客户端发送字节数据
func (c *Client) WriteBytes(id sc_pb.MessageID, data []byte) {
if c.conn == nil || c.conn.IsClose() {
return
}
m, err := proto.Marshal(&sc_pb.Message{
ID: id,
Payload: data,
})
if err != nil {
c.logger.Errorf("WriteBytes proto.Marshal err: %v", err)
return
}
if err = c.conn.Write(m); err != nil {
c.logger.Errorf("WriteBytes err: %v", err)
}
}
// WriteBytesPreMarshal 向客户端发送字节数据(需要预先打包,适合广播相同数据)
func (c *Client) WriteBytesPreMarshal(data []byte) {
if c.conn == nil || c.conn.IsClose() {
return
}
if err := c.conn.Write(data); err != nil {
c.logger.Errorf("WriteBytes err: %v", err)
}
}

View File

@@ -32,7 +32,23 @@ func (m *userManager) Delete(uid int32) {
func (m *userManager) GetAll() map[int32]*Client {
m.RLock()
defer m.RUnlock()
return m.userMap
copyMap := make(map[int32]*Client, len(m.userMap))
for k, v := range m.userMap {
copyMap[k] = v
}
return copyMap
}
func (m *userManager) GetAllInterface() []interface{} {
m.RLock()
defer m.RUnlock()
r := make([]interface{}, 0)
for _, v := range m.userMap {
r = append(r, v)
}
return r
}
func (m *userManager) GetByUID(uid int32) *Client {

View File

@@ -9,7 +9,8 @@ log:
maxAge: 7
client:
count: 100
uid: [ 1,1000000 ]
websocket:
address: "ws://127.0.0.1"
port: 8501
address: "wss://www.hlsq.asia/ws/"
port: 0

View File

@@ -8,6 +8,7 @@ type Config struct {
App *config.AppConfig `yaml:"app"`
Log *config.LogConfig `yaml:"log"`
Client *struct {
Count int32 `yaml:"count"`
UID []int32 `yaml:"uid"`
Websocket *config.AddressConfig `yaml:"websocket"`
} `yaml:"client"`

View File

@@ -9,6 +9,7 @@ log:
maxAge: 7
client:
count: 100
uid: [ 1,1000000 ]
websocket:
address: "ws://172.18.28.0"

View File

@@ -4,6 +4,7 @@ import (
"common/utils"
"robot/config"
"strconv"
"time"
)
type Manager struct {
@@ -19,8 +20,9 @@ func NewManager(addr string) *Manager {
func (c *Manager) Start() {
cfg := config.Get().Client
for i := 0; i < 300; i++ {
for i := int32(0); i < cfg.Count; i++ {
client := NewClient(c.addr, strconv.Itoa(utils.RandInt(int(cfg.UID[0]), int(cfg.UID[1]))))
client.Start()
time.Sleep(time.Millisecond * 10)
}
}

View File

@@ -15,5 +15,5 @@ db:
serve:
grpc:
address: "127.0.0.1"
port: 8504
port: 8601
ttl: 20

View File

@@ -15,5 +15,5 @@ db:
serve:
grpc:
address: "172.18.28.0"
port: 8504
port: 8601
ttl: 20