Skip to content

Commit

Permalink
e2e: test done
Browse files Browse the repository at this point in the history
  • Loading branch information
jiayouxujin committed Sep 19, 2023
1 parent a5d7b19 commit 8f9b41e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 108 deletions.
37 changes: 0 additions & 37 deletions pkg/client/kvrocks/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,6 @@ import (
"strings"
)

const (
ClusterNotInitErr = "CLUSTERDOWN The cluster is not initialized"
ClusterAlreadyMigrate = "Can't migrate slot which has been migrated"
ClusterSlotInvalid = "Can't migrate slot which doesn't belong to me"
ClusterVersionInvalid = "Invalid cluster version"
ClusterInvalidVersion = "Invalid version of cluster"
)

func (s *Client) MoveSlots(ip, password string, slot int, dstNodeId string) bool {
c := kvrocksClient(ip, password)
defer c.Close()
if err := c.Do(ctx, "CLUSTERX", "MIGRATE", slot, dstNodeId).Err(); err != nil && (err.Error() == ClusterAlreadyMigrate || err.Error() == ClusterSlotInvalid) {
return true
}
return false
}

func (s *Client) ResetSlot(ip, password string, slot, version int, dstNodeId string) error {
c := kvrocksClient(ip, password)
defer c.Close()
if err := c.Do(ctx, "CLUSTERX", "SETSLOT", slot, "NODE", dstNodeId, version).Err(); err != nil {
return err
}
s.logger.V(1).Info("clusterx setslot successfully", "ip", ip, "node", dstNodeId, "slot", slot, "version", version)
return nil
}

func (s *Client) ClusterVersion(ip, password string) (int, error) {
c := kvrocksClient(ip, password)
defer c.Close()
result, err := c.Do(ctx, "CLUSTERX", "VERSION").Int()
if err != nil {
return -1, err
}
return result, nil
}

func (s *Client) ClusterNodeInfo(ip, password string) (*Node, error) {
c := kvrocksClient(ip, password)
defer c.Close()
Expand Down
7 changes: 0 additions & 7 deletions pkg/client/kvrocks/kvrocks_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ func kvrocksSentinelClient(ip, password string) *client.SentinelClient {
})
}

func kvrocksClusterClient(ip, password string) *client.ClusterClient {
return client.NewClusterClient(&client.ClusterOptions{
Addrs: []string{net.JoinHostPort(ip, strconv.Itoa(KVRocksPort))},
Password: password,
})
}

func (node *Node) InsertSlot(value int) {
node.Slots = append(node.Slots, value)
sort.Ints(node.Slots)
Expand Down
127 changes: 63 additions & 64 deletions test/e2e/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,56 @@ var _ = Describe("Operator for Cluster Mode", func() {
})

It("test recover when slave down", func() {
for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {

var pod corev1.Pod
key := types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 1),
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
Expect(kvrocksInstance.Status.Topo[0].Topology[1].Role).Should(Equal(kvrocks.RoleSlaver))
Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())

// wait pod reconstruction
time.Sleep(time.Second * 30)
Eventually(func() error {
err := env.Client.Get(ctx, kvrocksKey, kvrocksInstance)
if err != nil {
return err
}

var pod corev1.Pod
key := types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 1),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 1),
}
if err := env.Client.Get(ctx, key, &pod); err != nil {
return err
}
if pod.Status.Phase != corev1.PodRunning {
return errors.New("please wait pod running")
}
if kvrocksInstance.Status.Topo[0].Topology[1].Failover {
return errors.New("wait failover over")
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
Expect(kvrocksInstance.Status.Topo[index].Topology[1].Role).Should(Equal(kvrocks.RoleSlaver))
Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())

return nil
}, timeout, interval).Should(Succeed())
Eventually(func() error {
return checkKvrocksCluster(kvrocksKey, sentinelKey)
}, timeout, interval).Should(Succeed())
})

