完善服务器

This commit is contained in:
2025-07-01 00:08:27 +08:00
parent b45eb83fe4
commit 7c2c32a31a
37 changed files with 1307 additions and 160 deletions

View File

@@ -1,25 +1,36 @@
package common
type ListenerType int32
type ListenerType int
const (
ListenerTypeNewServer = 1 // 服务启动
ListenerTypeCloseServer = 2 // 服务关闭
ListenerTypeNewServer = 1 // 服务启动
ListenerTypeCloseServer = 2 // 服务关闭
ListenerTypeNewInstance = 3 // 副本启动
ListenerTypeCloseInstance = 4 // 副本关闭
)
var (
KeyDiscover = "xh-discover"
KeyDiscoverService = KeyDiscover + "/service"
KeyDiscover = "xh-discover"
KeyDiscoverService = KeyDiscover + "/service"
KeyDiscoverInstance = KeyDiscover + "/instance"
)
var (
KeyDiscoverGateway = KeyDiscoverService + "/gateway"
KeyDiscoverDatabase = KeyDiscoverService + "/database"
KeyDiscoverGateway = KeyDiscoverService + "/gateway" // 网关服
KeyDiscoverDatabase = KeyDiscoverService + "/database" // 数据服
KeyDiscoverScene = KeyDiscoverService + "/scene" // 场景服
)
// ServiceProvider 服务提供者
type ServiceProvider struct {
Target string
SID string
SID int64
Addr string
}
// InstanceProvider 副本提供者
type InstanceProvider struct {
InstanceID int // 副本ID
UniqueNo int64 // 副本唯一编号
SID string
}

View File

@@ -0,0 +1,82 @@
package discover
import (
"common/db/etcd"
"common/discover/common"
"common/log"
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"strconv"
"sync"
)
// 大量读少量写的情况下读写锁比同步Map更高效
var (
instanceMU = sync.RWMutex{}
instanceM = make(map[int64]string) // [uniqueNo]sid
instanceLeaseM = make(map[int64]clientv3.LeaseID) // [uniqueNo]
)
func init() {
RegisterListener(common.ListenerTypeNewInstance, onInstanceStart)
RegisterListener(common.ListenerTypeCloseInstance, onInstanceStop)
}
// FindInstanceByUniqueNo 根据唯一标识查询副本
func FindInstanceByUniqueNo(uniqueNO int64) (sid string) {
instanceMU.RLock()
defer instanceMU.RUnlock()
if c, ok := instanceM[uniqueNO]; ok {
return c
}
return
}
// RegisterInstance 注册副本
func RegisterInstance(sid int64, instanceID int, uniqueNo, ttl int64) error {
serverMU.Lock()
defer serverMU.Unlock()
leaseID, err := common.NewLeaseAndKeepAlive(ttl)
if err != nil {
return err
}
key := fmt.Sprintf("%v/%v/%v", common.KeyDiscoverInstance, instanceID, uniqueNo)
_, err = etcd.Client().Put(context.Background(), key, strconv.Itoa(int(sid)), clientv3.WithLease(leaseID))
if err != nil {
return err
}
instanceLeaseM[uniqueNo] = leaseID
return nil
}
// UnRegisterInstance 解注册副本
func UnRegisterInstance(uniqueNo int64) {
serverMU.Lock()
defer serverMU.Unlock()
if leaseID, ok := instanceLeaseM[uniqueNo]; ok {
_, err := etcd.Client().Revoke(context.Background(), leaseID)
if err != nil {
log.Errorf("UnRegisterInstance err: %v", err)
}
delete(instanceLeaseM, uniqueNo)
}
}
// 某个副本启动了
func onInstanceStart(data any) {
if provider, ok := data.(*common.InstanceProvider); ok {
instanceMU.Lock()
defer instanceMU.Unlock()
instanceM[provider.UniqueNo] = provider.SID
}
}
// 某个副本关闭了
func onInstanceStop(data any) {
if provider, ok := data.(*common.InstanceProvider); ok {
instanceMU.Lock()
defer instanceMU.Unlock()
delete(instanceM, provider.UniqueNo)
}
}

