package discover import ( "common/db/etcd" "common/discover/common" "common/log" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "strconv" "sync" ) // 大量读少量写的情况下,读写锁比同步Map更高效 var ( instanceMU = sync.RWMutex{} instanceM = make(map[int64]string) // [uniqueNo]sid instanceLeaseM = make(map[int64]clientv3.LeaseID) // [uniqueNo] ) func init() { RegisterListener(common.ListenerTypeNewInstance, onInstanceStart) RegisterListener(common.ListenerTypeCloseInstance, onInstanceStop) } // FindInstanceByUniqueNo 根据唯一标识查询副本 func FindInstanceByUniqueNo(uniqueNO int64) (sid string) { instanceMU.RLock() defer instanceMU.RUnlock() if c, ok := instanceM[uniqueNO]; ok { return c } return } // RegisterInstance 注册副本 func RegisterInstance(sid int64, instanceID int, uniqueNo, ttl int64) error { serverMU.Lock() defer serverMU.Unlock() leaseID, err := common.NewLeaseAndKeepAlive(ttl) if err != nil { return err } key := fmt.Sprintf("%v/%v/%v", common.KeyDiscoverInstance, instanceID, uniqueNo) _, err = etcd.GetClient().Put(key, strconv.Itoa(int(sid)), clientv3.WithLease(leaseID)) if err != nil { return err } instanceLeaseM[uniqueNo] = leaseID return nil } // UnRegisterInstance 解注册副本 func UnRegisterInstance(uniqueNo int64) { serverMU.Lock() defer serverMU.Unlock() if leaseID, ok := instanceLeaseM[uniqueNo]; ok { _, err := etcd.GetClient().Revoke(leaseID) if err != nil { log.Errorf("UnRegisterInstance err: %v", err) } delete(instanceLeaseM, uniqueNo) } } // 某个副本启动了 func onInstanceStart(data any) { if provider, ok := data.(*common.InstanceProvider); ok { instanceMU.Lock() defer instanceMU.Unlock() instanceM[provider.UniqueNo] = provider.SID } } // 某个副本关闭了 func onInstanceStop(data any) { if provider, ok := data.(*common.InstanceProvider); ok { instanceMU.Lock() defer instanceMU.Unlock() delete(instanceM, provider.UniqueNo) } }