It("test recover when master down", func() {

var pod corev1.Pod
key := types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 0),
}
Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
Expect(kvrocksInstance.Status.Topo[0].Topology[0].Role).Should(Equal(kvrocks.RoleMaster))
Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())

// wait pod reconstruction
time.Sleep(time.Second * 30)
Expand All @@ -150,21 +190,21 @@ var _ = Describe("Operator for Cluster Mode", func() {
if err != nil {
return err
}
for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {
var pod corev1.Pod
key := types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 1),
}
if err := env.Client.Get(ctx, key, &pod); err != nil {
return err
}
if pod.Status.Phase != corev1.PodRunning {
return errors.New("please wait pod running")
}
if kvrocksInstance.Status.Topo[index].Topology[1].Failover {
return errors.New("wait failover over")
}
Expect(err).Should(Succeed())

var pod corev1.Pod
key := types.NamespacedName{
Namespace: kvrocksInstance.GetNamespace(),
Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 0),
}
if err := env.Client.Get(ctx, key, &pod); err != nil {
return err
}
if pod.Status.Phase != corev1.PodRunning {
return errors.New("please wait pod running")
}
if kvrocksInstance.Status.Topo[0].Topology[0].Failover {
return errors.New("wait failover over")
}
return nil
}, timeout, interval).Should(Succeed())
Expand All @@ -173,49 +213,6 @@ var _ = Describe("Operator for Cluster Mode", func() {
}, timeout, interval).Should(Succeed())
})

// It("test recover when master down", func() {
// for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {
// var pod corev1.Pod
// key := types.NamespacedName{
// Namespace: kvrocksInstance.GetNamespace(),
// Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 0),
// }
// Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed())
// Expect(kvrocksInstance.Status.Topo[index].Topology[0].Role).Should(Equal(kvrocks.RoleMaster))
// Expect(env.Client.Delete(ctx, &pod)).Should(Succeed())
// }

// // wait pod reconstruction
// time.Sleep(time.Second * 30)
// Eventually(func() error {
// err := env.Client.Get(ctx, kvrocksKey, kvrocksInstance)
// if err != nil {
// return err
// }
// Expect(err).Should(Succeed())
// for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {
// var pod corev1.Pod
// key := types.NamespacedName{
// Namespace: kvrocksInstance.GetNamespace(),
// Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 0),
// }
// if err := env.Client.Get(ctx, key, &pod); err != nil {
// return err
// }
// if pod.Status.Phase != corev1.PodRunning {
// return errors.New("please wait pod running")
// }
// if kvrocksInstance.Status.Topo[index].Topology[0].Failover {
// return errors.New("wait failover over")
// }
// }
// return nil
// }, timeout, interval).Should(Succeed())
// Eventually(func() error {
// return checkKvrocksCluster(kvrocksKey, sentinelKey)
// }, timeout, interval).Should(Succeed())
// })

It("test shrink", func() {
kvrocksInstance.Spec.Replicas = 1
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
Expand Down Expand Up @@ -260,10 +257,11 @@ var _ = Describe("Operator for Cluster Mode", func() {
}, timeout, interval).Should(Succeed())
})

It("test expansion or shrink master", func() {
It("test expansion and shrink master", func() {
// expansion
kvrocksInstance.Spec.Master = 5
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
time.Sleep(time.Second * 30)
for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {
key := types.NamespacedName{
Namespace: kvrocksInstance.Namespace,
Expand All @@ -287,6 +285,7 @@ var _ = Describe("Operator for Cluster Mode", func() {
Expect(err).Should(Succeed())
kvrocksInstance.Spec.Master = 3
Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed())
time.Sleep(time.Second * 30)
for index := 0; index < int(kvrocksInstance.Spec.Master); index++ {
key := types.NamespacedName{
Namespace: kvrocksInstance.Namespace,
Expand Down

0 comments on commit 8f9b41e

Please sign in to comment.