feat 初次提交
This commit is contained in:
42
discover/common/define.go
Normal file
42
discover/common/define.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package common
|
||||
|
||||
type ListenerType int
|
||||
|
||||
const (
|
||||
ListenerTypeNewServer = 1 // 服务启动
|
||||
ListenerTypeCloseServer = 2 // 服务关闭
|
||||
ListenerTypeNewInstance = 3 // 副本启动
|
||||
ListenerTypeCloseInstance = 4 // 副本关闭
|
||||
)
|
||||
|
||||
var (
|
||||
KeyDiscover = "xh-discover"
|
||||
KeyDiscoverService = KeyDiscover + "/service"
|
||||
KeyDiscoverInstance = KeyDiscover + "/instance"
|
||||
)
|
||||
|
||||
var (
|
||||
KeyDiscoverServiceNameGateway = "gateway" // 网关服
|
||||
KeyDiscoverServiceNameScene = "scene" // 场景服
|
||||
KeyDiscoverServiceNameUser = "user" // 用户中心
|
||||
)
|
||||
|
||||
var (
|
||||
KeyDiscoverGateway = KeyDiscoverService + "/" + KeyDiscoverServiceNameGateway // 网关服
|
||||
KeyDiscoverScene = KeyDiscoverService + "/" + KeyDiscoverServiceNameScene // 场景服
|
||||
KeyDiscoverUser = KeyDiscoverService + "/" + KeyDiscoverServiceNameUser // 用户中心
|
||||
)
|
||||
|
||||
// ServiceProvider 服务提供者
|
||||
type ServiceProvider struct {
|
||||
Target string
|
||||
SID int64
|
||||
Addr string
|
||||
}
|
||||
|
||||
// InstanceProvider 副本提供者
|
||||
type InstanceProvider struct {
|
||||
InstanceID int // 副本ID
|
||||
UniqueNo int64 // 副本唯一编号
|
||||
SID string
|
||||
}
|
||||
25
discover/common/tool.go
Normal file
25
discover/common/tool.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"common/db/etcd"
|
||||
"common/log"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// NewLeaseAndKeepAlive 创建租约并保活
|
||||
func NewLeaseAndKeepAlive(ttl int64) (clientv3.LeaseID, error) {
|
||||
lease, err := etcd.GetClient().Grant(ttl)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
chKeepAlive, err := etcd.GetClient().KeepAlive(lease.ID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
go func(leaseID clientv3.LeaseID) {
|
||||
for range chKeepAlive {
|
||||
}
|
||||
log.Warnf("Lease %x expired or revoked", leaseID)
|
||||
}(lease.ID)
|
||||
return lease.ID, nil
|
||||
}
|
||||
81
discover/instance.go
Normal file
81
discover/instance.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package discover
|
||||
|
||||
import (
|
||||
"common/db/etcd"
|
||||
"common/discover/common"
|
||||
"common/log"
|
||||
"fmt"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// 大量读少量写的情况下,读写锁比同步Map更高效
|
||||
var (
|
||||
instanceMU = sync.RWMutex{}
|
||||
instanceM = make(map[int64]string) // [uniqueNo]sid
|
||||
instanceLeaseM = make(map[int64]clientv3.LeaseID) // [uniqueNo]
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterListener(common.ListenerTypeNewInstance, onInstanceStart)
|
||||
RegisterListener(common.ListenerTypeCloseInstance, onInstanceStop)
|
||||
}
|
||||
|
||||
// FindInstanceByUniqueNo 根据唯一标识查询副本
|
||||
func FindInstanceByUniqueNo(uniqueNO int64) (sid string) {
|
||||
instanceMU.RLock()
|
||||
defer instanceMU.RUnlock()
|
||||
if c, ok := instanceM[uniqueNO]; ok {
|
||||
return c
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// RegisterInstance 注册副本
|
||||
func RegisterInstance(sid int64, instanceID int32, uniqueNo, ttl int64) error {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
leaseID, err := common.NewLeaseAndKeepAlive(ttl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v/%v", common.KeyDiscoverInstance, instanceID, uniqueNo)
|
||||
_, err = etcd.GetClient().Put(key, strconv.Itoa(int(sid)), clientv3.WithLease(leaseID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
instanceLeaseM[uniqueNo] = leaseID
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnRegisterInstance 解注册副本
|
||||
func UnRegisterInstance(uniqueNo int64) {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
if leaseID, ok := instanceLeaseM[uniqueNo]; ok {
|
||||
_, err := etcd.GetClient().Revoke(leaseID)
|
||||
if err != nil {
|
||||
log.Errorf("UnRegisterInstance err: %v", err)
|
||||
}
|
||||
delete(instanceLeaseM, uniqueNo)
|
||||
}
|
||||
}
|
||||
|
||||
// 某个副本启动了
|
||||
func onInstanceStart(data any) {
|
||||
if provider, ok := data.(*common.InstanceProvider); ok {
|
||||
instanceMU.Lock()
|
||||
defer instanceMU.Unlock()
|
||||
instanceM[provider.UniqueNo] = provider.SID
|
||||
}
|
||||
}
|
||||
|
||||
// 某个副本关闭了
|
||||
func onInstanceStop(data any) {
|
||||
if provider, ok := data.(*common.InstanceProvider); ok {
|
||||
instanceMU.Lock()
|
||||
defer instanceMU.Unlock()
|
||||
delete(instanceM, provider.UniqueNo)
|
||||
}
|
||||
}
|
||||
130
discover/listener.go
Normal file
130
discover/listener.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package discover
|
||||
|
||||
import (
|
||||
"common/db/etcd"
|
||||
"common/discover/common"
|
||||
"common/log"
|
||||
"common/utils"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
wg = &sync.WaitGroup{}
|
||||
listenerMU = &sync.RWMutex{}
|
||||
listener = make(map[common.ListenerType][]func(data any))
|
||||
stopFunc context.CancelFunc
|
||||
)
|
||||
|
||||
// RegisterListener 注册服务变动监听
|
||||
func RegisterListener(t common.ListenerType, cb func(data any)) {
|
||||
listenerMU.Lock()
|
||||
defer listenerMU.Unlock()
|
||||
arr := listener[t]
|
||||
if arr == nil {
|
||||
arr = make([]func(data any), 0)
|
||||
}
|
||||
arr = append(arr, cb)
|
||||
listener[t] = arr
|
||||
}
|
||||
|
||||
// 根据类型触发回调
|
||||
func onCBByType(t common.ListenerType, data any) {
|
||||
listenerMU.RLock()
|
||||
defer listenerMU.RUnlock()
|
||||
for _, f := range listener[t] {
|
||||
f(data)
|
||||
}
|
||||
}
|
||||
|
||||
func Listen() {
|
||||
var stopCtx context.Context
|
||||
stopCtx, stopFunc = context.WithCancel(context.Background())
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
log.Infof(fmt.Sprintf("Discover start listen..."))
|
||||
// 服务
|
||||
serviceAll, _ := etcd.GetClient().Get(common.KeyDiscoverService, clientv3.WithPrefix())
|
||||
for _, kv := range serviceAll.Kvs {
|
||||
onServerChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value))
|
||||
}
|
||||
chService := etcd.GetClient().Watch(common.KeyDiscoverService, clientv3.WithPrefix(), clientv3.WithRev(serviceAll.Header.Revision+1))
|
||||
// 副本
|
||||
instanceAll, _ := etcd.GetClient().Get(common.KeyDiscoverInstance, clientv3.WithPrefix())
|
||||
for _, kv := range instanceAll.Kvs {
|
||||
onInstanceChange(clientv3.EventTypePut, string(kv.Key), string(kv.Value), nil)
|
||||
}
|
||||
chInstance := etcd.GetClient().Watch(common.KeyDiscoverScene, clientv3.WithPrefix(), clientv3.WithRev(instanceAll.Header.Revision+1), clientv3.WithPrevKV())
|
||||
for {
|
||||
select {
|
||||
case msg := <-chService:
|
||||
for _, event := range msg.Events {
|
||||
onServerChange(event.Type, string(event.Kv.Key), string(event.Kv.Value))
|
||||
}
|
||||
case msg := <-chInstance:
|
||||
for _, event := range msg.Events {
|
||||
onInstanceChange(event.Type, string(event.Kv.Key), string(event.Kv.Value), event.PrevKv)
|
||||
}
|
||||
case <-stopCtx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 服务发生变化
|
||||
func onServerChange(t mvccpb.Event_EventType, key, value string) {
|
||||
split := strings.Split(key, "/")
|
||||
if len(split) != 4 {
|
||||
return
|
||||
}
|
||||
switch t {
|
||||
case clientv3.EventTypePut:
|
||||
onCBByType(common.ListenerTypeNewServer, &common.ServiceProvider{
|
||||
Target: common.KeyDiscoverService + "/" + split[2],
|
||||
SID: utils.StringToInt64(split[3]),
|
||||
Addr: value,
|
||||
})
|
||||
case clientv3.EventTypeDelete:
|
||||
onCBByType(common.ListenerTypeCloseServer, &common.ServiceProvider{
|
||||
Target: common.KeyDiscoverService + "/" + split[2],
|
||||
SID: utils.StringToInt64(split[3]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 副本发生变化
|
||||
func onInstanceChange(t mvccpb.Event_EventType, key, value string, preKv *mvccpb.KeyValue) {
|
||||
split := strings.Split(key, "/")
|
||||
if len(split) != 4 {
|
||||
return
|
||||
}
|
||||
instanceID, _ := strconv.Atoi(split[2])
|
||||
switch t {
|
||||
case clientv3.EventTypePut:
|
||||
onCBByType(common.ListenerTypeNewInstance, &common.InstanceProvider{
|
||||
InstanceID: instanceID,
|
||||
UniqueNo: utils.StringToInt64(split[3]),
|
||||
SID: value,
|
||||
})
|
||||
case clientv3.EventTypeDelete:
|
||||
onCBByType(common.ListenerTypeCloseInstance, &common.InstanceProvider{
|
||||
InstanceID: instanceID,
|
||||
UniqueNo: utils.StringToInt64(split[3]),
|
||||
SID: string(preKv.Value),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Close() {
|
||||
if stopFunc != nil {
|
||||
stopFunc()
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
100
discover/server.go
Normal file
100
discover/server.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package discover
|
||||
|
||||
import (
|
||||
"common/db/etcd"
|
||||
"common/discover/common"
|
||||
"common/log"
|
||||
"common/net/grpc/grpc_conn"
|
||||
"fmt"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// 大量读少量写的情况下,读写锁比同步Map更高效
|
||||
var (
|
||||
serverMU = sync.RWMutex{}
|
||||
conn = make(map[string]*grpc_conn.GrpcConnectionMgr)
|
||||
serverLeaseM = make(map[int64]clientv3.LeaseID)
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterListener(common.ListenerTypeNewServer, onServerStart)
|
||||
RegisterListener(common.ListenerTypeCloseServer, onServerStop)
|
||||
}
|
||||
|
||||
// FindServer 根据SID或随机查找服务
|
||||
func FindServer(target string, sid ...int64) (*grpc.ClientConn, error) {
|
||||
serverMU.RLock()
|
||||
defer serverMU.RUnlock()
|
||||
if v, ok := conn[target]; ok {
|
||||
return v.Load(sid...)
|
||||
}
|
||||
return nil, fmt.Errorf("cannot find server")
|
||||
}
|
||||
|
||||
func FindServerAll(target string) map[int64]*grpc.ClientConn {
|
||||
serverMU.RLock()
|
||||
defer serverMU.RUnlock()
|
||||
if v, ok := conn[target]; ok {
|
||||
return v.LoadAll()
|
||||
}
|
||||
return make(map[int64]*grpc.ClientConn)
|
||||
}
|
||||
|
||||
// RegisterGrpcServer 注册服务提供者
|
||||
func RegisterGrpcServer(target string, sid int64, addr string, ttl int64) error {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
leaseID, err := common.NewLeaseAndKeepAlive(ttl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = etcd.GetClient().Put(fmt.Sprintf("%v/%v", target, sid), addr, clientv3.WithLease(leaseID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serverLeaseM[sid] = leaseID
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnRegisterGrpcServer 解注册服务提供者
|
||||
func UnRegisterGrpcServer(sid int64) {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
if leaseID, ok := serverLeaseM[sid]; ok {
|
||||
_, err := etcd.GetClient().Revoke(leaseID)
|
||||
if err != nil {
|
||||
log.Errorf("server.go UnRegisterGrpcServer err: %v", err)
|
||||
}
|
||||
delete(serverLeaseM, sid)
|
||||
}
|
||||
}
|
||||
|
||||
// 某个服务启动了
|
||||
func onServerStart(data any) {
|
||||
if provider, ok := data.(*common.ServiceProvider); ok {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
if v, ok := conn[provider.Target]; ok {
|
||||
v.Store(provider.SID, provider.Addr)
|
||||
} else {
|
||||
mgr := grpc_conn.NewGrpcConnectionMgr()
|
||||
mgr.Store(provider.SID, provider.Addr)
|
||||
conn[provider.Target] = mgr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 某个服务关闭了
|
||||
func onServerStop(data any) {
|
||||
if provider, ok := data.(*common.ServiceProvider); ok {
|
||||
serverMU.Lock()
|
||||
defer serverMU.Unlock()
|
||||
if v, ok := conn[provider.Target]; ok {
|
||||
if v.Delete(provider.SID) == 0 {
|
||||
delete(conn, provider.Target)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user