101 lines
2.5 KiB
Go
101 lines
2.5 KiB
Go
package discover
|
||
|
||
import (
|
||
"fmt"
|
||
"git.hlsq.asia/mmorpg/service-common/db/etcd"
|
||
"git.hlsq.asia/mmorpg/service-common/discover/common"
|
||
"git.hlsq.asia/mmorpg/service-common/log"
|
||
"git.hlsq.asia/mmorpg/service-common/net/grpc/grpc_conn"
|
||
clientv3 "go.etcd.io/etcd/client/v3"
|
||
"google.golang.org/grpc"
|
||
"sync"
|
||
)
|
||
|
||
// 大量读少量写的情况下,读写锁比同步Map更高效
|
||
var (
|
||
serverMU = sync.RWMutex{}
|
||
conn = make(map[string]*grpc_conn.GrpcConnectionMgr)
|
||
serverLeaseM = make(map[int64]clientv3.LeaseID)
|
||
)
|
||
|
||
func init() {
|
||
RegisterListener(common.ListenerTypeNewServer, onServerStart)
|
||
RegisterListener(common.ListenerTypeCloseServer, onServerStop)
|
||
}
|
||
|
||
// FindServer 根据SID或随机查找服务
|
||
func FindServer(target string, sid ...int64) (*grpc.ClientConn, error) {
|
||
serverMU.RLock()
|
||
defer serverMU.RUnlock()
|
||
if v, ok := conn[target]; ok {
|
||
return v.Load(sid...)
|
||
}
|
||
return nil, fmt.Errorf("cannot find server")
|
||
}
|
||
|
||
func FindServerAll(target string) map[int64]*grpc.ClientConn {
|
||
serverMU.RLock()
|
||
defer serverMU.RUnlock()
|
||
if v, ok := conn[target]; ok {
|
||
return v.LoadAll()
|
||
}
|
||
return make(map[int64]*grpc.ClientConn)
|
||
}
|
||
|
||
// RegisterGrpcServer 注册服务提供者
|
||
func RegisterGrpcServer(target string, sid int64, addr string, ttl int64) error {
|
||
serverMU.Lock()
|
||
defer serverMU.Unlock()
|
||
leaseID, err := common.NewLeaseAndKeepAlive(ttl)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
_, err = etcd.GetClient().Put(fmt.Sprintf("%v/%v", target, sid), addr, clientv3.WithLease(leaseID))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
serverLeaseM[sid] = leaseID
|
||
return nil
|
||
}
|
||
|
||
// UnRegisterGrpcServer 解注册服务提供者
|
||
func UnRegisterGrpcServer(sid int64) {
|
||
serverMU.Lock()
|
||
defer serverMU.Unlock()
|
||
if leaseID, ok := serverLeaseM[sid]; ok {
|
||
_, err := etcd.GetClient().Revoke(leaseID)
|
||
if err != nil {
|
||
log.Errorf("server.go UnRegisterGrpcServer err: %v", err)
|
||
}
|
||
delete(serverLeaseM, sid)
|
||
}
|
||
}
|
||
|
||
// 某个服务启动了
|
||
func onServerStart(data any) {
|
||
if provider, ok := data.(*common.ServiceProvider); ok {
|
||
serverMU.Lock()
|
||
defer serverMU.Unlock()
|
||
if v, ok := conn[provider.Target]; ok {
|
||
v.Store(provider.SID, provider.Addr)
|
||
} else {
|
||
mgr := grpc_conn.NewGrpcConnectionMgr()
|
||
mgr.Store(provider.SID, provider.Addr)
|
||
conn[provider.Target] = mgr
|
||
}
|
||
}
|
||
}
|
||
|
||
// 某个服务关闭了
|
||
func onServerStop(data any) {
|
||
if provider, ok := data.(*common.ServiceProvider); ok {
|
||
serverMU.Lock()
|
||
defer serverMU.Unlock()
|
||
if v, ok := conn[provider.Target]; ok {
|
||
if v.Delete(provider.SID) == 0 {
|
||
delete(conn, provider.Target)
|
||
}
|
||
}
|
||
}
|
||
}
|