Skip to content

Commit

Permalink
rm deprecated members file (#1195)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci authored Jan 16, 2024
1 parent 87751b5 commit 2aa5400
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 598 deletions.
47 changes: 2 additions & 45 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ type cluster struct {

layout *Layout

members *members

server *embed.Etcd
client *clientv3.Client
lease *clientv3.LeaseID
Expand All @@ -143,21 +141,13 @@ func New(opt *option.Options) (Cluster, error) {
return nil, fmt.Errorf("invalid cluster request timeout: %v", err)
}

// Member file, members.ClusterMembers and members.KnownMembers will be deprecated in the future.
// When the new configuration way (cluster.initial-cluster or cluster.primary-listen-peer-urls) is used, let's not create member
// instance but let's read member information from pkg/option/options.go's Options.ClusterOptions directly.
var membersFile *members
if len(opt.GetPeerURLs()) == 0 {
membersFile, err = newMembers(opt)
if err != nil {
return nil, fmt.Errorf("new members failed: %v", err)
}
return nil, fmt.Errorf("no peer urls in cluster.initial-cluster for primary and cluster.primary-listen-peer-url for secondary")
}

c := &cluster{
opt: opt,
requestTimeout: requestTimeout,
members: membersFile,
done: make(chan struct{}),
}

Expand Down Expand Up @@ -320,15 +310,7 @@ func (c *cluster) getClient() (*clientv3.Client, error) {
return c.client, nil
}

var endpoints []string
if c.members == nil {
endpoints = c.opt.GetPeerURLs()
} else {
endpoints = c.members.knownPeerURLs()
if c.opt.ForceNewCluster {
endpoints = []string{c.members.self().PeerURL}
}
}
endpoints := c.opt.GetPeerURLs()
logger.Infof("client connect with endpoints: %v", endpoints)
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Expand Down Expand Up @@ -675,10 +657,6 @@ func (c *cluster) heartbeat() {
if err != nil {
logger.Errorf("sync status failed: %v", err)
}
err = c.updateMembers()
if err != nil {
logger.Errorf("update members failed: %v", err)
}
case <-c.done:
return
}
Expand Down Expand Up @@ -756,27 +734,6 @@ func (c *cluster) syncStatus() error {
return nil
}

func (c *cluster) updateMembers() error {
client, err := c.getClient()
if err != nil {
return err
}

resp, err := func() (*clientv3.MemberListResponse, error) {
ctx, cancel := c.requestContext()
defer cancel()
return client.MemberList(ctx)
}()
if err != nil {
return err
}

if c.members != nil {
c.members.updateClusterMembers(resp.Members)
}
return nil
}

func (c *cluster) PurgeMember(memberName string) error {
client, err := c.getClient()
if err != nil {
Expand Down
97 changes: 74 additions & 23 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"testing"
"time"
Expand All @@ -39,6 +38,72 @@ import (
"github.com/stretchr/testify/assert"
)

var memberCounter = 0

func mockTestOpt(ports []int) *option.Options {
memberCounter++
name := fmt.Sprintf("test-member-%03d", memberCounter)

opt := option.New()
opt.Name = name
opt.ClusterName = "test-cluster"
opt.ClusterRole = "primary"
opt.ClusterRequestTimeout = "10s"
opt.Cluster.ListenClientURLs = []string{fmt.Sprintf("http://localhost:%d", ports[0])}
opt.Cluster.AdvertiseClientURLs = opt.Cluster.ListenClientURLs
opt.Cluster.ListenPeerURLs = []string{fmt.Sprintf("http://localhost:%d", ports[1])}
opt.Cluster.InitialAdvertisePeerURLs = opt.Cluster.ListenPeerURLs
opt.APIAddr = fmt.Sprintf("localhost:%d", ports[2])
opt.HomeDir = filepath.Join(tempDir, name)
opt.DataDir = "data"
opt.LogDir = "log"
opt.Debug = false

if err := opt.Parse(); err != nil {
panic(fmt.Errorf("parse option failed: %v", err))
}

return opt
}

func mockMembers(count int) ([]*option.Options, []*pb.Member) {
ports, err := freeport.GetFreePorts(count * 3)
if err != nil {
panic(fmt.Errorf("get %d free ports failed: %v", count*3, err))
}

opts := make([]*option.Options, count)
pbMembers := make([]*pb.Member, count)

for i := 0; i < count; i++ {
id := i + 1
opt := mockTestOpt(ports[i*3 : (i+1)*3])

opts[i] = opt
pbMembers[i] = &pb.Member{
ID: uint64(id),
Name: opt.Name,
PeerURLs: []string{opt.Cluster.InitialAdvertisePeerURLs[0]},
ClientURLs: []string{opt.Cluster.AdvertiseClientURLs[0]},
}

env.InitServerDir(opt)
}

initCluster := map[string]string{}
for _, opt := range opts {
if opt.ClusterRole == "primary" {
initCluster[opt.Name] = opt.Cluster.ListenPeerURLs[0]
}
}
for _, opt := range opts {
if opt.ClusterRole == "primary" {
opt.Cluster.InitialCluster = initCluster
}
}
return opts, pbMembers
}

var tempDir = os.TempDir()

func getRandomString(n int) string {
Expand All @@ -55,9 +120,8 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func mockStaticClusterMembers(count int) ([]*option.Options, membersSlice, []*pb.Member) {
func mockStaticClusterMembers(count int) ([]*option.Options, []*pb.Member) {
opts := make([]*option.Options, count)
members := make(membersSlice, count)
pbMembers := make([]*pb.Member, count)

portCount := (count * 2) + 1 // two for each member and one for egctl API.
Expand Down Expand Up @@ -91,7 +155,6 @@ func mockStaticClusterMembers(count int) ([]*option.Options, membersSlice, []*pb
opt.HomeDir = filepath.Join(tempDir, name)
opt.DataDir = "data"
opt.LogDir = "log"
opt.MemberDir = "member"
opt.Debug = false
err = opt.Parse() // create directories
if err != nil {
Expand All @@ -101,11 +164,6 @@ func mockStaticClusterMembers(count int) ([]*option.Options, membersSlice, []*pb
id := uint64(i + 1)

opts[i] = opt
members[i] = &member{
ID: id,
Name: opt.Name,
PeerURL: opt.Cluster.InitialAdvertisePeerURLs[0],
}
pbMembers[i] = &pb.Member{
ID: id,
Name: opt.Name,
Expand All @@ -114,18 +172,11 @@ func mockStaticClusterMembers(count int) ([]*option.Options, membersSlice, []*pb
}
env.InitServerDir(opts[i])
}
sort.Sort(members)
noexistMember := members.getByPeerURL("no-exist")
if noexistMember != nil {
panic("get a member not exist succ, should failed")
}
members.deleteByName("no-exist")
members.deleteByPeerURL("no-exist-purl")
return opts, members, pbMembers
return opts, pbMembers
}

func mockStaticCluster(count int) []*cluster {
opts, _, _ := mockStaticClusterMembers(count)
opts, _ := mockStaticClusterMembers(count)

clusterNodes := make([]*cluster, count)
clusterNodesLock := sync.Mutex{}
Expand Down Expand Up @@ -216,7 +267,7 @@ func TestLeaseInvalid(t *testing.T) {
}

func TestClusterStart(t *testing.T) {
opts, _, _ := mockMembers(1)
opts, _ := mockMembers(1)

cls, err := New(opts[0])

Expand All @@ -234,7 +285,7 @@ func TestClusterStart(t *testing.T) {

func TestClusterPurgeMember(t *testing.T) {
assert := assert.New(t)
opts, _, _ := mockMembers(2)
opts, _ := mockMembers(2)

go func() {
_, err := New(opts[1])
Expand All @@ -250,7 +301,7 @@ func TestClusterPurgeMember(t *testing.T) {
}

func TestClusterSyncer(t *testing.T) {
opts, _, _ := mockMembers(1)
opts, _ := mockMembers(1)
cls, err := New(opts[0])

if err != nil {
Expand Down Expand Up @@ -330,7 +381,7 @@ func TestClusterSyncer(t *testing.T) {
}

func TestClusterWatcher(t *testing.T) {
opts, _, _ := mockMembers(1)
opts, _ := mockMembers(1)
cls, err := New(opts[0])

if err != nil {
Expand Down Expand Up @@ -471,7 +522,7 @@ func TestUtil(t *testing.T) {
}

func TestMutexAndOP(t *testing.T) {
opts, _, _ := mockMembers(1)
opts, _ := mockMembers(1)
cls, err := New(opts[0])

if err != nil {
Expand Down
Loading

0 comments on commit 2aa5400

Please sign in to comment.