Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(e2e): Improve data insert tests #114

Merged
merged 5 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 55 additions & 49 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
},
Key: "password",
},
ClientCaCertSecret: &corev1.SecretReference{
Name: "df-client-ca-certs",
},
},
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -107,27 +104,16 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}

Context("Dragonfly resource creation", func() {
password := "df-pass-1"
It("Should create successfully", func() {
// create the secret
err := k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-client-ca-certs",
Namespace: namespace,
},
StringData: map[string]string{
"ca.crt": "foo",
},
})
Expect(err).To(BeNil())

// create the secret
err = k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-secret",
Namespace: namespace,
},
StringData: map[string]string{
"password": "df-pass-1",
"password": password,
},
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -185,22 +171,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
SecretKeyRef: df.Spec.Authentication.PasswordFromSecret,
},
}))

// ClientCACertSecret
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("%s=%s", resources.TLSCACertDirArg, resources.TLSCACertDir)))
Expect(ss.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{
Name: resources.TLSCACertVolumeName,
MountPath: resources.TLSCACertDir,
}))
Expect(ss.Spec.Template.Spec.Volumes).To(ContainElement(corev1.Volume{
Name: resources.TLSCACertVolumeName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: df.Spec.Authentication.ClientCaCertSecret.Name,
DefaultMode: func() *int32 { i := int32(420); return &i }(),
},
},
}))
})

It("Check for pod values", func() {
Expand All @@ -219,11 +189,9 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

It("Check for connectivity", func() {
stopChan := make(chan struct{}, 1)
rc, err := InitRunCmd(ctx, stopChan, name, namespace, "df-pass-1")
defer close(stopChan)
Expect(err).To(BeNil())
err = rc.Start(ctx)
_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())
defer close(stopChan)
})

It("Increase in replicas should be propagated successfully", func() {
Expand Down Expand Up @@ -327,7 +295,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})

It("Update to image should be propagated successfully", func() {
newImage := resources.DragonflyImage + ":v1.1.0"
newImage := resources.DragonflyImage + ":v1.9.0"
// Update df to the latest
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Expand Down Expand Up @@ -494,8 +462,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})
})

var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {

var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempts(3), func() {
ctx := context.Background()
name := "df-pvc"
namespace := "default"
Expand Down Expand Up @@ -531,14 +498,16 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
},
})
Expect(err).To(BeNil())
})

It("Resources should exist", func() {
// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand All @@ -561,7 +530,48 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
Expect(pvcs.Items).To(HaveLen(1))
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("--snapshot_cron=%s", schedule)))

// TODO: Do data insert testing
// Insert Data
stopChan := make(chan struct{}, 1)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "")
Expect(err).To(BeNil())

// Insert test data
Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())
close(stopChan)

// delete the single replica
var pod corev1.Pod
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

err = k8sClient.Delete(ctx, &pod)
Expect(err).To(BeNil())

time.Sleep(10 * time.Second)

// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)
// check if the pod is created
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

// recreate Redis Client on the new pod
stopChan = make(chan struct{}, 1)
rc, err = checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "")
defer close(stopChan)
Expect(err).To(BeNil())

// check if the Data exists
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
})

It("Cleanup", func() {
Expand All @@ -579,7 +589,7 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
})
})

var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() {
ctx := context.Background()
name := "df-tls"
namespace := "default"
Expand All @@ -596,12 +606,6 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
Spec: resourcesv1.DragonflySpec{
Replicas: 2,
Args: args,
Env: []corev1.EnvVar{
{
Name: "DFLY_PASSWORD",
Value: "df-pass-1",
},
},
TLSSecretRef: &corev1.SecretReference{
Name: "df-tls",
},
Expand Down Expand Up @@ -629,13 +633,15 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
err = k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())

})
It("Resources should exist", func() {
// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand Down
93 changes: 39 additions & 54 deletions e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"net/http"
"os"
"path/filepath"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,10 +34,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/homedir"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -78,36 +75,18 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
return false, nil
}

type RunCmd struct {
*redis.Client
*portforward.PortForwarder
}

func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*RunCmd, error) {
home := homedir.HomeDir()
if home == "" {
return nil, fmt.Errorf("can't find kube-config")
}
kubeconfig := filepath.Join(home, ".kube", "config")
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}

// create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

func checkAndK8sPortForwardRedis(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) {
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", name),
})
if err != nil {
return nil, err
}

if len(pods.Items) == 0 {
return nil, fmt.Errorf("no pods found")
}

var master *corev1.Pod
for _, pod := range pods.Items {
if pod.Labels[resources.Role] == resources.Master {
Expand All @@ -116,16 +95,43 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa
}
}

fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort)
if master == nil {
return nil, fmt.Errorf("no master pod found")
}

fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyAdminPort)
if err != nil {
return nil, err
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort),
Password: password,
})
return &RunCmd{Client: redisClient, PortForwarder: fw}, nil
redisOptions := &redis.Options{
Addr: fmt.Sprintf("localhost:9998"),
}

if password != "" {
redisOptions.Password = password
}

redisClient := redis.NewClient(redisOptions)

errChan := make(chan error, 1)
go func() { errChan <- fw.ForwardPorts() }()

select {
case err = <-errChan:
return nil, errors.Wrap(err, "unable to forward ports")
case <-fw.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = redisClient.Ping(pingCtx).Err()
if err != nil {
return nil, fmt.Errorf("unable to ping instance: %w", err)
}

return redisClient, nil
}

func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *corev1.Pod, stopChan chan struct{}, port int) (*portforward.PortForwarder, error) {
Expand All @@ -142,7 +148,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
ports := []string{fmt.Sprintf("%d:%d", port, resources.DragonflyPort)}
ports := []string{fmt.Sprintf("%d:%d", 9998, resources.DragonflyPort)}
readyChan := make(chan struct{}, 1)

fw, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr)
Expand All @@ -151,24 +157,3 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
}
return fw, err
}

func (r *RunCmd) Start(ctx context.Context) error {
errChan := make(chan error, 1)
var err error
go func() { errChan <- r.ForwardPorts() }()

select {
case err = <-errChan:
return errors.Wrap(err, "port forwarding failed")
case <-r.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = r.Ping(pingCtx).Err()
if err != nil {
return fmt.Errorf("unable to ping instance")
}
return nil
}