diff --git a/charts/mantle-cluster-wide/templates/clusterrole.yaml b/charts/mantle-cluster-wide/templates/clusterrole.yaml index dcd3e01b..b03d8126 100644 --- a/charts/mantle-cluster-wide/templates/clusterrole.yaml +++ b/charts/mantle-cluster-wide/templates/clusterrole.yaml @@ -62,3 +62,11 @@ rules: - get - patch - update +- apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - get + - list + - watch diff --git a/charts/mantle/templates/deployment.yaml b/charts/mantle/templates/deployment.yaml index 2b50eb2c..2b12da58 100644 --- a/charts/mantle/templates/deployment.yaml +++ b/charts/mantle/templates/deployment.yaml @@ -59,6 +59,11 @@ spec: - /manager args: - --leader-elect + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace - command: - /bin/bash - -c diff --git a/cmd/main.go b/cmd/main.go index 36a67f1a..a9b036f6 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "flag" "os" @@ -73,10 +74,19 @@ func main() { os.Exit(1) } - if err = (&controller.MantleBackupReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + managedCephClusterID := os.Getenv("POD_NAMESPACE") + if managedCephClusterID == "" { + setupLog.Error(errors.New("POD_NAMESPACE is empty"), "POD_NAMESPACE is empty") + os.Exit(1) + } + + reconciler := controller.NewMantleBackupReconciler( + mgr.GetClient(), + mgr.GetScheme(), + managedCephClusterID, + ) + + if err = reconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MantleBackup") os.Exit(1) } diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 85598a58..9b6b4408 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -62,3 +62,11 @@ rules: - get - patch - update +- apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - get + - list + - watch diff --git a/e2e/Makefile b/e2e/Makefile index 9f4f8cc3..7ead5252 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -198,9 +198,11 @@ setup-components: $(HELM) upgrade --install mantle-cluster-wide ../charts/mantle-cluster-wide/ --wait $(HELM) upgrade --install --namespace=$(NS) mantle ../charts/mantle/ --wait + $(HELM) upgrade --install --namespace=$(NS2) mantle2 ../charts/mantle/ --wait .PHONY: delete-components delete-components: + $(HELM) uninstall --namespace=$(NS2) mantle2 --wait || true $(HELM) uninstall --namespace=$(NS) mantle --wait || true $(HELM) uninstall mantle-cluster-wide --wait || true diff --git a/e2e/suite_test.go b/e2e/suite_test.go index e3bdc83d..17b54457 100644 --- a/e2e/suite_test.go +++ b/e2e/suite_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" _ "embed" + "encoding/json" "fmt" "os" "os/exec" @@ -43,6 +44,9 @@ const ( mantleBackupName2 = "mantlebackup-test-2" mantleBackupName3 = "mantlebackup-test-3" namespace = "rook-ceph" + namespace2 = "rook-ceph2" + storageClassName = "rook-ceph-block" + storageClassName2 = "rook-ceph-block2" ) func execAtLocal(cmd string, input []byte, args ...string) ([]byte, []byte, error) { @@ -67,6 +71,20 @@ func kubectlWithInput(input []byte, args ...string) ([]byte, []byte, error) { return execAtLocal("kubectl", input, args...) } +func getImageNameFromPVName(pvName string) (string, error) { + stdout, stderr, err := kubectl("get", "pv", pvName, "-o", "json") + if err != nil { + return "", fmt.Errorf("kubectl get pv failed. stderr: %s, err: %w", string(stderr), err) + } + var pv corev1.PersistentVolume + err = json.Unmarshal(stdout, &pv) + if err != nil { + return "", err + } + imageName := pv.Spec.CSI.VolumeAttributes["imageName"] + return imageName, nil +} + func TestMtest(t *testing.T) { if os.Getenv("E2ETEST") == "" { t.Skip("Run under e2e/") @@ -83,11 +101,18 @@ func TestMtest(t *testing.T) { var _ = BeforeSuite(func() { By("[BeforeSuite] Creating common resources") Eventually(func() error { - manifest := fmt.Sprintf(testRBDPoolSCTemplate, poolName, namespace, poolName, namespace, namespace, namespace) + manifest := fmt.Sprintf(testRBDPoolSCTemplate, poolName, namespace, + storageClassName, poolName, namespace, namespace, namespace) _, _, err := kubectlWithInput([]byte(manifest), "apply", "-n", namespace, "-f", "-") if err != nil { return err } + manifest = fmt.Sprintf(testRBDPoolSCTemplate, poolName, namespace2, + storageClassName2, poolName, namespace2, namespace2, namespace2) + _, _, err = kubectlWithInput([]byte(manifest), "apply", "-n", namespace2, "-f", "-") + if err != nil { + return err + } return nil }).Should(Succeed()) @@ -105,6 +130,7 @@ var _ = BeforeSuite(func() { }).Should(Succeed()) By(fmt.Sprintf("[BeforeSuite] Waiting for PVC(%s) to get bound", name)) + pvName := "" Eventually(func() error { stdout, stderr, err := kubectl("-n", namespace, "get", "pvc", name, "-o", "json") if err != nil { @@ -120,6 +146,27 @@ var _ = BeforeSuite(func() { if pvc.Status.Phase != "Bound" { return fmt.Errorf("PVC is not bound yet") } + pvName = pvc.Spec.VolumeName + + return nil + }).Should(Succeed()) + + By(fmt.Sprintf("[BeforeSuite] Create a new RBD image in %s ns with the same name as that of the PVC(%s) in %s ns", + namespace2, name, namespace)) + Eventually(func() error { + // Get the image name + imageName, err := getImageNameFromPVName(pvName) + if err != nil { + return err + } + + // Create a new RBD image in namespace2 with the same name as imageName. + _, stderr, err := kubectl( + "-n", namespace2, "exec", "deploy/rook-ceph-tools", "--", + "rbd", "create", "--size", "100", poolName+"/"+imageName) + if err != nil { + return fmt.Errorf("rbd create failed. stderr: %s, err: %w", string(stderr), err) + } return nil }).Should(Succeed()) @@ -157,9 +204,24 @@ var _ = AfterSuite(func() { _, _, _ = kubectl("delete", "-n", namespace, "pvc", pvc) } + By("[AfterSuite] Deleting RBD images in " + namespace2) + stdout, _, err := kubectl("exec", "-n", namespace2, "deploy/rook-ceph-tools", "--", + "rbd", "ls", poolName, "--format=json") + if err == nil { + imageNames := []string{} + if err := json.Unmarshal(stdout, &imageNames); err == nil { + for _, imageName := range imageNames { + _, _, _ = kubectl("exec", "-n", namespace2, "deploy/rook-ceph-tools", "--", + "rbd", "rm", poolName+"/"+imageName) + } + } + } + By("[AfterSuite] Deleting common resources") - _, _, _ = kubectl("delete", "sc", "rook-ceph-block", "--wait=false") - _, _, _ = kubectl("delete", "-n", namespace, "cephblockpool", "replicapool", "--wait=false") + _, _, _ = kubectl("delete", "sc", storageClassName, "--wait=false") + _, _, _ = kubectl("delete", "sc", storageClassName2, "--wait=false") + _, _, _ = kubectl("delete", "-n", namespace, "cephblockpool", poolName, "--wait=false") + _, _, _ = kubectl("delete", "-n", namespace2, "cephblockpool", poolName, "--wait=false") }) var _ = Describe("rbd backup system", func() { @@ -172,6 +234,7 @@ var _ = Describe("rbd backup system", func() { Expect(err).NotTo(HaveOccurred()) By("Waiting for RBD snapshot to be created") + imageName := "" Eventually(func() error { stdout, stderr, err := kubectl("-n", namespace, "get", "pvc", pvcName, "-o", "json") if err != nil { @@ -184,19 +247,13 @@ var _ = Describe("rbd backup system", func() { } pvName := pvc.Spec.VolumeName - stdout, stderr, err = kubectl("get", "pv", pvName, "-o", "json") - if err != nil { - return fmt.Errorf("kubectl get pv failed. stderr: %s, err: %w", string(stderr), err) - } - var pv corev1.PersistentVolume - err = yaml.Unmarshal(stdout, &pv) + imageName, err = getImageNameFromPVName(pvName) if err != nil { return err } if saveImageName { - firstImageName = pv.Spec.CSI.VolumeAttributes["imageName"] + firstImageName = imageName } - imageName := pv.Spec.CSI.VolumeAttributes["imageName"] stdout, stderr, err = kubectl( "-n", namespace, "exec", "deploy/rook-ceph-tools", "--", @@ -222,6 +279,34 @@ var _ = Describe("rbd backup system", func() { return nil }).Should(Succeed()) + + By("Checking that the mantle-controller deployed for a certain Rook/Ceph cluster (i.e., " + + namespace2 + ") doesn't create a snapshot for a MantleBackup for a different Rook/Ceph cluster (i.e., " + + namespace + ")") + Consistently(func() error { + stdout, stderr, err := kubectl( + "-n", namespace2, "exec", "deploy/rook-ceph-tools", "--", + "rbd", "snap", "ls", poolName+"/"+imageName, "--format=json") + if err != nil { + return fmt.Errorf("rbd snap ls failed. stderr: %s, err: %w", string(stderr), err) + } + var snapshots []controller.Snapshot + err = yaml.Unmarshal(stdout, &snapshots) + if err != nil { + return err + } + existSnapshot := false + for _, s := range snapshots { + if s.Name == mantleBackupName { + existSnapshot = true + break + } + } + if existSnapshot { + return fmt.Errorf("a wrong snapshot exists. snapshotName: %s", mantleBackupName) + } + return nil + }).Should(Succeed()) } It("should create MantleBackup resource", func() { @@ -251,16 +336,10 @@ var _ = Describe("rbd backup system", func() { } pvName := pvc.Spec.VolumeName - stdout, stderr, err = kubectl("get", "pv", pvName, "-o", "json") - if err != nil { - return fmt.Errorf("kubectl get pv failed. stderr: %s, err: %w", string(stderr), err) - } - var pv corev1.PersistentVolume - err = yaml.Unmarshal(stdout, &pv) + imageName, err := getImageNameFromPVName(pvName) if err != nil { return err } - imageName := pv.Spec.CSI.VolumeAttributes["imageName"] stdout, stderr, err = kubectl( "-n", namespace, "exec", "deploy/rook-ceph-tools", "--", diff --git a/e2e/testdata/rbd-pool-sc-template.yaml b/e2e/testdata/rbd-pool-sc-template.yaml index d8768cfc..42f16d83 100644 --- a/e2e/testdata/rbd-pool-sc-template.yaml +++ b/e2e/testdata/rbd-pool-sc-template.yaml @@ -12,7 +12,7 @@ spec: apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: - name: rook-ceph-block + name: %s provisioner: rook-ceph.rbd.csi.ceph.com parameters: clusterID: rook-ceph diff --git a/internal/controller/mantlebackup_controller.go b/internal/controller/mantlebackup_controller.go index 034d68a5..addf93f9 100644 --- a/internal/controller/mantlebackup_controller.go +++ b/internal/controller/mantlebackup_controller.go @@ -6,10 +6,12 @@ import ( "fmt" "io" "os/exec" + "strings" "syscall" "time" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,7 +27,8 @@ import ( // MantleBackupReconciler reconciles a MantleBackup object type MantleBackupReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + managedCephClusterID string } type Snapshot struct { @@ -41,10 +44,11 @@ const ( ) // NewMantleBackupReconciler returns NodeReconciler. -func NewMantleBackupReconciler(client client.Client, scheme *runtime.Scheme) *MantleBackupReconciler { +func NewMantleBackupReconciler(client client.Client, scheme *runtime.Scheme, managedCephClusterID string) *MantleBackupReconciler { return &MantleBackupReconciler{ - Client: client, - Scheme: scheme, + Client: client, + Scheme: scheme, + managedCephClusterID: managedCephClusterID, } } @@ -107,6 +111,23 @@ func (r *MantleBackupReconciler) updateStatus(ctx context.Context, backup *backu return nil } +func (r *MantleBackupReconciler) removeRBDSnapshot(poolName, imageName, snapshotName string) error { + command := []string{"rbd", "snap", "rm", poolName + "/" + imageName + "@" + snapshotName} + _, err := executeCommand(command, nil) + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + waitStatus := exitError.Sys().(syscall.WaitStatus) + exitCode := waitStatus.ExitStatus() + if exitCode != int(syscall.ENOENT) { + logger.Error("failed to remove rbd snapshot", "poolName", poolName, "imageName", imageName, "snapshotName", snapshotName, "exitCode", exitCode, "error", err) + return fmt.Errorf("failed to remove rbd snapshot") + } + } + logger.Info("rbd snapshot has already been removed", "poolName", poolName, "imageName", imageName, "snapshotName", snapshotName, "error", err) + } + return nil +} + func (r *MantleBackupReconciler) createRBDSnapshot(ctx context.Context, poolName, imageName string, backup *backupv1.MantleBackup) (ctrl.Result, error) { command := []string{"rbd", "snap", "create", poolName + "/" + imageName + "@" + backup.Name} _, err := executeCommand(command, nil) @@ -157,6 +178,7 @@ func (r *MantleBackupReconciler) createRBDSnapshot(ctx context.Context, poolName //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch //+kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch +//+kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -206,6 +228,39 @@ func (r *MantleBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } + storageClassName := pvc.Spec.StorageClassName + if storageClassName == nil { + logger.Info("not managed storage class", "pvc.Spec.StorageClassName", storageClassName) + return ctrl.Result{}, nil + } + var storageClass storagev1.StorageClass + err = r.Get(ctx, types.NamespacedName{Namespace: req.NamespacedName.Name, Name: *storageClassName}, &storageClass) + if err != nil { + logger.Error("failed to get SC", "namespace", req.NamespacedName.Namespace, "name", storageClassName, "error", err) + err2 := r.updateStatus(ctx, &backup, metav1.Condition{Type: backupv1.BackupConditionReadyToUse, Status: metav1.ConditionFalse, Reason: backupv1.BackupReasonFailedToCreateBackup}) + if err2 != nil { + return ctrl.Result{}, err2 + } + return ctrl.Result{}, err + } + + // Check if the MantleBackup resource being reconciled is managed by the CephCluster we are in charge of. + if !strings.HasSuffix(storageClass.Provisioner, ".rbd.csi.ceph.com") { + logger.Info("SC is not managed by RBD", "namespace", req.NamespacedName.Namespace, + "storageClassName", *storageClassName, "provisioner", storageClass.Provisioner) + return ctrl.Result{}, nil + } + clusterID, ok := storageClass.Parameters["clusterID"] + if !ok { + logger.Info("clusterID not found", "namespace", req.NamespacedName.Namespace, "storageClassName", *storageClassName) + return ctrl.Result{}, nil + } + if clusterID != r.managedCephClusterID { + logger.Info("clusterID not matched", "namespace", req.NamespacedName.Namespace, + "storageClassName", *storageClassName, "clusterID", clusterID, "managedCephClusterID", r.managedCephClusterID) + return ctrl.Result{}, nil + } + if pvc.Status.Phase != corev1.ClaimBound { err := r.updateStatus(ctx, &backup, metav1.Condition{Type: backupv1.BackupConditionReadyToUse, Status: metav1.ConditionFalse, Reason: backupv1.BackupReasonFailedToCreateBackup}) if err != nil { @@ -245,18 +300,9 @@ func (r *MantleBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request if !backup.ObjectMeta.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(&backup, MantleBackupFinalizerName) { - command := []string{"rbd", "snap", "rm", poolName + "/" + imageName + "@" + backup.Name} - _, err = executeCommand(command, nil) + err := r.removeRBDSnapshot(poolName, imageName, backup.Name) if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - waitStatus := exitError.Sys().(syscall.WaitStatus) - exitCode := waitStatus.ExitStatus() - if exitCode != int(syscall.ENOENT) { - logger.Error("failed to remove rbd snapshot", "poolName", poolName, "imageName", imageName, "snapshotName", backup.Name, "exitCode", exitCode, "error", err) - return ctrl.Result{Requeue: true}, nil - } - } - logger.Info("rbd snapshot has already been removed", "poolName", poolName, "imageName", imageName, "snapshotName", backup.Name, "error", err) + return ctrl.Result{}, err } controllerutil.RemoveFinalizer(&backup, MantleBackupFinalizerName) diff --git a/internal/controller/mantlebackup_controller_test.go b/internal/controller/mantlebackup_controller_test.go index ed7e06d3..5c490030 100644 --- a/internal/controller/mantlebackup_controller_test.go +++ b/internal/controller/mantlebackup_controller_test.go @@ -31,6 +31,9 @@ var _ = Describe("MantleBackup controller", func() { var reconciler *MantleBackupReconciler scheme := runtime.NewScheme() + storageClassClusterID := dummyStorageClassClusterID + storageClassName := dummyStorageClassName + BeforeEach(func() { err := backupv1.AddToScheme(scheme) Expect(err).NotTo(HaveOccurred()) @@ -40,7 +43,7 @@ var _ = Describe("MantleBackup controller", func() { }) Expect(err).ToNot(HaveOccurred()) - reconciler = NewMantleBackupReconciler(k8sClient, mgr.GetScheme()) + reconciler = NewMantleBackupReconciler(k8sClient, mgr.GetScheme(), storageClassClusterID) err = reconciler.SetupWithManager(mgr) Expect(err).NotTo(HaveOccurred()) @@ -105,6 +108,7 @@ var _ = Describe("MantleBackup controller", func() { corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), }, }, + StorageClassName: &storageClassName, }, } err = k8sClient.Create(ctx, &pvc) @@ -232,6 +236,7 @@ var _ = Describe("MantleBackup controller", func() { corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), }, }, + StorageClassName: &storageClassName, }, } err = k8sClient.Create(ctx, &pvc) @@ -367,6 +372,7 @@ var _ = Describe("MantleBackup controller", func() { corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), }, }, + StorageClassName: &storageClassName, }, } err = k8sClient.Create(ctx, &pvc) @@ -494,6 +500,7 @@ var _ = Describe("MantleBackup controller", func() { corev1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), }, }, + StorageClassName: &storageClassName, }, } err = k8sClient.Create(ctx, &pvc) diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 6e96b006..53697e35 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -12,6 +12,7 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -54,6 +55,10 @@ func TestControllers(t *testing.T) { RunSpecs(t, "Controller Suite") } +var dummyStorageClassName = "dummy-sc" +var dummyStorageClassClusterID = "rook-ceph" +var dummyStorageClassProvisioner = "rook-ceph.rbd.csi.ceph.com" + var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) @@ -86,6 +91,18 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) + sc := storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: dummyStorageClassName, + }, + Provisioner: dummyStorageClassProvisioner, + Parameters: map[string]string{ + "clusterID": dummyStorageClassClusterID, + }, + } + ctx := context.Background() + err = k8sClient.Create(ctx, &sc) + Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() {