View File

@@ -3,9 +3,11 @@ package discover
import (
"common/db/etcd"
"common/discover/common"
"common/utils"
"context"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"strconv"
"strings"
"sync"
)
@@ -50,12 +52,22 @@ func Listen() {
onServerChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value))
}
chService := etcd.Client().Watch(stopCtx, common.KeyDiscoverService, clientv3.WithPrefix(), clientv3.WithRev(serviceAll.Header.Revision+1))
// 副本
instanceAll, _ := etcd.Client().Get(stopCtx, common.KeyDiscoverInstance, clientv3.WithPrefix())
for _, kv := range instanceAll.Kvs {
onInstanceChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value), nil)
}
chInstance := etcd.Client().Watch(stopCtx, common.KeyDiscoverScene, clientv3.WithPrefix(), clientv3.WithRev(instanceAll.Header.Revision+1), clientv3.WithPrevKV())
for {
select {
case msg := <-chService:
for _, event := range msg.Events {
onServerChange(event.Type, string(event.Kv.Key), string(event.Kv.Value))
}
case msg := <-chInstance:
for _, event := range msg.Events {
onInstanceChange(event.Type, string(event.Kv.Key), string(event.Kv.Value), event.PrevKv)
}
case <-stopCtx.Done():
return
}
@@ -73,13 +85,36 @@ func onServerChange(t mvccpb.Event_EventType, key, value string) {
case clientv3.EventTypePut:
onCBByType(common.ListenerTypeNewServer, &common.ServiceProvider{
Target: common.KeyDiscoverService + "/" + split[2],
SID: split[3],
SID: utils.StringToInt64(split[3]),
Addr: value,
})
case clientv3.EventTypeDelete:
onCBByType(common.ListenerTypeCloseServer, &common.ServiceProvider{
Target: common.KeyDiscoverService + "/" + split[2],
SID: split[3],
SID: utils.StringToInt64(split[3]),
})
}
}
// 副本发生变化
func onInstanceChange(t mvccpb.Event_EventType, key, value string, preKv *mvccpb.KeyValue) {
split := strings.Split(key, "/")
if len(split) != 4 {
return
}
instanceID, _ := strconv.Atoi(split[2])
switch t {
case clientv3.EventTypePut:
onCBByType(common.ListenerTypeNewInstance, &common.InstanceProvider{
InstanceID: instanceID,
UniqueNo: utils.StringToInt64(split[3]),
SID: value,
})
case clientv3.EventTypeDelete:
onCBByType(common.ListenerTypeCloseInstance, &common.InstanceProvider{
InstanceID: instanceID,
UniqueNo: utils.StringToInt64(split[3]),
SID: string(preKv.Value),
})
}
}

View File

@@ -25,7 +25,7 @@ func init() {
}
// FindServer 根据SID或随机查找服务
func FindServer(target string, sid ...string) (*grpc.ClientConn, error) {
func FindServer(target string, sid ...int64) (*grpc.ClientConn, error) {
serverMU.RLock()
defer serverMU.RUnlock()
if v, ok := conn[target]; ok {
@@ -34,13 +34,13 @@ func FindServer(target string, sid ...string) (*grpc.ClientConn, error) {
return nil, fmt.Errorf("cannot find server")
}
func FindServerAll(target string) map[string]*grpc.ClientConn {
func FindServerAll(target string) map[int64]*grpc.ClientConn {
serverMU.RLock()
defer serverMU.RUnlock()
if v, ok := conn[target]; ok {
return v.LoadAll()
}
return make(map[string]*grpc.ClientConn)
return make(map[int64]*grpc.ClientConn)
}
// RegisterGrpcServer 注册服务提供者