Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【等CI结束就合并】feat: Implement the discovery service by Etcd #605

Merged
merged 15 commits into from
Feb 17, 2024
22 changes: 6 additions & 16 deletions pkg/discovery/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
const (
clusterNameSplitChar = "-"
AddressSplitChar = ":"
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
EtcdClusterPrefix = "registry-seata"
)

type EtcdRegistryService struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

给EtcdRegistryService编写完整的单元测试

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iSuperCoder 你好,已经加上单元测试了,还需要补充什么吗?

Expand Down Expand Up @@ -68,7 +69,7 @@ func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) Reg
grouplist: grouplist,
stopCh: make(chan struct{}),
}
go etcdRegistryService.watch("registry-seata")
go etcdRegistryService.watch(EtcdClusterPrefix)

return etcdRegistryService
}
Expand Down Expand Up @@ -119,7 +120,7 @@ func (s *EtcdRegistryService) watch(key string) {
for _, event := range watchResp.Events {
switch event.Type {
case etcd3.EventTypePut:
fmt.Printf("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)
log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)

k := event.Kv.Key
v := event.Kv.Value
Expand Down Expand Up @@ -148,8 +149,8 @@ func (s *EtcdRegistryService) watch(key string) {
s.rwLock.Unlock()

case etcd3.EventTypeDelete:
fmt.Printf("Key %s deleted.\n", event.Kv.Key)
// 进行你想要的操作
log.Infof("Key %s deleted.\n", event.Kv.Key)

cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
Expand All @@ -167,7 +168,7 @@ func (s *EtcdRegistryService) watch(key string) {
s.rwLock.Unlock()
}
}
case <-s.stopCh: // stop信号
case <-s.stopCh:
log.Warn("stop etcd watch")
return
}
Expand Down Expand Up @@ -246,24 +247,13 @@ func removeValueFromList(list []*ServiceInstance, ip string, port int) []*Servic

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
s.rwLock.RLock()
fmt.Println("lets begin")
cluster := s.vgroupMapping[key]
if cluster == "" {
s.rwLock.Unlock()
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("cluster doesnt exit")
}

list := s.grouplist[cluster]
//if len(list) == 0 {
// return nil, fmt.Errorf("service instance doesnt exit in %s", cluster)
//}

if len(list) != 0 {
for _, v := range list {
fmt.Println("here is instance", v.Addr, ":", v.Port)
}
}
fmt.Println("over")
s.rwLock.RUnlock()
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
return list, nil
}
Expand Down