package discover import ( "common/db/etcd" "common/discover/common" "common/discover/grpc_client" "common/log" "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "sync" ) // 大量读少量写的情况下,读写锁比同步Map更高效 var ( serverMU = sync.RWMutex{} conn = make(map[string]*grpc_client.GrpcConnectionMgr) serverLeaseM = make(map[string]clientv3.LeaseID) ) func init() { RegisterListener(common.ListenerTypeNewServer, onServerStart) RegisterListener(common.ListenerTypeCloseServer, onServerStop) } // FindServer 根据SID或随机查找服务 func FindServer(target string, sid ...string) (*grpc.ClientConn, error) { serverMU.RLock() defer serverMU.RUnlock() if v, ok := conn[target]; ok { return v.Load(sid...) } return nil, fmt.Errorf("cannot find server") } func FindServerAll(target string) map[string]*grpc.ClientConn { serverMU.RLock() defer serverMU.RUnlock() if v, ok := conn[target]; ok { return v.LoadAll() } return make(map[string]*grpc.ClientConn) } // RegisterGrpcServer 注册服务提供者 func RegisterGrpcServer(target, sid, addr string, ttl int64) error { serverMU.Lock() defer serverMU.Unlock() leaseID, err := common.NewLeaseAndKeepAlive(ttl) if err != nil { return err } _, err = etcd.Client().Put(context.Background(), target+"/"+sid, addr, clientv3.WithLease(leaseID)) if err != nil { return err } serverLeaseM[sid] = leaseID return nil } // UnRegisterGrpcServer 解注册服务提供者 func UnRegisterGrpcServer(sid string) { serverMU.Lock() defer serverMU.Unlock() if leaseID, ok := serverLeaseM[sid]; ok { _, err := etcd.Client().Revoke(context.Background(), leaseID) if err != nil { log.Errorf("server.go UnRegisterGrpcServer err: %v", err) } delete(serverLeaseM, sid) } } // 某个服务启动了 func onServerStart(data any) { if provider, ok := data.(*common.ServiceProvider); ok { serverMU.Lock() defer serverMU.Unlock() if v, ok := conn[provider.Target]; ok { v.Store(provider.SID, provider.Addr) } else { mgr := grpc_client.NewGrpcConnectionMgr() mgr.Store(provider.SID, provider.Addr) conn[provider.Target] = mgr } } } // 某个服务关闭了 func onServerStop(data any) { if provider, ok := data.(*common.ServiceProvider); ok { serverMU.Lock() defer serverMU.Unlock() if v, ok := conn[provider.Target]; ok { if v.Delete(provider.SID) == 0 { delete(conn, provider.Target) } } } }