From 3522cfb22e7277ee95a8769ecdbdd15b23f13adc Mon Sep 17 00:00:00 2001 From: FanOne Date: Thu, 7 Mar 2024 13:53:36 +0000 Subject: [PATCH 1/8] feat:redis discovery --- go.mod | 2 + go.sum | 4 ++ pkg/discovery/config.go | 16 ++++++++ pkg/discovery/etcd3.go | 5 ++- pkg/discovery/etcd3_test.go | 9 +++-- pkg/discovery/redis.go | 77 +++++++++++++++++++++++++++++++++++-- 6 files changed, 104 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index eae719d06..8112241f6 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/creasty/defaults v1.5.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -80,6 +81,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/redis/go-redis/v9 v9.5.1 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect github.com/shirou/gopsutil/v3 v3.22.2 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect diff --git a/go.sum b/go.sum index 9a370bcc4..409354a33 100644 --- a/go.sum +++ b/go.sum @@ -171,6 +171,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q= github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo= @@ -667,6 +669,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go index c03c3da1f..908228ae5 100644 --- a/pkg/discovery/config.go +++ b/pkg/discovery/config.go @@ -90,3 +90,19 @@ func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server address of registry.") f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:2379", "The server address of registry.") } + +type RedisConfig struct { + Cluster string `yaml:"cluster" json:"cluster" koanf:"cluster"` + ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"` + Username string `yaml:"username" json:"username" koanf:"username"` + Password string `yaml:"password" json:"password" koanf:"password"` + DB int `yaml:"db" json:"db" koanf:"db"` +} + +func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server address of redis.") + f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:2379", "The server address of redis.") + f.StringVar(&cfg.Username, prefix+".username", "redis", "The name of redis") + f.StringVar(&cfg.Password, prefix+".password", "", "The password of redis.") + f.IntVar(&cfg.DB, prefix+".db", 0, "The db of redis to discovery.") +} diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go index 667a86058..08c170e5d 100644 --- a/pkg/discovery/etcd3.go +++ b/pkg/discovery/etcd3.go @@ -20,11 +20,12 @@ package discovery import ( "context" "fmt" - "github.com/seata/seata-go/pkg/util/log" - etcd3 "go.etcd.io/etcd/client/v3" "strconv" "strings" "sync" + + "github.com/seata/seata-go/pkg/util/log" + etcd3 "go.etcd.io/etcd/client/v3" ) const ( diff --git a/pkg/discovery/etcd3_test.go b/pkg/discovery/etcd3_test.go index fbde6fb43..5ba70663d 100644 --- a/pkg/discovery/etcd3_test.go +++ b/pkg/discovery/etcd3_test.go @@ -1,14 +1,15 @@ package discovery import ( + "reflect" + "testing" + "time" + "github.com/golang/mock/gomock" "github.com/seata/seata-go/pkg/discovery/mock" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/mvccpb" - "go.etcd.io/etcd/client/v3" - "reflect" - "testing" - "time" + clientv3 "go.etcd.io/etcd/client/v3" ) func TestEtcd3RegistryService_Lookup(t *testing.T) { diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 61adcc628..ab32fa306 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -17,11 +17,82 @@ package discovery -type RedisRegistryService struct{} +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/redis/go-redis/v9" + "github.com/seata/seata-go/pkg/util/log" +) + +const ( + RedisFilekeyPrefix = "registry.redis." +) + +type RedisRegistryService struct { + cli *redis.Client + + // serverMap the map of discovery server + // key: server name value: server address + serverMap *sync.Map + + ctx context.Context +} + +func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { + if redisConfig == nil { + log.Fatalf("redis config is nil") + panic("redis config is nil") + } + + cfg := &redis.Options{ + Addr: redisConfig.ServerAddr, + Username: redisConfig.Username, + Password: redisConfig.Password, + DB: redisConfig.DB, + } + cli := redis.NewClient(cfg) + + redisRegistryService := &RedisRegistryService{ + cli: cli, + ctx: context.Background(), + } + + go redisRegistryService.watch() + + return redisRegistryService +} func (s *RedisRegistryService) Lookup(key string) ([]*ServiceInstance, error) { - //TODO implement me - panic("implement me") + insList, ok := s.serverMap.Load(key) + if !ok { + s.cli.SRandMember(s.ctx, key).Result() + } + return nil, nil +} + +func (s *RedisRegistryService) watch() error { + return nil +} + +func (s *RedisRegistryService) getRedisRegistryKey() string { + return fmt.Sprintf("%s%s", RedisFilekeyPrefix) +} + +func (s *RedisRegistryService) SetHeartBeat(key, addr string) error { + akey := "alives." + addr + + aliveDuration, _ := time.ParseDuration("10s") + s.cli.Set(s.ctx, akey, key, aliveDuration) + + select { + case <-time.After(aliveDuration): + go s.SetHeartBeat(key, addr) + } + + return nil } func (RedisRegistryService) Close() { From 531c5249fef5f7acc9754d257ab2bb2543998775 Mon Sep 17 00:00:00 2001 From: FanOne Date: Mon, 11 Mar 2024 16:12:23 +0000 Subject: [PATCH 2/8] feat:redis discovery --- go.mod | 2 +- go.sum | 2 ++ pkg/discovery/redis.go | 58 ++++++++++++++++++++++++++++++++++-------- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 8112241f6..f8bf9a200 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( require ( github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey/v2 v2.9.0 + github.com/redis/go-redis/v9 v9.5.1 go.etcd.io/etcd/api/v3 v3.5.6 go.etcd.io/etcd/client/v3 v3.5.6 ) @@ -81,7 +82,6 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/redis/go-redis/v9 v9.5.1 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect github.com/shirou/gopsutil/v3 v3.22.2 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect diff --git a/go.sum b/go.sum index 409354a33..9604fb523 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edY github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw= github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index ab32fa306..49efe83ac 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -28,10 +28,18 @@ import ( ) const ( - RedisFilekeyPrefix = "registry.redis." + RedisFileKeyPrefix = "registry.redis." + + // redis registry key live 5 seconds, auto refresh key every 2 seconds + KeyTTL = 5 + KeyRefreshPeriod = 2 ) type RedisRegistryService struct { + // the config about redis + config *RedisConfig + + // client for redis cli *redis.Client // serverMap the map of discovery server @@ -56,8 +64,9 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re cli := redis.NewClient(cfg) redisRegistryService := &RedisRegistryService{ - cli: cli, - ctx: context.Background(), + config: redisConfig, + cli: cli, + ctx: context.Background(), } go redisRegistryService.watch() @@ -66,19 +75,47 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re } func (s *RedisRegistryService) Lookup(key string) ([]*ServiceInstance, error) { - insList, ok := s.serverMap.Load(key) + ins, ok := s.serverMap.Load(key) if !ok { - s.cli.SRandMember(s.ctx, key).Result() + insTmp, err := s.cli.Get(s.ctx, key).Result() + if err != nil { + return nil, err + } + } return nil, nil } -func (s *RedisRegistryService) watch() error { +func (s *RedisRegistryService) subscribe() error { + s.cli.Subscribe() + // 定时更新Map + go func() { + for range time.Tick(KeyRefreshPeriod * time.Millisecond) { + func() { + defer s.cli.Close() + updateClusterAddressMap(redisRegistryKey, cluster) + }() + } + }() + + // 定时订阅 + go func() { + for range time.Tick(1 * time.Millisecond) { + func() { + defer s.cli.Close() + s.cli.Subscribe(s.ctx, func() { + notifySub := NotifySub{listeners: listenerServiceMap[cluster]} + return notifySub + }(), redisRegistryKey) + }() + } + }() + return nil } func (s *RedisRegistryService) getRedisRegistryKey() string { - return fmt.Sprintf("%s%s", RedisFilekeyPrefix) + return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) } func (s *RedisRegistryService) SetHeartBeat(key, addr string) error { @@ -95,7 +132,8 @@ func (s *RedisRegistryService) SetHeartBeat(key, addr string) error { return nil } -func (RedisRegistryService) Close() { - //TODO implement me - panic("implement me") +func (s *RedisRegistryService) Close() { + if s.cli != nil { + s.cli.Close() + } } From a40ea285bd6284a0fb140bb6b12b5ce8bd4f2fb0 Mon Sep 17 00:00:00 2001 From: FanOne Date: Tue, 12 Mar 2024 10:48:46 +0000 Subject: [PATCH 3/8] feat:discovery by redis --- pkg/discovery/redis.go | 301 ++++++++++++++++++++++++----------------- 1 file changed, 179 insertions(+), 122 deletions(-) diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 49efe83ac..0c6d4f577 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -15,125 +15,182 @@ * limitations under the License. */ -package discovery - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/redis/go-redis/v9" - "github.com/seata/seata-go/pkg/util/log" -) - -const ( - RedisFileKeyPrefix = "registry.redis." - - // redis registry key live 5 seconds, auto refresh key every 2 seconds - KeyTTL = 5 - KeyRefreshPeriod = 2 -) - -type RedisRegistryService struct { - // the config about redis - config *RedisConfig - - // client for redis - cli *redis.Client - - // serverMap the map of discovery server - // key: server name value: server address - serverMap *sync.Map - - ctx context.Context -} - -func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { - if redisConfig == nil { - log.Fatalf("redis config is nil") - panic("redis config is nil") - } - - cfg := &redis.Options{ - Addr: redisConfig.ServerAddr, - Username: redisConfig.Username, - Password: redisConfig.Password, - DB: redisConfig.DB, - } - cli := redis.NewClient(cfg) - - redisRegistryService := &RedisRegistryService{ - config: redisConfig, - cli: cli, - ctx: context.Background(), - } - - go redisRegistryService.watch() - - return redisRegistryService -} - -func (s *RedisRegistryService) Lookup(key string) ([]*ServiceInstance, error) { - ins, ok := s.serverMap.Load(key) - if !ok { - insTmp, err := s.cli.Get(s.ctx, key).Result() - if err != nil { - return nil, err - } - - } - return nil, nil -} - -func (s *RedisRegistryService) subscribe() error { - s.cli.Subscribe() - // 定时更新Map - go func() { - for range time.Tick(KeyRefreshPeriod * time.Millisecond) { - func() { - defer s.cli.Close() - updateClusterAddressMap(redisRegistryKey, cluster) - }() - } - }() - - // 定时订阅 - go func() { - for range time.Tick(1 * time.Millisecond) { - func() { - defer s.cli.Close() - s.cli.Subscribe(s.ctx, func() { - notifySub := NotifySub{listeners: listenerServiceMap[cluster]} - return notifySub - }(), redisRegistryKey) - }() - } - }() - - return nil -} - -func (s *RedisRegistryService) getRedisRegistryKey() string { - return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) -} - -func (s *RedisRegistryService) SetHeartBeat(key, addr string) error { - akey := "alives." + addr - - aliveDuration, _ := time.ParseDuration("10s") - s.cli.Set(s.ctx, akey, key, aliveDuration) - - select { - case <-time.After(aliveDuration): - go s.SetHeartBeat(key, addr) - } - - return nil -} - -func (s *RedisRegistryService) Close() { - if s.cli != nil { - s.cli.Close() - } -} + package discovery + + import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/redis/go-redis/v9" + + "github.com/seata/seata-go/pkg/util/log" + ) + + const ( + RedisFileKeyPrefix = "registry.redis." + + // redis registry key live 5 seconds, auto refresh key every 2 seconds + KeyTTL = 5 + KeyRefreshPeriod = 2 + ) + + type RedisConfig struct { + Cluster string `yaml:"cluster" json:"cluster" koanf:"cluster"` + ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"` + Username string `yaml:"username" json:"username" koanf:"username"` + Password string `yaml:"password" json:"password" koanf:"password"` + DB int `yaml:"db" json:"db" koanf:"db"` + } + + type RedisRegistryService struct { + // the config about redis + config *RedisConfig + + // client for redis + cli *redis.Client + + // serverMap the map of discovery server + // key: server name value: server address + serverMap *sync.Map + + ctx context.Context + } + + func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { + if redisConfig == nil { + log.Fatalf("redis config is nil") + panic("redis config is nil") + } + + cfg := &redis.Options{ + Addr: redisConfig.ServerAddr, + Username: redisConfig.Username, + Password: redisConfig.Password, + DB: redisConfig.DB, + } + cli := redis.NewClient(cfg) + + redisRegistryService := &RedisRegistryService{ + config: redisConfig, + cli: cli, + ctx: context.Background(), + } + + go redisRegistryService.subscribe() + + return redisRegistryService + } + + func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { + r = make([]*ServiceInstance, 0) + ins, ok := s.serverMap.Load(key) + if !ok { + list, err := s.cli.HGetAll(s.ctx, key).Result() + if err != nil { + return nil, err + } + for _, v := range list { + addrList := strings.Split(v, ":") + if len(addrList) < 2 { + continue + } + addr := addrList[0] + port, _err := strconv.Atoi(addrList[1]) + if _err != nil { + continue + } + r = append(r, &ServiceInstance{ + Addr: addr, + Port: port, + }) + } + return + } + + r = append(r, ins.([]*ServiceInstance)...) + return + } + + func (s *RedisRegistryService) subscribe() error { + // 定时更新Map + go func() { + for range time.Tick(KeyRefreshPeriod * time.Millisecond) { + allServiceList, err := s.cli.HGetAll(s.ctx, "*").Result() + if err != nil { + return + } + + for k, v := range allServiceList { + addrList := strings.Split(v, ":") + if len(addrList) < 2 { + continue + } + addr := addrList[0] + port, _err := strconv.Atoi(addrList[1]) + if _err != nil { + continue + } + r = append(r, &ServiceInstance{ + Addr: addr, + Port: port, + }) + } + } + }() + + // // 定时订阅 + // go func() { + // for range time.Tick(1 * time.Millisecond) { + // func() { + // defer s.cli.Close() + // s.cli.Subscribe(s.ctx, func() { + // notifySub := NotifySub{listeners: listenerServiceMap[cluster]} + // return notifySub + // }()) + // }() + // } + // }() + + return nil + } + + func (s *RedisRegistryService) getRedisRegistryKey() string { + return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) + } + + func (s *RedisRegistryService) register(key, value string) (err error) { + _, err = s.cli.HSet(s.ctx, key, value).Result() + if err != nil { + return + } + + go func() { + s.keepAlive(s.ctx, key) + }() + + return + } + + func (s *RedisRegistryService) Close() { + if s.cli != nil { + s.cli.Close() + } + } + + func (s *RedisRegistryService) keepAlive(ctx context.Context, key string) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.cli.Expire(ctx, key, 2*time.Second) + case <-ctx.Done(): + break + } + } + } + \ No newline at end of file From 1af1939e66f64f506889c9e511c9bb9e30e0a1b6 Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Wed, 13 Mar 2024 00:41:10 +0800 Subject: [PATCH 4/8] feat:redis discovery --- pkg/discovery/redis.go | 355 ++++++++++++++++++++--------------------- 1 file changed, 176 insertions(+), 179 deletions(-) diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 0c6d4f577..e53d59f6c 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -15,182 +15,179 @@ * limitations under the License. */ - package discovery - - import ( - "context" - "fmt" - "strconv" - "strings" - "sync" - "time" - - "github.com/redis/go-redis/v9" - - "github.com/seata/seata-go/pkg/util/log" - ) - - const ( - RedisFileKeyPrefix = "registry.redis." - - // redis registry key live 5 seconds, auto refresh key every 2 seconds - KeyTTL = 5 - KeyRefreshPeriod = 2 - ) - - type RedisConfig struct { - Cluster string `yaml:"cluster" json:"cluster" koanf:"cluster"` - ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"` - Username string `yaml:"username" json:"username" koanf:"username"` - Password string `yaml:"password" json:"password" koanf:"password"` - DB int `yaml:"db" json:"db" koanf:"db"` - } - - type RedisRegistryService struct { - // the config about redis - config *RedisConfig - - // client for redis - cli *redis.Client - - // serverMap the map of discovery server - // key: server name value: server address - serverMap *sync.Map - - ctx context.Context - } - - func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { - if redisConfig == nil { - log.Fatalf("redis config is nil") - panic("redis config is nil") - } - - cfg := &redis.Options{ - Addr: redisConfig.ServerAddr, - Username: redisConfig.Username, - Password: redisConfig.Password, - DB: redisConfig.DB, - } - cli := redis.NewClient(cfg) - - redisRegistryService := &RedisRegistryService{ - config: redisConfig, - cli: cli, - ctx: context.Background(), - } - - go redisRegistryService.subscribe() - - return redisRegistryService - } - - func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { - r = make([]*ServiceInstance, 0) - ins, ok := s.serverMap.Load(key) - if !ok { - list, err := s.cli.HGetAll(s.ctx, key).Result() - if err != nil { - return nil, err - } - for _, v := range list { - addrList := strings.Split(v, ":") - if len(addrList) < 2 { - continue - } - addr := addrList[0] - port, _err := strconv.Atoi(addrList[1]) - if _err != nil { - continue - } - r = append(r, &ServiceInstance{ - Addr: addr, - Port: port, - }) - } - return - } - - r = append(r, ins.([]*ServiceInstance)...) - return - } - - func (s *RedisRegistryService) subscribe() error { - // 定时更新Map - go func() { - for range time.Tick(KeyRefreshPeriod * time.Millisecond) { - allServiceList, err := s.cli.HGetAll(s.ctx, "*").Result() - if err != nil { - return - } - - for k, v := range allServiceList { - addrList := strings.Split(v, ":") - if len(addrList) < 2 { - continue - } - addr := addrList[0] - port, _err := strconv.Atoi(addrList[1]) - if _err != nil { - continue - } - r = append(r, &ServiceInstance{ - Addr: addr, - Port: port, - }) - } - } - }() - - // // 定时订阅 - // go func() { - // for range time.Tick(1 * time.Millisecond) { - // func() { - // defer s.cli.Close() - // s.cli.Subscribe(s.ctx, func() { - // notifySub := NotifySub{listeners: listenerServiceMap[cluster]} - // return notifySub - // }()) - // }() - // } - // }() - - return nil - } - - func (s *RedisRegistryService) getRedisRegistryKey() string { - return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) - } - - func (s *RedisRegistryService) register(key, value string) (err error) { - _, err = s.cli.HSet(s.ctx, key, value).Result() - if err != nil { - return - } - - go func() { - s.keepAlive(s.ctx, key) - }() - - return - } - - func (s *RedisRegistryService) Close() { - if s.cli != nil { - s.cli.Close() - } - } - - func (s *RedisRegistryService) keepAlive(ctx context.Context, key string) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - s.cli.Expire(ctx, key, 2*time.Second) - case <-ctx.Done(): - break - } - } - } - \ No newline at end of file +package discovery + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/redis/go-redis/v9" + + "github.com/seata/seata-go/pkg/util/log" +) + +const ( + RedisFileKeyPrefix = "registry.redis." + RedisRegisterChannel = "redis_registry_channel" + + // redis registry key live 5 seconds, auto refresh key every 2 seconds + KeyTTL = 5 + KeyRefreshPeriod = 2 +) + +type RedisRegistryService struct { + // the config about redis + config *RedisConfig + + // client for redis + cli *redis.Client + + // serverMap the map of discovery server + // key: server name value: server address + serverMap *sync.Map + + ctx context.Context +} + +func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { + if redisConfig == nil { + log.Fatalf("redis config is nil") + panic("redis config is nil") + } + + cfg := &redis.Options{ + Addr: redisConfig.ServerAddr, + Username: redisConfig.Username, + Password: redisConfig.Password, + DB: redisConfig.DB, + } + cli := redis.NewClient(cfg) + + redisRegistryService := &RedisRegistryService{ + config: redisConfig, + cli: cli, + ctx: context.Background(), + } + + go redisRegistryService.subscribe() + + return redisRegistryService +} + +func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { + r = make([]*ServiceInstance, 0) + ins, ok := s.serverMap.Load(key) + if !ok { + list, err := s.cli.HGetAll(s.ctx, key).Result() + if err != nil { + return nil, err + } + for _, v := range list { + addrList := strings.Split(v, ":") + if len(addrList) < 2 { + continue + } + addr := addrList[0] + port, _err := strconv.Atoi(addrList[1]) + if _err != nil { + continue + } + r = append(r, &ServiceInstance{ + Addr: addr, + Port: port, + }) + } + return + } + + r = append(r, ins.([]*ServiceInstance)...) + return +} + +func (s *RedisRegistryService) subscribe() (err error) { + // 实时更新 + go func() { + for range time.Tick(KeyRefreshPeriod * time.Millisecond) { + func() { + defer s.Close() + // updateClusterAddressMap(jedis, redisRegistryKey, cluster) + // 获取所有的key,然后进行更新 + // 更新所有的map + }() + } + }() + + // 定时订阅 + go func() { + for range time.Tick(1 * time.Millisecond) { + func() { + // 订阅更新Map + msgs := s.cli.Subscribe(s.ctx, RedisRegisterChannel).Channel() + for msg := range msgs { + var data *NotifyMessage + err = json.Unmarshal([]byte(msg.Payload), &data) + if err != nil { + log.Errorf("RedisRegistryService-subscribe:%+v", err) + continue + } + s.serverMap.Store(data.Key, data.Value) + } + }() + } + }() + + return +} + +func (s *RedisRegistryService) getRedisRegistryKey() string { + return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) +} + +type NotifyMessage struct { + Key string `json:"key"` + Value string `json:"value"` +} + +func (s *RedisRegistryService) register(key, value string) (err error) { + _, err = s.cli.HSet(s.ctx, key, value).Result() + if err != nil { + return + } + + msg := &NotifyMessage{ + Key: key, + Value: value, + } + + s.cli.Publish(s.ctx, RedisRegisterChannel, msg) + + go func() { + s.keepAlive(s.ctx, key) + }() + + return +} + +func (s *RedisRegistryService) Close() { + if s.cli != nil { + s.cli.Close() + } +} + +func (s *RedisRegistryService) keepAlive(ctx context.Context, key string) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.cli.Expire(ctx, key, 2*time.Second) + case <-ctx.Done(): + break + } + } +} From 5d4a870b886ce6749611c27f2b3ed9bbe830e0b7 Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Sat, 16 Mar 2024 15:19:38 +0800 Subject: [PATCH 5/8] feat:test redis --- pkg/discovery/config.go | 2 + pkg/discovery/redis.go | 94 +++++++++++++++---------------------- pkg/discovery/redis_test.go | 39 +++++++++++++++ 3 files changed, 80 insertions(+), 55 deletions(-) create mode 100644 pkg/discovery/redis_test.go diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go index 908228ae5..49cec177a 100644 --- a/pkg/discovery/config.go +++ b/pkg/discovery/config.go @@ -42,6 +42,7 @@ type RegistryConfig struct { File FileConfig `yaml:"file" json:"file" koanf:"file"` Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"` Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"` + Redis RedisConfig `yaml:"redis" json:"redis" koanf:"redis"` } func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -49,6 +50,7 @@ func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSe cfg.File.RegisterFlagsWithPrefix(prefix+".file", f) cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f) cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f) + cfg.Redis.RegisterFlagsWithPrefix(prefix+".redis", f) } type FileConfig struct { diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index e53d59f6c..7fb558238 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -21,8 +21,6 @@ import ( "context" "encoding/json" "fmt" - "strconv" - "strings" "sync" "time" @@ -32,12 +30,10 @@ import ( ) const ( - RedisFileKeyPrefix = "registry.redis." - RedisRegisterChannel = "redis_registry_channel" + redisFileKeyPrefix = "registry.redis." + redisRegisterChannel = "redis_registry_channel" - // redis registry key live 5 seconds, auto refresh key every 2 seconds - KeyTTL = 5 - KeyRefreshPeriod = 2 + keyRefreshPeriod = 2 ) type RedisRegistryService struct { @@ -54,6 +50,11 @@ type RedisRegistryService struct { ctx context.Context } +type NotifyMessage struct { + Key string `json:"key"` + Value string `json:"value"` +} + func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { if redisConfig == nil { log.Fatalf("redis config is nil") @@ -80,28 +81,10 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re } func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { + r = make([]*ServiceInstance, 0) ins, ok := s.serverMap.Load(key) if !ok { - list, err := s.cli.HGetAll(s.ctx, key).Result() - if err != nil { - return nil, err - } - for _, v := range list { - addrList := strings.Split(v, ":") - if len(addrList) < 2 { - continue - } - addr := addrList[0] - port, _err := strconv.Atoi(addrList[1]) - if _err != nil { - continue - } - r = append(r, &ServiceInstance{ - Addr: addr, - Port: port, - }) - } return } @@ -109,48 +92,49 @@ func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err err return } -func (s *RedisRegistryService) subscribe() (err error) { - // 实时更新 +func (s *RedisRegistryService) subscribe() { + // regular update all keys each 2 second go func() { - for range time.Tick(KeyRefreshPeriod * time.Millisecond) { + for range time.Tick(keyRefreshPeriod * time.Second) { func() { defer s.Close() - // updateClusterAddressMap(jedis, redisRegistryKey, cluster) - // 获取所有的key,然后进行更新 - // 更新所有的map - }() - } - }() - - // 定时订阅 - go func() { - for range time.Tick(1 * time.Millisecond) { - func() { - // 订阅更新Map - msgs := s.cli.Subscribe(s.ctx, RedisRegisterChannel).Channel() - for msg := range msgs { - var data *NotifyMessage - err = json.Unmarshal([]byte(msg.Payload), &data) + // find all key and update value + keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result() + if err != nil { + log.Errorf("RedisRegistryService-Scan-Key-Error:%+v", err) + return + } + for _, key := range keys { + val, err := s.cli.Get(s.ctx, key).Result() if err != nil { - log.Errorf("RedisRegistryService-subscribe:%+v", err) + log.Errorf("RedisRegistryService-Get-Key:%+v, Err:", key, err) continue } - s.serverMap.Store(data.Key, data.Value) + s.serverMap.Store(key, val) } }() } }() + // real time subscripting + go func() { + msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel() + for msg := range msgs { + var data *NotifyMessage + err := json.Unmarshal([]byte(msg.Payload), &data) + if err != nil { + log.Errorf("RedisRegistryService-subscribe-Subscribe:%+v", err) + continue + } + s.serverMap.Store(data.Key, data.Value) + } + }() + return } func (s *RedisRegistryService) getRedisRegistryKey() string { - return fmt.Sprintf("%s%s", RedisFileKeyPrefix, s.config.Cluster) -} - -type NotifyMessage struct { - Key string `json:"key"` - Value string `json:"value"` + return fmt.Sprintf("%s%s", redisFileKeyPrefix, s.config.Cluster) } func (s *RedisRegistryService) register(key, value string) (err error) { @@ -164,7 +148,7 @@ func (s *RedisRegistryService) register(key, value string) (err error) { Value: value, } - s.cli.Publish(s.ctx, RedisRegisterChannel, msg) + s.cli.Publish(s.ctx, redisRegisterChannel, msg) go func() { s.keepAlive(s.ctx, key) @@ -175,7 +159,7 @@ func (s *RedisRegistryService) register(key, value string) (err error) { func (s *RedisRegistryService) Close() { if s.cli != nil { - s.cli.Close() + _ = s.cli.Close() } } diff --git a/pkg/discovery/redis_test.go b/pkg/discovery/redis_test.go new file mode 100644 index 000000000..ffb5a58a5 --- /dev/null +++ b/pkg/discovery/redis_test.go @@ -0,0 +1,39 @@ +package discovery + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_newRedisRegisterService(t *testing.T) { + type args struct { + config *ServiceConfig + redisConfig *RedisConfig + } + redisConfig := &RedisConfig{ + Cluster: "default", + ServerAddr: "localhost:2379", + Username: "redis", + Password: "", + DB: 0, + } + tests := []struct { + name string + args args + want RegistryService + }{ + { + name: "default1", + args: args{ + config: nil, + redisConfig: redisConfig, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, newRedisRegisterService(tt.args.config, tt.args.redisConfig), "newRedisRegisterService(%v, %v)", tt.args.config, tt.args.redisConfig) + }) + } +} From 1e74958caf8330e373ec7fc979b0edb8eca6efcc Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Sat, 16 Mar 2024 18:37:31 +0800 Subject: [PATCH 6/8] feat:redis discovery --- go.mod | 1 + go.sum | 6 +++ pkg/discovery/config.go | 2 +- pkg/discovery/redis.go | 80 ++++++++++++++++++++++++------------- pkg/discovery/redis_test.go | 71 ++++++++++++++++++++++++-------- 5 files changed, 116 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index f8bf9a200..034963efc 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( require ( github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey/v2 v2.9.0 + github.com/go-redis/redismock/v9 v9.2.0 github.com/redis/go-redis/v9 v9.5.1 go.etcd.io/etcd/api/v3 v3.5.6 go.etcd.io/etcd/client/v3 v3.5.6 diff --git a/go.sum b/go.sum index 9604fb523..503631af9 100644 --- a/go.sum +++ b/go.sum @@ -260,6 +260,8 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91 github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw= +github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -574,6 +576,7 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -581,7 +584,9 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -1250,6 +1255,7 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/pkg/discovery/config.go b/pkg/discovery/config.go index 49cec177a..ed9681e8c 100644 --- a/pkg/discovery/config.go +++ b/pkg/discovery/config.go @@ -103,7 +103,7 @@ type RedisConfig struct { func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server address of redis.") - f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:2379", "The server address of redis.") + f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:6379", "The server address of redis.") f.StringVar(&cfg.Username, prefix+".username", "redis", "The name of redis") f.StringVar(&cfg.Password, prefix+".password", "", "The password of redis.") f.IntVar(&cfg.DB, prefix+".db", 0, "The db of redis to discovery.") diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 7fb558238..2afb1752a 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -21,6 +21,8 @@ import ( "context" "encoding/json" "fmt" + "strconv" + "strings" "sync" "time" @@ -70,52 +72,76 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re cli := redis.NewClient(cfg) redisRegistryService := &RedisRegistryService{ - config: redisConfig, - cli: cli, - ctx: context.Background(), + config: redisConfig, + cli: cli, + ctx: context.Background(), + serverMap: new(sync.Map), } + go redisRegistryService.load() go redisRegistryService.subscribe() + go redisRegistryService.flush() return redisRegistryService } func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { - r = make([]*ServiceInstance, 0) - ins, ok := s.serverMap.Load(key) + value, ok := s.serverMap.Load(key) if !ok { return } + val, ok := value.(string) + if !ok || val == "" { + return + } + addrList := strings.Split(val, ":") + if len(addrList) < 2 { + return + } + addr := addrList[0] + port, _err := strconv.Atoi(addrList[1]) + if _err != nil { + return + } + r = append(r, &ServiceInstance{ + Addr: addr, + Port: port, + }) - r = append(r, ins.([]*ServiceInstance)...) return } -func (s *RedisRegistryService) subscribe() { +func (s *RedisRegistryService) flush() { // regular update all keys each 2 second - go func() { - for range time.Tick(keyRefreshPeriod * time.Second) { - func() { - defer s.Close() - // find all key and update value - keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result() - if err != nil { - log.Errorf("RedisRegistryService-Scan-Key-Error:%+v", err) - return - } - for _, key := range keys { - val, err := s.cli.Get(s.ctx, key).Result() - if err != nil { - log.Errorf("RedisRegistryService-Get-Key:%+v, Err:", key, err) - continue - } - s.serverMap.Store(key, val) - } - }() + ticker := time.NewTicker(keyRefreshPeriod * time.Second) + defer ticker.Stop() + func(t *time.Ticker) { + // find all key and update value + for { + <-t.C + s.load() } - }() + }(ticker) +} + +func (s *RedisRegistryService) load() { + keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result() + if err != nil { + log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err) + return + } + for _, key := range keys { + val, err := s.cli.Get(s.ctx, key).Result() + if err != nil { + log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err) + continue + } + s.serverMap.Store(key, val) + } +} +func (s *RedisRegistryService) subscribe() { // real time subscripting go func() { msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel() diff --git a/pkg/discovery/redis_test.go b/pkg/discovery/redis_test.go index ffb5a58a5..b61f41fae 100644 --- a/pkg/discovery/redis_test.go +++ b/pkg/discovery/redis_test.go @@ -1,39 +1,78 @@ package discovery import ( + "context" + "fmt" + "sync" "testing" + "time" + "github.com/go-redis/redismock/v9" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" ) -func Test_newRedisRegisterService(t *testing.T) { - type args struct { - config *ServiceConfig - redisConfig *RedisConfig +func TestRedisRegistryService_Lookup(t *testing.T) { + cli, _ := redismock.NewClientMock() + + type fields struct { + config *RedisConfig + cli *redis.Client + serverMap *sync.Map + ctx context.Context } - redisConfig := &RedisConfig{ + config := &RedisConfig{ Cluster: "default", - ServerAddr: "localhost:2379", - Username: "redis", - Password: "", + ServerAddr: "localhost:6379", + Username: "", + Password: "123456", DB: 0, } + type args struct { + key string + } + ctx := context.Background() + _, err := cli.Ping(ctx).Result() + if err != nil { + fmt.Println("ping error", err) + } tests := []struct { - name string - args args - want RegistryService + name string + fields fields + args args + wantR []*ServiceInstance + wantErr assert.ErrorAssertionFunc }{ { - name: "default1", - args: args{ - config: nil, - redisConfig: redisConfig, + name: "default", + fields: fields{ + config: config, + cli: cli, + serverMap: &sync.Map{}, + ctx: ctx, }, + args: args{key: "registry.redis.apple"}, + wantR: nil, + wantErr: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, newRedisRegisterService(tt.args.config, tt.args.redisConfig), "newRedisRegisterService(%v, %v)", tt.args.config, tt.args.redisConfig) + cfgRedis := &RedisRegistryService{ + config: tt.fields.config, + cli: tt.fields.cli, + serverMap: tt.fields.serverMap, + ctx: tt.fields.ctx, + } + s := newRedisRegisterService(nil, cfgRedis.config) + time.Sleep(1 * time.Second) + gotR, err := s.Lookup(tt.args.key) + if err != nil { + return + } + for _, v := range gotR { + fmt.Println(v) + } }) } } From 792a1fcac25dba3a9894659c5ffc585874664eed Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Sat, 23 Mar 2024 12:31:33 +0800 Subject: [PATCH 7/8] feat: add cluster name & code comments --- pkg/discovery/etcd3.go | 3 +- pkg/discovery/redis.go | 176 ++++++++++++++++++++++++++++-------- pkg/discovery/redis_test.go | 81 +++++++++-------- 3 files changed, 185 insertions(+), 75 deletions(-) diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go index 08c170e5d..5fe959044 100644 --- a/pkg/discovery/etcd3.go +++ b/pkg/discovery/etcd3.go @@ -24,8 +24,9 @@ import ( "strings" "sync" - "github.com/seata/seata-go/pkg/util/log" etcd3 "go.etcd.io/etcd/client/v3" + + "github.com/seata/seata-go/pkg/util/log" ) const ( diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 2afb1752a..100027912 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strconv" "strings" "sync" @@ -32,10 +33,25 @@ import ( ) const ( - redisFileKeyPrefix = "registry.redis." + // key = registry.redis.${cluster}_ip:port + + // redisFileKeyPrefix the redis register key prefix + redisFileKeyPrefix = "registry.redis." + + // redisAddressSplitChar A notation for split ip addresses and ports + redisAddressSplitChar = "_" + + // redisRegisterChannel the channel for redis to publish/subscript key&value redisRegisterChannel = "redis_registry_channel" + // defaultCluster default cluster name + defaultCluster = "default" + + // keyRefreshPeriod frequency of refreshing each key keyRefreshPeriod = 2 + + // regexClusterName the regular expression for find the cluster name + regexClusterName = `registry\.redis\.(\w+)_` ) type RedisRegistryService struct { @@ -45,18 +61,28 @@ type RedisRegistryService struct { // client for redis cli *redis.Client - // serverMap the map of discovery server - // key: server name value: server address - serverMap *sync.Map + // rwLock lock groupList when update service instance + rwLock *sync.RWMutex + + // vgroupMapping to store the cluster group + // eg: map[cluster_name_key]cluster_name + vgroupMapping map[string]string + + // grouplist store all addresses under this cluster + // eg: map[cluster_name][]{service_instance1,service_instance2...} + groupList map[string][]*ServiceInstance ctx context.Context } +// NotifyMessage redis subscript structure type NotifyMessage struct { + // key = registry.redis.${cluster}_ip:port Key string `json:"key"` Value string `json:"value"` } +// newRedisRegisterService init the redis register service func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService { if redisConfig == nil { log.Fatalf("redis config is nil") @@ -71,98 +97,173 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re } cli := redis.NewClient(cfg) + vgroupMapping := config.VgroupMapping + groupList := make(map[string][]*ServiceInstance) + redisRegistryService := &RedisRegistryService{ - config: redisConfig, - cli: cli, - ctx: context.Background(), - serverMap: new(sync.Map), + config: redisConfig, + cli: cli, + ctx: context.Background(), + rwLock: &sync.RWMutex{}, + vgroupMapping: vgroupMapping, + groupList: groupList, } - go redisRegistryService.load() + // loading all server at init time + redisRegistryService.load() + // subscribe at real time go redisRegistryService.subscribe() + // flushing all server at regular time go redisRegistryService.flush() return redisRegistryService } func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) { - r = make([]*ServiceInstance, 0) - value, ok := s.serverMap.Load(key) - if !ok { - return - } - val, ok := value.(string) - if !ok || val == "" { - return - } - addrList := strings.Split(val, ":") - if len(addrList) < 2 { - return + s.rwLock.RLock() + defer s.rwLock.RUnlock() + cluster := s.vgroupMapping[key] + if cluster == "" { + return nil, fmt.Errorf("cluster doesnt exit") } - addr := addrList[0] - port, _err := strconv.Atoi(addrList[1]) - if _err != nil { - return - } - r = append(r, &ServiceInstance{ - Addr: addr, - Port: port, - }) + + r = s.groupList[cluster] return } +// flush regular update all keys each 2 seconds func (s *RedisRegistryService) flush() { - // regular update all keys each 2 second ticker := time.NewTicker(keyRefreshPeriod * time.Second) defer ticker.Stop() func(t *time.Ticker) { // find all key and update value for { - <-t.C s.load() + + <-t.C } }(ticker) } +// load loading all key & value into map func (s *RedisRegistryService) load() { + // find all the server list redis register by redisFileKeyPrefix keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result() if err != nil { log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err) return } for _, key := range keys { + clusterName := s.getClusterNameByKey(key) val, err := s.cli.Get(s.ctx, key).Result() if err != nil { log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err) continue } - s.serverMap.Store(key, val) + ins, err := s.getServerInstance(val) + if err != nil { + log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err) + continue + } + // put server instance list in group list + s.rwLock.Lock() + if s.groupList[clusterName] == nil { + s.groupList[clusterName] = make([]*ServiceInstance, 0) + } + s.groupList[clusterName] = append(s.groupList[clusterName], ins) + s.rwLock.Unlock() } } +// subscribe real time subscripting the change of data func (s *RedisRegistryService) subscribe() { - // real time subscripting go func() { msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel() for msg := range msgs { var data *NotifyMessage err := json.Unmarshal([]byte(msg.Payload), &data) if err != nil { - log.Errorf("RedisRegistryService-subscribe-Subscribe:%+v", err) + log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err) continue } - s.serverMap.Store(data.Key, data.Value) + // get cluster name by key + clusterName := s.getClusterNameByKey(data.Key) + ins, err := s.getServerInstance(data.Value) + if err != nil { + log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err) + continue + } + s.rwLock.Lock() + if s.groupList[clusterName] == nil { + s.groupList[clusterName] = make([]*ServiceInstance, 0) + } + s.groupList[clusterName] = append(s.groupList[clusterName], ins) + s.rwLock.Unlock() } }() return } +// getClusterNameByKey get the cluster name by key +func (s *RedisRegistryService) getClusterNameByKey(key string) string { + // key = registry.redis.${cluster}_ip:port + re := regexp.MustCompile(regexClusterName) + match := re.FindStringSubmatch(key) + // if we find the match , use the match result + if len(match) > 1 { + return match[1] + } + // if not find , return default cluster name for underwriting + return s.getDefaultClusterName() +} + +// getDefaultClusterName get default cluster name +func (s *RedisRegistryService) getDefaultClusterName() string { + if s.config != nil && s.config.Cluster != "" { + return s.config.Cluster + } + return defaultCluster +} + +// getServerInstance parse ip address and port from value +func (s *RedisRegistryService) getServerInstance(value string) (ins *ServiceInstance, err error) { + valueSplit := strings.Split(value, redisAddressSplitChar) + if len(valueSplit) != 2 { + err = fmt.Errorf("redis value has an incorrect format. value: %s", value) + return + } + ip := valueSplit[0] + port, err := strconv.Atoi(valueSplit[1]) + if err != nil { + err = fmt.Errorf("redis port has an incorrect format. err: %w", err) + return + } + ins = &ServiceInstance{ + Addr: ip, + Port: port, + } + + return +} + +// getKey +// @param: addr the register service ip & port +// eg: localhost:7455 +func (s *RedisRegistryService) getKey(addr string) string { + return fmt.Sprintf("%s_%s", s.getRedisRegistryKey(), addr) +} + func (s *RedisRegistryService) getRedisRegistryKey() string { - return fmt.Sprintf("%s%s", redisFileKeyPrefix, s.config.Cluster) + if s.config != nil && s.config.Cluster != "" { + return fmt.Sprintf("%s%s", redisFileKeyPrefix, s.config.Cluster) + } + + return fmt.Sprintf("%s%s", redisFileKeyPrefix, defaultCluster) } +// register register to redis func (s *RedisRegistryService) register(key, value string) (err error) { _, err = s.cli.HSet(s.ctx, key, value).Result() if err != nil { @@ -189,6 +290,7 @@ func (s *RedisRegistryService) Close() { } } +// keepAlive keep every key alive func (s *RedisRegistryService) keepAlive(ctx context.Context, key string) { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() diff --git a/pkg/discovery/redis_test.go b/pkg/discovery/redis_test.go index b61f41fae..d3802d60b 100644 --- a/pkg/discovery/redis_test.go +++ b/pkg/discovery/redis_test.go @@ -3,9 +3,9 @@ package discovery import ( "context" "fmt" + "regexp" "sync" "testing" - "time" "github.com/go-redis/redismock/v9" "github.com/redis/go-redis/v9" @@ -13,29 +13,22 @@ import ( ) func TestRedisRegistryService_Lookup(t *testing.T) { - cli, _ := redismock.NewClientMock() - + db, _ := redismock.NewClientMock() type fields struct { - config *RedisConfig - cli *redis.Client - serverMap *sync.Map - ctx context.Context - } - config := &RedisConfig{ - Cluster: "default", - ServerAddr: "localhost:6379", - Username: "", - Password: "123456", - DB: 0, + config *RedisConfig + cli *redis.Client + rwLock *sync.RWMutex + vgroupMapping map[string]string + groupList map[string][]*ServiceInstance + ctx context.Context } type args struct { key string } - ctx := context.Background() - _, err := cli.Ping(ctx).Result() - if err != nil { - fmt.Println("ping error", err) + redisConfig := &RedisConfig{ + Cluster: "default", } + ctx := context.Background() tests := []struct { name string fields fields @@ -44,35 +37,49 @@ func TestRedisRegistryService_Lookup(t *testing.T) { wantErr assert.ErrorAssertionFunc }{ { - name: "default", + name: "default1", fields: fields{ - config: config, - cli: cli, - serverMap: &sync.Map{}, - ctx: ctx, + config: redisConfig, + cli: db, + rwLock: &sync.RWMutex{}, + vgroupMapping: map[string]string{}, + groupList: map[string][]*ServiceInstance{}, + ctx: ctx, }, - args: args{key: "registry.redis.apple"}, - wantR: nil, - wantErr: nil, + args: args{key: ""}, + wantR: make([]*ServiceInstance, 0), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfgRedis := &RedisRegistryService{ - config: tt.fields.config, - cli: tt.fields.cli, - serverMap: tt.fields.serverMap, - ctx: tt.fields.ctx, + s := &RedisRegistryService{ + config: tt.fields.config, + cli: tt.fields.cli, + rwLock: tt.fields.rwLock, + vgroupMapping: tt.fields.vgroupMapping, + groupList: tt.fields.groupList, + ctx: tt.fields.ctx, } - s := newRedisRegisterService(nil, cfgRedis.config) - time.Sleep(1 * time.Second) gotR, err := s.Lookup(tt.args.key) - if err != nil { + if !tt.wantErr(t, err, fmt.Sprintf("Lookup(%v)", tt.args.key)) { return } - for _, v := range gotR { - fmt.Println(v) - } + assert.Equalf(t, tt.wantR, gotR, "Lookup(%v)", tt.args.key) }) } } + +func TestRedisCluster(t *testing.T) { + input := "registry.redis.my_cluster_localhost:3306" + regex := `registry\.redis\.(\w+)_` + + re := regexp.MustCompile(regex) + match := re.FindStringSubmatch(input) + + if len(match) > 1 { + cluster := match[1] + fmt.Println("Cluster:", cluster) + } else { + fmt.Println("No match found.") + } +} From a41aa772fc45e20432b2c3783169579749eccee5 Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Sat, 23 Mar 2024 13:45:54 +0800 Subject: [PATCH 8/8] feat:discovery by redis --- pkg/discovery/init.go | 17 ++++---- pkg/discovery/redis.go | 9 +++-- pkg/discovery/redis_test.go | 80 ++++++++++++++++++------------------- 3 files changed, 54 insertions(+), 52 deletions(-) diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go index 8e7f5e38f..295cbca70 100644 --- a/pkg/discovery/init.go +++ b/pkg/discovery/init.go @@ -30,23 +30,24 @@ func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig) var err error switch registryConfig.Type { case FILE: - //init file registry + // init file registry registryService = newFileRegistryService(serviceConfig) case ETCD: - //init etcd registry + // init etcd registry registryService = newEtcdRegistryService(serviceConfig, ®istryConfig.Etcd3) case NACOS: - //TODO: init nacos registry + // TODO: init nacos registry case EUREKA: - //TODO: init eureka registry + // TODO: init eureka registry case REDIS: - //TODO: init redis registry + // init redis registry + registryService = newRedisRegisterService(serviceConfig, ®istryConfig.Redis) case ZK: - //TODO: init zk registry + // TODO: init zk registry case CONSUL: - //TODO: init consul registry + // TODO: init consul registry case SOFA: - //TODO: init sofa registry + // TODO: init sofa registry default: err = fmt.Errorf("service registry not support registry type:%s", registryConfig.Type) } diff --git a/pkg/discovery/redis.go b/pkg/discovery/redis.go index 100027912..083b79fa3 100644 --- a/pkg/discovery/redis.go +++ b/pkg/discovery/redis.go @@ -39,7 +39,7 @@ const ( redisFileKeyPrefix = "registry.redis." // redisAddressSplitChar A notation for split ip addresses and ports - redisAddressSplitChar = "_" + redisAddressSplitChar = ":" // redisRegisterChannel the channel for redis to publish/subscript key&value redisRegisterChannel = "redis_registry_channel" @@ -68,7 +68,7 @@ type RedisRegistryService struct { // eg: map[cluster_name_key]cluster_name vgroupMapping map[string]string - // grouplist store all addresses under this cluster + // groupList store all addresses under this cluster // eg: map[cluster_name][]{service_instance1,service_instance2...} groupList map[string][]*ServiceInstance @@ -114,7 +114,7 @@ func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) Re // subscribe at real time go redisRegistryService.subscribe() // flushing all server at regular time - go redisRegistryService.flush() + // go redisRegistryService.flush() return redisRegistryService } @@ -124,7 +124,8 @@ func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err err defer s.rwLock.RUnlock() cluster := s.vgroupMapping[key] if cluster == "" { - return nil, fmt.Errorf("cluster doesnt exit") + err = fmt.Errorf("cluster doesnt exit") + return } r = s.groupList[cluster] diff --git a/pkg/discovery/redis_test.go b/pkg/discovery/redis_test.go index d3802d60b..259466199 100644 --- a/pkg/discovery/redis_test.go +++ b/pkg/discovery/redis_test.go @@ -1,70 +1,70 @@ package discovery import ( - "context" "fmt" + "reflect" "regexp" - "sync" "testing" + "time" - "github.com/go-redis/redismock/v9" - "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" ) func TestRedisRegistryService_Lookup(t *testing.T) { - db, _ := redismock.NewClientMock() - type fields struct { - config *RedisConfig - cli *redis.Client - rwLock *sync.RWMutex - vgroupMapping map[string]string - groupList map[string][]*ServiceInstance - ctx context.Context + // db, mockClient := redismock.NewClientMock() + serviceConfig := &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, } type args struct { key string } redisConfig := &RedisConfig{ - Cluster: "default", + Cluster: "default", + ServerAddr: "localhost:6379", + Username: "", + Password: "123456", + DB: 0, } - ctx := context.Background() + // ctrl := gomock.NewController(t) + // mockRedisClient := mock.NewMockRedisClient(ctrl) tests := []struct { - name string - fields fields - args args - wantR []*ServiceInstance - wantErr assert.ErrorAssertionFunc + name string + args args + wantR []*ServiceInstance }{ { - name: "default1", - fields: fields{ - config: redisConfig, - cli: db, - rwLock: &sync.RWMutex{}, - vgroupMapping: map[string]string{}, - groupList: map[string][]*ServiceInstance{}, - ctx: ctx, + name: "default", + args: args{key: "registry.redis.default_localhost:8888"}, + wantR: []*ServiceInstance{ + { + Addr: "localhost", + Port: 8888, + }, }, - args: args{key: ""}, - wantR: make([]*ServiceInstance, 0), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &RedisRegistryService{ - config: tt.fields.config, - cli: tt.fields.cli, - rwLock: tt.fields.rwLock, - vgroupMapping: tt.fields.vgroupMapping, - groupList: tt.fields.groupList, - ctx: tt.fields.ctx, - } - gotR, err := s.Lookup(tt.args.key) - if !tt.wantErr(t, err, fmt.Sprintf("Lookup(%v)", tt.args.key)) { + s := newRedisRegisterService(serviceConfig, redisConfig) + // mockClient.ExpectSet("registry.redis.default_localhost:8888", "localhost:8888", -1) + // wait 2 second for update all service + time.Sleep(5 * time.Second) + // result := mockClient.ExpectGet("registry.redis.default_localhost:8888") + // fmt.Println("result", result) + serviceInstances, err := s.Lookup("default_tx_group") + if err != nil { + t.Errorf("error happen when look up . err = %s", err) return } - assert.Equalf(t, tt.wantR, gotR, "Lookup(%v)", tt.args.key) + t.Logf("name:%s,key:%s,server length:%d", tt.name, tt.args.key, len(serviceInstances)) + for i := range serviceInstances { + t.Log(serviceInstances[i].Addr) + t.Log(serviceInstances[i].Port) + } + assert.True(t, reflect.DeepEqual(serviceInstances, tt.wantR)) + }) } }