From 8f9b41ea9864cd4ffc68da792107a446558cfc4d Mon Sep 17 00:00:00 2001 From: jinxu <1319039722@qq.com> Date: Tue, 19 Sep 2023 16:02:08 +0800 Subject: [PATCH] e2e: test done --- pkg/client/kvrocks/cluster.go | 37 -------- pkg/client/kvrocks/kvrocks_client.go | 7 -- test/e2e/cluster/cluster_test.go | 127 +++++++++++++-------------- 3 files changed, 63 insertions(+), 108 deletions(-) diff --git a/pkg/client/kvrocks/cluster.go b/pkg/client/kvrocks/cluster.go index 36386dd..5a2a217 100644 --- a/pkg/client/kvrocks/cluster.go +++ b/pkg/client/kvrocks/cluster.go @@ -4,43 +4,6 @@ import ( "strings" ) -const ( - ClusterNotInitErr = "CLUSTERDOWN The cluster is not initialized" - ClusterAlreadyMigrate = "Can't migrate slot which has been migrated" - ClusterSlotInvalid = "Can't migrate slot which doesn't belong to me" - ClusterVersionInvalid = "Invalid cluster version" - ClusterInvalidVersion = "Invalid version of cluster" -) - -func (s *Client) MoveSlots(ip, password string, slot int, dstNodeId string) bool { - c := kvrocksClient(ip, password) - defer c.Close() - if err := c.Do(ctx, "CLUSTERX", "MIGRATE", slot, dstNodeId).Err(); err != nil && (err.Error() == ClusterAlreadyMigrate || err.Error() == ClusterSlotInvalid) { - return true - } - return false -} - -func (s *Client) ResetSlot(ip, password string, slot, version int, dstNodeId string) error { - c := kvrocksClient(ip, password) - defer c.Close() - if err := c.Do(ctx, "CLUSTERX", "SETSLOT", slot, "NODE", dstNodeId, version).Err(); err != nil { - return err - } - s.logger.V(1).Info("clusterx setslot successfully", "ip", ip, "node", dstNodeId, "slot", slot, "version", version) - return nil -} - -func (s *Client) ClusterVersion(ip, password string) (int, error) { - c := kvrocksClient(ip, password) - defer c.Close() - result, err := c.Do(ctx, "CLUSTERX", "VERSION").Int() - if err != nil { - return -1, err - } - return result, nil -} - func (s *Client) ClusterNodeInfo(ip, password string) (*Node, error) { c := kvrocksClient(ip, password) defer c.Close() diff --git a/pkg/client/kvrocks/kvrocks_client.go b/pkg/client/kvrocks/kvrocks_client.go index 19793d9..d33bac9 100644 --- a/pkg/client/kvrocks/kvrocks_client.go +++ b/pkg/client/kvrocks/kvrocks_client.go @@ -76,13 +76,6 @@ func kvrocksSentinelClient(ip, password string) *client.SentinelClient { }) } -func kvrocksClusterClient(ip, password string) *client.ClusterClient { - return client.NewClusterClient(&client.ClusterOptions{ - Addrs: []string{net.JoinHostPort(ip, strconv.Itoa(KVRocksPort))}, - Password: password, - }) -} - func (node *Node) InsertSlot(value int) { node.Slots = append(node.Slots, value) sort.Ints(node.Slots) diff --git a/test/e2e/cluster/cluster_test.go b/test/e2e/cluster/cluster_test.go index 4f1ae08..e7ca3f4 100644 --- a/test/e2e/cluster/cluster_test.go +++ b/test/e2e/cluster/cluster_test.go @@ -132,16 +132,56 @@ var _ = Describe("Operator for Cluster Mode", func() { }) It("test recover when slave down", func() { - for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { + + var pod corev1.Pod + key := types.NamespacedName{ + Namespace: kvrocksInstance.GetNamespace(), + Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 1), + } + Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed()) + Expect(kvrocksInstance.Status.Topo[0].Topology[1].Role).Should(Equal(kvrocks.RoleSlaver)) + Expect(env.Client.Delete(ctx, &pod)).Should(Succeed()) + + // wait pod reconstruction + time.Sleep(time.Second * 30) + Eventually(func() error { + err := env.Client.Get(ctx, kvrocksKey, kvrocksInstance) + if err != nil { + return err + } + var pod corev1.Pod key := types.NamespacedName{ Namespace: kvrocksInstance.GetNamespace(), - Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 1), + Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 1), + } + if err := env.Client.Get(ctx, key, &pod); err != nil { + return err + } + if pod.Status.Phase != corev1.PodRunning { + return errors.New("please wait pod running") + } + if kvrocksInstance.Status.Topo[0].Topology[1].Failover { + return errors.New("wait failover over") } - Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed()) - Expect(kvrocksInstance.Status.Topo[index].Topology[1].Role).Should(Equal(kvrocks.RoleSlaver)) - Expect(env.Client.Delete(ctx, &pod)).Should(Succeed()) + + return nil + }, timeout, interval).Should(Succeed()) + Eventually(func() error { + return checkKvrocksCluster(kvrocksKey, sentinelKey) + }, timeout, interval).Should(Succeed()) + }) + + It("test recover when master down", func() { + + var pod corev1.Pod + key := types.NamespacedName{ + Namespace: kvrocksInstance.GetNamespace(), + Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 0), } + Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed()) + Expect(kvrocksInstance.Status.Topo[0].Topology[0].Role).Should(Equal(kvrocks.RoleMaster)) + Expect(env.Client.Delete(ctx, &pod)).Should(Succeed()) // wait pod reconstruction time.Sleep(time.Second * 30) @@ -150,21 +190,21 @@ var _ = Describe("Operator for Cluster Mode", func() { if err != nil { return err } - for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { - var pod corev1.Pod - key := types.NamespacedName{ - Namespace: kvrocksInstance.GetNamespace(), - Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 1), - } - if err := env.Client.Get(ctx, key, &pod); err != nil { - return err - } - if pod.Status.Phase != corev1.PodRunning { - return errors.New("please wait pod running") - } - if kvrocksInstance.Status.Topo[index].Topology[1].Failover { - return errors.New("wait failover over") - } + Expect(err).Should(Succeed()) + + var pod corev1.Pod + key := types.NamespacedName{ + Namespace: kvrocksInstance.GetNamespace(), + Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), 0, 0), + } + if err := env.Client.Get(ctx, key, &pod); err != nil { + return err + } + if pod.Status.Phase != corev1.PodRunning { + return errors.New("please wait pod running") + } + if kvrocksInstance.Status.Topo[0].Topology[0].Failover { + return errors.New("wait failover over") } return nil }, timeout, interval).Should(Succeed()) @@ -173,49 +213,6 @@ var _ = Describe("Operator for Cluster Mode", func() { }, timeout, interval).Should(Succeed()) }) - // It("test recover when master down", func() { - // for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { - // var pod corev1.Pod - // key := types.NamespacedName{ - // Namespace: kvrocksInstance.GetNamespace(), - // Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 0), - // } - // Expect(env.Client.Get(ctx, key, &pod)).Should(Succeed()) - // Expect(kvrocksInstance.Status.Topo[index].Topology[0].Role).Should(Equal(kvrocks.RoleMaster)) - // Expect(env.Client.Delete(ctx, &pod)).Should(Succeed()) - // } - - // // wait pod reconstruction - // time.Sleep(time.Second * 30) - // Eventually(func() error { - // err := env.Client.Get(ctx, kvrocksKey, kvrocksInstance) - // if err != nil { - // return err - // } - // Expect(err).Should(Succeed()) - // for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { - // var pod corev1.Pod - // key := types.NamespacedName{ - // Namespace: kvrocksInstance.GetNamespace(), - // Name: fmt.Sprintf("%s-%d-%d", kvrocksInstance.GetName(), index, 0), - // } - // if err := env.Client.Get(ctx, key, &pod); err != nil { - // return err - // } - // if pod.Status.Phase != corev1.PodRunning { - // return errors.New("please wait pod running") - // } - // if kvrocksInstance.Status.Topo[index].Topology[0].Failover { - // return errors.New("wait failover over") - // } - // } - // return nil - // }, timeout, interval).Should(Succeed()) - // Eventually(func() error { - // return checkKvrocksCluster(kvrocksKey, sentinelKey) - // }, timeout, interval).Should(Succeed()) - // }) - It("test shrink", func() { kvrocksInstance.Spec.Replicas = 1 Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed()) @@ -260,10 +257,11 @@ var _ = Describe("Operator for Cluster Mode", func() { }, timeout, interval).Should(Succeed()) }) - It("test expansion or shrink master", func() { + It("test expansion and shrink master", func() { // expansion kvrocksInstance.Spec.Master = 5 Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed()) + time.Sleep(time.Second * 30) for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { key := types.NamespacedName{ Namespace: kvrocksInstance.Namespace, @@ -287,6 +285,7 @@ var _ = Describe("Operator for Cluster Mode", func() { Expect(err).Should(Succeed()) kvrocksInstance.Spec.Master = 3 Expect(env.Client.Update(ctx, kvrocksInstance)).Should(Succeed()) + time.Sleep(time.Second * 30) for index := 0; index < int(kvrocksInstance.Spec.Master); index++ { key := types.NamespacedName{ Namespace: kvrocksInstance.Namespace,