Skip to content

Commit

Permalink
fix(e2e): Improve Data insert tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Pothulapati committed Oct 11, 2023
1 parent 9531564 commit a70359a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 80 deletions.
100 changes: 52 additions & 48 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
},
Key: "password",
},
ClientCaCertSecret: &corev1.SecretReference{
Name: "df-client-ca-certs",
},
},
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -107,27 +104,16 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}

Context("Dragonfly resource creation", func() {
password := "df-pass-1"
It("Should create successfully", func() {
// create the secret
err := k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-client-ca-certs",
Namespace: namespace,
},
StringData: map[string]string{
"ca.crt": "foo",
},
})
Expect(err).To(BeNil())

// create the secret
err = k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-secret",
Namespace: namespace,
},
StringData: map[string]string{
"password": "df-pass-1",
"password": password,
},
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -185,22 +171,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
SecretKeyRef: df.Spec.Authentication.PasswordFromSecret,
},
}))

// ClientCACertSecret
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("%s=%s", resources.TLSCACertDirArg, resources.TLSCACertDir)))
Expect(ss.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{
Name: resources.TLSCACertVolumeName,
MountPath: resources.TLSCACertDir,
}))
Expect(ss.Spec.Template.Spec.Volumes).To(ContainElement(corev1.Volume{
Name: resources.TLSCACertVolumeName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: df.Spec.Authentication.ClientCaCertSecret.Name,
DefaultMode: func() *int32 { i := int32(420); return &i }(),
},
},
}))
})

It("Check for pod values", func() {
Expand All @@ -219,11 +189,14 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

It("Check for connectivity", func() {
stopChan := make(chan struct{}, 1)
rc, err := InitRunCmd(ctx, stopChan, name, namespace, "df-pass-1")
defer close(stopChan)
_, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password)
Expect(err).To(BeNil())
err = rc.Start(ctx)
defer close(stopChan)
// check for ping
stopChan = make(chan struct{}, 1)
_, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password)
Expect(err).To(BeNil())

})

It("Increase in replicas should be propagated successfully", func() {
Expand Down Expand Up @@ -327,7 +300,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})

It("Update to image should be propagated successfully", func() {
newImage := resources.DragonflyImage + ":v1.1.0"
newImage := resources.DragonflyImage + ":v1.9.0"
// Update df to the latest
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Expand Down Expand Up @@ -494,7 +467,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})
})

var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
var _ = Describe("Dragonfly PVC Test with single replica", Ordered, func() {

ctx := context.Background()
name := "df-pvc"
Expand Down Expand Up @@ -561,7 +534,47 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
Expect(pvcs.Items).To(HaveLen(1))
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("--snapshot_cron=%s", schedule)))

// TODO: Do data insert testing
// Insert Data
stopChan := make(chan struct{}, 1)
rc, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "")
Expect(err).To(BeNil())

// Insert test data
Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())
close(stopChan)

// delete the single replica
var pod corev1.Pod
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

err = k8sClient.Delete(ctx, &pod)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// check if the pod is created
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

// recreate Redis Client on the new pod
stopChan = make(chan struct{}, 1)
rc, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "df-pass-1")
defer close(stopChan)
Expect(err).To(BeNil())

// check if the Data exists
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
})

It("Cleanup", func() {
Expand All @@ -579,7 +592,7 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
})
})

var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
var _ = Describe("Dragonfly Server TLS tests", Ordered, func() {
ctx := context.Background()
name := "df-tls"
namespace := "default"
Expand All @@ -596,15 +609,6 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
Spec: resourcesv1.DragonflySpec{
Replicas: 2,
Args: args,
Env: []corev1.EnvVar{
{
Name: "DFLY_PASSWORD",
Value: "df-pass-1",
},
},
TLSSecretRef: &corev1.SecretReference{
Name: "df-tls",
},
},
}

Expand Down
69 changes: 37 additions & 32 deletions e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
return false, nil
}

type RunCmd struct {
*redis.Client
*portforward.PortForwarder
}

func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*RunCmd, error) {
func checkAndK8sPortForwardRedis(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) {
home := homedir.HomeDir()
if home == "" {
return nil, fmt.Errorf("can't find kube-config")
Expand All @@ -108,6 +103,10 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa
return nil, err
}

if len(pods.Items) == 0 {
return nil, fmt.Errorf("no pods found")
}

var master *corev1.Pod
for _, pod := range pods.Items {
if pod.Labels[resources.Role] == resources.Master {
Expand All @@ -116,16 +115,43 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa
}
}

if master == nil {
return nil, fmt.Errorf("no master pod found")
}

fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort)
if err != nil {
return nil, err
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort),
Password: password,
})
return &RunCmd{Client: redisClient, PortForwarder: fw}, nil
redisOptions := &redis.Options{
Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort),
}

if password != "" {
redisOptions.Password = password
}

redisClient := redis.NewClient(redisOptions)

errChan := make(chan error, 1)
go func() { errChan <- fw.ForwardPorts() }()

select {
case err = <-errChan:
return nil, errors.Wrap(err, "unable to forward ports")
case <-fw.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = redisClient.Ping(pingCtx).Err()
if err != nil {
return nil, fmt.Errorf("unable to ping instance: %w", err)
}

return redisClient, nil
}

func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *corev1.Pod, stopChan chan struct{}, port int) (*portforward.PortForwarder, error) {
Expand All @@ -151,24 +177,3 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
}
return fw, err
}

func (r *RunCmd) Start(ctx context.Context) error {
errChan := make(chan error, 1)
var err error
go func() { errChan <- r.ForwardPorts() }()

select {
case err = <-errChan:
return errors.Wrap(err, "port forwarding failed")
case <-r.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = r.Ping(pingCtx).Err()
if err != nil {
return fmt.Errorf("unable to ping instance")
}
return nil
}

0 comments on commit a70359a

Please sign in to comment.