From feddf0b8a2f95d62fabb3eed0c0b888e80d8430e Mon Sep 17 00:00:00 2001 From: "Bala.FA" Date: Sun, 29 Oct 2023 23:02:31 +0530 Subject: [PATCH] Add drive repair support Signed-off-by: Bala.FA --- .golangci.yml | 4 +- cmd/directpv/main.go | 1 + cmd/directpv/repair.go | 80 ++++++++ cmd/kubectl-directpv/main.go | 1 + cmd/kubectl-directpv/repair.go | 92 +++++++++ docs/command-reference.md | 22 +++ docs/drive-management.md | 18 +- pkg/admin/installer/consts.go | 5 +- pkg/admin/installer/daemonset.go | 62 +++---- pkg/admin/installer/deployment.go | 11 +- pkg/admin/installer/psp.go | 4 +- pkg/admin/installer/utils.go | 25 --- pkg/admin/repair.go | 237 ++++++++++++++++++++++++ pkg/apis/directpv.min.io/types/types.go | 3 + pkg/consts/consts.go | 18 ++ pkg/consts/consts.go.in | 18 ++ pkg/drive/event.go | 4 +- pkg/drive/repair.go | 209 +++++++++++++++++++++ pkg/k8s/k8s.go | 26 +++ pkg/legacy/converter/sys.go | 4 +- pkg/xfs/repair.go | 27 +++ pkg/xfs/repair_linux.go | 48 +++++ pkg/xfs/repair_other.go | 30 +++ 23 files changed, 872 insertions(+), 77 deletions(-) create mode 100644 cmd/directpv/repair.go create mode 100644 cmd/kubectl-directpv/repair.go create mode 100644 pkg/admin/repair.go create mode 100644 pkg/drive/repair.go create mode 100644 pkg/xfs/repair.go create mode 100644 pkg/xfs/repair_linux.go create mode 100644 pkg/xfs/repair_other.go diff --git a/.golangci.yml b/.golangci.yml index 8b0e532b..7d738763 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ linters-settings: - gofumpt: - lang-version: "1.22" +run: + go: "1.22" misspell: locale: US diff --git a/cmd/directpv/main.go b/cmd/directpv/main.go index d7cf3db1..b733ebe2 100644 --- a/cmd/directpv/main.go +++ b/cmd/directpv/main.go @@ -128,6 +128,7 @@ func init() { mainCmd.AddCommand(legacyControllerCmd) mainCmd.AddCommand(legacyNodeServerCmd) mainCmd.AddCommand(nodeControllerCmd) + mainCmd.AddCommand(repairCmd) } func main() { diff --git a/cmd/directpv/repair.go b/cmd/directpv/repair.go new file mode 100644 index 00000000..77edb9e3 --- /dev/null +++ b/cmd/directpv/repair.go @@ -0,0 +1,80 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "errors" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/client" + drivepkg "github.com/minio/directpv/pkg/drive" + "github.com/minio/directpv/pkg/types" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + forceFlag = false + disablePrefetchFlag = false + dryRunFlag = false +) + +var repairCmd = &cobra.Command{ + Use: "repair ", + Short: "Start drive repair.", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(c *cobra.Command, args []string) error { + switch len(args) { + case 0: + return errors.New("DRIVE-ID must be provided") + case 1: + default: + return errors.New("only one DRIVE-ID must be provided") + } + return startRepair(c.Context(), args[0]) + }, +} + +func init() { + repairCmd.PersistentFlags().BoolVar(&forceFlag, "force", forceFlag, "Force log zeroing") + repairCmd.PersistentFlags().BoolVar(&disablePrefetchFlag, "disable-prefetch", disablePrefetchFlag, "Disable prefetching of inode and directory blocks") + repairCmd.PersistentFlags().BoolVar(&dryRunFlag, "dry-run", dryRunFlag, "No modify mode") +} + +func startRepair(ctx context.Context, driveID string) error { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + + drive, err := client.DriveClient().Get(ctx, driveID, metav1.GetOptions{}) + if err != nil { + return err + } + + if drive.Status.Status != directpvtypes.DriveStatusRepairing { + drive.Status.Status = directpvtypes.DriveStatusRepairing + } + + updatedDrive, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()}) + if err != nil { + return err + } + + return drivepkg.Repair(ctx, updatedDrive, forceFlag, disablePrefetchFlag, dryRunFlag) +} diff --git a/cmd/kubectl-directpv/main.go b/cmd/kubectl-directpv/main.go index 505fd85e..089455c8 100644 --- a/cmd/kubectl-directpv/main.go +++ b/cmd/kubectl-directpv/main.go @@ -165,6 +165,7 @@ Use "{{.CommandPath}} [command] --help" for more information about this command. mainCmd.AddCommand(cleanCmd) mainCmd.AddCommand(suspendCmd) mainCmd.AddCommand(resumeCmd) + mainCmd.AddCommand(repairCmd) mainCmd.AddCommand(removeCmd) mainCmd.AddCommand(uninstallCmd) mainCmd.SetHelpCommand(&cobra.Command{ diff --git a/cmd/kubectl-directpv/repair.go b/cmd/kubectl-directpv/repair.go new file mode 100644 index 00000000..bd5deaaa --- /dev/null +++ b/cmd/kubectl-directpv/repair.go @@ -0,0 +1,92 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "errors" + "os" + "strings" + + "github.com/minio/directpv/pkg/admin" + "github.com/minio/directpv/pkg/consts" + "github.com/spf13/cobra" +) + +var ( + forceFlag = false + disablePrefetchFlag = false +) + +var repairCmd = &cobra.Command{ + Use: "repair DRIVE ...", + Short: "Repair filesystem of drives", + SilenceUsage: true, + SilenceErrors: true, + Example: strings.ReplaceAll( + `1. Repair drives + $ kubectl {PLUGIN_NAME} repair 3b562992-f752-4a41-8be4-4e688ae8cd4c`, + `{PLUGIN_NAME}`, + consts.AppName, + ), + Run: func(c *cobra.Command, args []string) { + driveIDArgs = args + if err := validateRepairCmd(); err != nil { + eprintf(true, "%v\n", err) + os.Exit(-1) + } + + repairMain(c.Context()) + }, +} + +func init() { + setFlagOpts(repairCmd) + + addDryRunFlag(repairCmd, "Repair drives with no modify mode") + repairCmd.PersistentFlags().BoolVar(&forceFlag, "force", forceFlag, "Force log zeroing") + repairCmd.PersistentFlags().BoolVar(&disablePrefetchFlag, "disable-prefetch", disablePrefetchFlag, "Disable prefetching of inode and directory blocks") +} + +func validateRepairCmd() error { + if err := validateDriveIDArgs(); err != nil { + return err + } + + if len(driveIDArgs) == 0 { + return errors.New("no drive provided to repair") + } + + return nil +} + +func repairMain(ctx context.Context) { + _, err := adminClient.Repair( + ctx, + admin.RepairArgs{ + DriveIDs: driveIDSelectors, + DryRun: dryRunFlag, + ForceFlag: forceFlag, + DisablePrefetchFlag: disablePrefetchFlag, + }, + logFunc, + ) + if err != nil { + eprintf(!errors.Is(err, admin.ErrNoMatchingResourcesFound), "%v\n", err) + os.Exit(1) + } +} diff --git a/docs/command-reference.md b/docs/command-reference.md index 0e558b04..f105dcb3 100644 --- a/docs/command-reference.md +++ b/docs/command-reference.md @@ -722,6 +722,28 @@ EXAMPLES: $ kubectl directpv resume volumes pvc-0700b8c7-85b2-4894-b83a-274484f220d0 ``` +## `repair` command +``` +Repair filesystem of drives + +USAGE: + directpv repair DRIVE ... [flags] + +FLAGS: + --dry-run Repair drives with no modify mode + --force Force log zeroing + --disable-prefetch Disable prefetching of inode and directory blocks + -h, --help help for repair + +GLOBAL FLAGS: + --kubeconfig string Path to the kubeconfig file to use for CLI requests + --quiet Suppress printing error messages + +EXAMPLES: +1. Repair drives + $ kubectl directpv repair 3b562992-f752-4a41-8be4-4e688ae8cd4c +``` + ## `remove` command ``` Remove unused drives from DirectPV diff --git a/docs/drive-management.md b/docs/drive-management.md index 3e2780cc..e0adfdf5 100644 --- a/docs/drive-management.md +++ b/docs/drive-management.md @@ -118,11 +118,25 @@ Refer [remove command](./command-reference.md#remove-command) for more informati By Kubernetes design, [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) workload is active only if all of its pods are in running state. Any faulty drive(s) will prevent the statefulset from starting up. DirectPV provides a workaround to suspend failed drives which will mount the respective volumes on empty `/var/lib/directpv/tmp` directory with read-only access. This can be done by executing the `suspend drives` command. Below is an example: ```sh -> kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 +$ kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 ``` Suspended drives can be resumed once they are fixed. Upon resuming, the corresponding volumes will resume using the respective allocated drives. This can be done by using the `resume drives` command. Below is an example: ```sh -> kubectl directpv resume drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 +$ kubectl directpv resume drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 +``` + +## Repair drives + +***CAUTION: THIS IS DANGEROUS OPERATION WHICH LEADS TO DATA LOSS*** + +In a rare situation, filesystem on faulty drives can be repaired to make them usable. As a first step, faulty drives must be suspended, then the `repair` command should be run for them. The `repair` command creates onetime Kubernetes `Job` with the pod name as `repair-` and these jobs are auto removed after five minutes of its completion. Progress and status of the drive repair can be viewed using `kubectl log` command. Below is an example: + +```sh +# Suspend faulty drives +$ kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 + +# Run repair command on suspended drives +$ kubectl directpv repair af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 ``` diff --git a/pkg/admin/installer/consts.go b/pkg/admin/installer/consts.go index 31a18369..5640f006 100644 --- a/pkg/admin/installer/consts.go +++ b/pkg/admin/installer/consts.go @@ -25,9 +25,8 @@ const ( namespace = consts.AppName healthZContainerPortName = "healthz" healthZContainerPort = 9898 - volumePathSysDir = "/sys" - volumeNameSocketDir = "socket-dir" - socketDir = "/csi" + csiDirVolumeName = "socket-dir" + csiDirVolumePath = "/csi" selectorKey = "selector." + consts.GroupName kubeNodeNameEnvVarName = "KUBE_NODE_NAME" csiEndpointEnvVarName = "CSI_ENDPOINT" diff --git a/pkg/admin/installer/daemonset.go b/pkg/admin/installer/daemonset.go index 4ca0fbae..627b1122 100644 --- a/pkg/admin/installer/daemonset.go +++ b/pkg/admin/installer/daemonset.go @@ -23,6 +23,7 @@ import ( directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/k8s" legacyclient "github.com/minio/directpv/pkg/legacy/client" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -32,20 +33,11 @@ import ( ) const ( - volumeNameMountpointDir = "mountpoint-dir" - volumeNameRegistrationDir = "registration-dir" - volumeNamePluginDir = "plugins-dir" - volumeNameAppRootDir = consts.AppName + "-common-root" - volumeNameLegacyAppRootDir = "direct-csi-common-root" - appRootDir = consts.AppRootDir + "/" - legacyAppRootDir = "/var/lib/direct-csi/" - volumeNameSysDir = "sysfs" - volumeNameDevDir = "devfs" - volumePathDevDir = "/dev" - volumeNameRunUdevData = "run-udev-data-dir" - volumePathRunUdevData = consts.UdevDataDir - socketFile = "/csi.sock" - totalDaemonsetSteps = 2 + kubeletPodsDirVolumeName = "mountpoint-dir" + registrationDirVolumeName = "registration-dir" + kubeletPluginsDirVolumeName = "plugins-dir" + socketFile = "/csi.sock" + totalDaemonsetSteps = 2 ) type daemonsetTask struct { @@ -97,25 +89,25 @@ func newSecurityContext(seccompProfile string) *corev1.SecurityContext { func getVolumesAndMounts(pluginSocketDir string) (volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) { volumes = []corev1.Volume{ - newHostPathVolume(volumeNameSocketDir, pluginSocketDir), - newHostPathVolume(volumeNameMountpointDir, kubeletDirPath+"/pods"), - newHostPathVolume(volumeNameRegistrationDir, kubeletDirPath+"/plugins_registry"), - newHostPathVolume(volumeNamePluginDir, kubeletDirPath+"/plugins"), - newHostPathVolume(volumeNameAppRootDir, appRootDir), - newHostPathVolume(volumeNameSysDir, volumePathSysDir), - newHostPathVolume(volumeNameDevDir, volumePathDevDir), - newHostPathVolume(volumeNameRunUdevData, volumePathRunUdevData), - newHostPathVolume(volumeNameLegacyAppRootDir, legacyAppRootDir), + k8s.NewHostPathVolume(csiDirVolumeName, pluginSocketDir), + k8s.NewHostPathVolume(kubeletPodsDirVolumeName, kubeletDirPath+"/pods"), + k8s.NewHostPathVolume(registrationDirVolumeName, kubeletDirPath+"/plugins_registry"), + k8s.NewHostPathVolume(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins"), + k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath), + k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath), + k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath), + k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath), + k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath), } volumeMounts = []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - newVolumeMount(volumeNameMountpointDir, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNamePluginDir, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameAppRootDir, appRootDir, corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameSysDir, volumePathSysDir, corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameDevDir, volumePathDevDir, corev1.MountPropagationHostToContainer, true), - newVolumeMount(volumeNameRunUdevData, volumePathRunUdevData, corev1.MountPropagationBidirectional, true), - newVolumeMount(volumeNameLegacyAppRootDir, legacyAppRootDir, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(kubeletPodsDirVolumeName, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true), + k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true), } return @@ -132,8 +124,8 @@ func nodeDriverRegistrarContainer(image, pluginSocketDir string) corev1.Containe }, Env: []corev1.EnvVar{kubeNodeNameEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(registrationDirVolumeName, "/registration", corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/driver-registrar-termination-log", @@ -195,13 +187,13 @@ func livenessProbeContainer(image string) corev1.Container { Name: "liveness-probe", Image: image, Args: []string{ - fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile), + fmt.Sprintf("--csi-address=%v%v", csiDirVolumePath, socketFile), fmt.Sprintf("--health-port=%v", healthZContainerPort), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/driver-liveness-termination-log", VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, } } diff --git a/pkg/admin/installer/deployment.go b/pkg/admin/installer/deployment.go index b6d74d44..429993a5 100644 --- a/pkg/admin/installer/deployment.go +++ b/pkg/admin/installer/deployment.go @@ -23,6 +23,7 @@ import ( directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/k8s" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -96,8 +97,8 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega podSpec := corev1.PodSpec{ ServiceAccountName: consts.Identity, Volumes: []corev1.Volume{ - newHostPathVolume( - volumeNameSocketDir, + k8s.NewHostPathVolume( + csiDirVolumeName, newPluginsSocketDir(kubeletDirPath, fmt.Sprintf("%s-controller", consts.ControllerServerName)), ), }, @@ -116,7 +117,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega }, Env: []corev1.EnvVar{csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/controller-provisioner-termination-log", @@ -148,7 +149,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega }, Env: []corev1.EnvVar{csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/controller-csi-resizer-termination-log", @@ -173,7 +174,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega }, Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, }, }, diff --git a/pkg/admin/installer/psp.go b/pkg/admin/installer/psp.go index 9efa3e77..bbaf984f 100644 --- a/pkg/admin/installer/psp.go +++ b/pkg/admin/installer/psp.go @@ -153,10 +153,10 @@ func (t pspTask) createPodSecurityPolicy(ctx context.Context, args *Args) (err e Volumes: []policy.FSType{policy.HostPath}, AllowedHostPaths: []policy.AllowedHostPath{ {PathPrefix: "/proc", ReadOnly: true}, - {PathPrefix: volumePathSysDir}, + {PathPrefix: consts.SysDirVolumePath}, {PathPrefix: consts.UdevDataDir, ReadOnly: true}, {PathPrefix: consts.AppRootDir}, - {PathPrefix: socketDir}, + {PathPrefix: csiDirVolumePath}, {PathPrefix: kubeletDirPath}, }, SELinux: policy.SELinuxStrategyOptions{ diff --git a/pkg/admin/installer/utils.go b/pkg/admin/installer/utils.go index c37ecaab..8bc82d7c 100644 --- a/pkg/admin/installer/utils.go +++ b/pkg/admin/installer/utils.go @@ -24,38 +24,13 @@ import ( "strings" "github.com/minio/directpv/pkg/k8s" - corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) -func newHostPathVolume(name, path string) corev1.Volume { - hostPathType := corev1.HostPathDirectoryOrCreate - volumeSource := corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: path, - Type: &hostPathType, - }, - } - - return corev1.Volume{ - Name: name, - VolumeSource: volumeSource, - } -} - func newPluginsSocketDir(kubeletDir, name string) string { return path.Join(kubeletDir, "plugins", k8s.SanitizeResourceName(name)) } -func newVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount { - return corev1.VolumeMount{ - Name: name, - ReadOnly: readOnly, - MountPath: path, - MountPropagation: &mountPropogation, - } -} - func getRandSuffix() string { b := make([]byte, 5) if _, err := rand.Read(b); err != nil { diff --git a/pkg/admin/repair.go b/pkg/admin/repair.go new file mode 100644 index 00000000..94c47e20 --- /dev/null +++ b/pkg/admin/repair.go @@ -0,0 +1,237 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package admin + +import ( + "context" + "fmt" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/k8s" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + ttlSecondsAfterFinished = int32(5 * 60) // 5 Minutes + backOffLimit = int32(1) + + repairJobVolumes = []corev1.Volume{ + k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath), + k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath), + k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath), + k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath), + k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath), + } + + repairJobVolumeMounts = []corev1.VolumeMount{ + k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true), + k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true), + } +) + +// RepairArgs represents the arguments to repair a drive +type RepairArgs struct { + DriveIDs []directpvtypes.DriveID + DryRun bool + ForceFlag bool + DisablePrefetchFlag bool +} + +// RepairResult represents result of repaired drive +type RepairResult struct { + JobName string + DriveID directpvtypes.DriveID +} + +type repairContainerParams struct { + containerImage string + imagePullSecrets []corev1.LocalObjectReference + tolerations []corev1.Toleration + annotations map[string]string + securityContext *corev1.SecurityContext +} + +func (client *Client) getContainerParams(ctx context.Context) (params repairContainerParams, err error) { + daemonSet, err := client.Kube().AppsV1().DaemonSets(consts.AppName).Get( + ctx, consts.NodeServerName, metav1.GetOptions{}, + ) + + if err != nil && !apierrors.IsNotFound(err) { + return params, err + } + + if daemonSet == nil || daemonSet.UID == "" { + return params, fmt.Errorf("invalid daemonset found") + } + + for _, container := range daemonSet.Spec.Template.Spec.Containers { + if container.Name == consts.NodeServerName { + params.containerImage = container.Image + params.securityContext = container.SecurityContext + break + } + } + + params.imagePullSecrets = daemonSet.Spec.Template.Spec.ImagePullSecrets + params.tolerations = daemonSet.Spec.Template.Spec.Tolerations + params.annotations = daemonSet.Spec.Template.Annotations + + return +} + +// Repair repairs added drives +func (client *Client) Repair(ctx context.Context, args RepairArgs, log LogFunc) (results []RepairResult, err error) { + if len(args.DriveIDs) == 0 { + return + } + + if log == nil { + log = nullLogger + } + + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + params, err := client.getContainerParams(ctx) + if err != nil { + log( + LogMessage{ + Type: ErrorLogType, + Err: err, + Message: "unable to get container parameters from daemonset for drive repair", + Values: map[string]any{"namespace": consts.AppName, "daemonSet": consts.NodeServerName}, + FormattedMessage: fmt.Sprintf("unable to get container parameters from daemonset; %v\n", err), + }, + ) + return nil, err + } + + resultCh := client.NewDriveLister(). + DriveIDSelector(args.DriveIDs). + IgnoreNotFound(true). + List(ctx) + for result := range resultCh { + if result.Err != nil { + return results, result.Err + } + + jobName := "repair-" + result.Drive.Name + if _, err := client.Kube().BatchV1().Jobs(consts.AppName).Get(ctx, jobName, metav1.GetOptions{}); err != nil { + if !apierrors.IsNotFound(err) { + log( + LogMessage{ + Type: ErrorLogType, + Err: err, + Message: "unable to get repair job", + Values: map[string]any{"jobName": jobName}, + FormattedMessage: fmt.Sprintf("unable to get repair job %v; %v\n", jobName, err), + }, + ) + continue + } + } else { + log( + LogMessage{ + Type: ErrorLogType, + Err: err, + Message: "job already exists", + Values: map[string]any{"jobName": jobName}, + FormattedMessage: fmt.Sprintf("job %v already exists\n", jobName), + }, + ) + continue + } + + nodeID := string(result.Drive.GetNodeID()) + + containerArgs := []string{"/directpv", "repair", result.Drive.Name, "--kube-node-name=" + nodeID} + if args.ForceFlag { + containerArgs = append(containerArgs, "--force") + } + if args.DisablePrefetchFlag { + containerArgs = append(containerArgs, "--disable-prefetch") + } + if args.DryRun { + containerArgs = append(containerArgs, "--dry-run") + } + + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: consts.AppName, + Annotations: params.annotations, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: &backOffLimit, + TTLSecondsAfterFinished: &ttlSecondsAfterFinished, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + NodeSelector: map[string]string{string(directpvtypes.NodeLabelKey): nodeID}, + ServiceAccountName: consts.Identity, + Tolerations: params.tolerations, + ImagePullSecrets: params.imagePullSecrets, + Volumes: repairJobVolumes, + Containers: []corev1.Container{ + { + Name: jobName, + Image: params.containerImage, + Command: containerArgs, + SecurityContext: params.securityContext, + VolumeMounts: repairJobVolumeMounts, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + TerminationMessagePath: "/var/log/repair-termination-log", + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + + if _, err := client.Kube().BatchV1().Jobs(consts.AppName).Create(ctx, &job, metav1.CreateOptions{}); err != nil { + log( + LogMessage{ + Type: ErrorLogType, + Err: err, + Message: "unable to create repair job", + Values: map[string]any{"jobName": jobName, "driveName": result.Drive.Name}, + FormattedMessage: fmt.Sprintf("unable to create repair job %v; %v\n", jobName, err), + }, + ) + } else { + log( + LogMessage{ + Type: InfoLogType, + Message: "repair job created", + Values: map[string]any{"jobName": jobName, "driveName": result.Drive.Name}, + FormattedMessage: fmt.Sprintf("repair job %v for drive %v is created\n", jobName, result.Drive.Name), + }, + ) + + results = append(results, RepairResult{JobName: jobName, DriveID: result.Drive.GetDriveID()}) + } + } + + return +} diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go index 5c1d06f1..9f031b78 100644 --- a/pkg/apis/directpv.min.io/types/types.go +++ b/pkg/apis/directpv.min.io/types/types.go @@ -48,6 +48,9 @@ const ( // DriveStatusMoving denotes drive is moving volumes. DriveStatusMoving DriveStatus = "Moving" + + // DriveStatusRepairing denotes drive is repairing it's filesystem. + DriveStatusRepairing DriveStatus = "Repairing" ) // ToDriveStatus converts string value to DriveStatus. diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 9bee57e0..b90cb0b4 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -97,4 +97,22 @@ const ( // TmpFS mount TmpMountDir = AppRootDir + "/tmp" + + // LegacyAppRootDir is legacy application root directory. + LegacyAppRootDir = "/var/lib/direct-csi" + + AppRootDirVolumeName = AppName + "-common-root" + AppRootDirVolumePath = AppRootDir + "/" + + LegacyAppRootDirVolumeName = "direct-csi-common-root" + LegacyAppRootDirVolumePath = LegacyAppRootDir + "/" + + SysDirVolumeName = "sysfs" + SysDirVolumePath = "/sys" + + DevDirVolumeName = "devfs" + DevDirVolumePath = "/dev" + + RunUdevDataVolumeName = "run-udev-data-dir" + RunUdevDataVolumePath = UdevDataDir ) diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in index bf50b159..b4fa6bf5 100644 --- a/pkg/consts/consts.go.in +++ b/pkg/consts/consts.go.in @@ -95,4 +95,22 @@ const ( // TmpFS mount TmpMountDir = AppRootDir + "/tmp" + + // LegacyAppRootDir is legacy application root directory. + LegacyAppRootDir = "/var/lib/direct-csi" + + AppRootDirVolumeName = AppName + "-common-root" + AppRootDirVolumePath = AppRootDir + "/" + + LegacyAppRootDirVolumeName = "direct-csi-common-root" + LegacyAppRootDirVolumePath = LegacyAppRootDir + "/" + + SysDirVolumeName = "sysfs" + SysDirVolumePath = "/sys" + + DevDirVolumeName = "devfs" + DevDirVolumePath = "/dev" + + RunUdevDataVolumeName = "run-udev-data-dir" + RunUdevDataVolumePath = UdevDataDir ) diff --git a/pkg/drive/event.go b/pkg/drive/event.go index 6c106346..7936e5e1 100644 --- a/pkg/drive/event.go +++ b/pkg/drive/event.go @@ -183,7 +183,7 @@ func newDriveEventHandler(nodeID directpvtypes.NodeID) *driveEventHandler { if err = os.Remove(driveMountPoint); err != nil && !errors.Is(err, os.ErrNotExist) { return err } - driveMountPoint = path.Join("/var/lib/direct-csi/mnt", fsuuid) + driveMountPoint = path.Join(consts.LegacyAppRootDir, "mnt", fsuuid) if err = os.Remove(driveMountPoint); err != nil && !errors.Is(err, os.ErrNotExist) { return err } @@ -221,7 +221,7 @@ func (handler *driveEventHandler) unmountDrive(drive *types.Drive, skipDriveMoun devices, found := mountPointMap[driveMountPoint] if !found { // Check for legacy mount for backward compatibility. - driveMountPoint = path.Join("/var/lib/direct-csi/mnt", drive.Status.FSUUID) + driveMountPoint = path.Join(consts.LegacyAppRootDir, "mnt", drive.Status.FSUUID) if devices, found = mountPointMap[driveMountPoint]; !found { return nil // Device already umounted } diff --git a/pkg/drive/repair.go b/pkg/drive/repair.go new file mode 100644 index 00000000..5c23bf13 --- /dev/null +++ b/pkg/drive/repair.go @@ -0,0 +1,209 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package drive + +import ( + "bytes" + "context" + "fmt" + "io" + "path" + "sync" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/client" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/sys" + "github.com/minio/directpv/pkg/types" + "github.com/minio/directpv/pkg/utils" + "github.com/minio/directpv/pkg/xfs" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" +) + +type logWriter struct { + buffer []byte + closed bool + mutex sync.Mutex +} + +func (w *logWriter) Write(data []byte) (n int, err error) { + w.mutex.Lock() + defer w.mutex.Unlock() + + if w.closed { + klog.Info(string(data)) + return len(data), io.EOF + } + + w.buffer = append(w.buffer, data...) + for { + index := bytes.IndexRune(w.buffer, '\n') + if index < 0 { + break + } + + klog.Info(string(w.buffer[:index+1])) + w.buffer = w.buffer[index+1:] + } + + return len(data), nil +} + +func (w *logWriter) Close() (err error) { + w.mutex.Lock() + defer w.mutex.Unlock() + + klog.Info(string(w.buffer)) + w.buffer = nil + w.closed = true + return nil +} + +func repair(ctx context.Context, drive *types.Drive, force, disablePrefetch, dryRun bool, + getDeviceByFSUUID func(fsuuid string) (string, error), + getMounts func() (deviceMap map[string]utils.StringSet, err error), + unmount func(mountPoint string) error, + repair func(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error, + mount func(device, target string) (err error), +) error { + device, err := getDeviceByFSUUID(drive.Status.FSUUID) + if err != nil { + klog.ErrorS( + err, + "unable to find device by FSUUID; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + "on the host to reload", + "FSUUID", drive.Status.FSUUID) + client.Eventf( + drive, client.EventTypeWarning, client.EventReasonStageVolume, + "unable to find device by FSUUID %v; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + " on the host to reload", drive.Status.FSUUID) + return fmt.Errorf("unable to find device by FSUUID %v; %w", drive.Status.FSUUID, err) + } + + target := types.GetDriveMountDir(drive.Status.FSUUID) + legacyTarget := path.Join(consts.LegacyAppRootDir, "mnt", drive.Status.FSUUID) + + deviceMap, err := getMounts() + if err != nil { + return err + } + + mountPoints := deviceMap[device] + getCount := func() int { + count := len(mountPoints) + if mountPoints.Exist(target) { + count-- + } + if mountPoints.Exist(legacyTarget) { + count-- + } + return count + } + + switch len(mountPoints) { + case 0: + case 1: + if mountPoints.Exist(target) { + if err = unmount(target); err != nil { + return err + } + } + if mountPoints.Exist(legacyTarget) { + if err = unmount(legacyTarget); err != nil { + return err + } + } + case 2: + if mountPoints.Exist(target) && mountPoints.Exist(legacyTarget) { + if err = unmount(target); err != nil { + return err + } + if err = unmount(legacyTarget); err != nil { + return err + } + } else { + return fmt.Errorf("unable to run xfs repair; %v volume mounts still mounted", getCount()) + } + default: + return fmt.Errorf("unable to run xfs repair; %v volume mounts still mounted", getCount()) + } + + logWriter := &logWriter{} + err = repair(ctx, device, force, disablePrefetch, dryRun, logWriter) + logWriter.Close() + if err != nil { + return err + } + + merr := mount(device, target) + if merr != nil { + klog.ErrorS(err, "unable to mount the drive", "Source", device, "Target", target) + } + + driveID := drive.GetDriveID() + updateFunc := func() error { + drive, err := client.DriveClient().Get(ctx, string(driveID), metav1.GetOptions{}) + if err != nil { + return err + } + + if merr != nil { + drive.SetMountErrorCondition(fmt.Sprintf("unable to mount; %v", err)) + client.Eventf(drive, + client.EventTypeWarning, + client.EventReasonDriveMountError, + "unable to mount the drive; %v", err, + ) + drive.Status.Status = directpvtypes.DriveStatusError + } else { + client.Eventf( + drive, + client.EventTypeNormal, + client.EventReasonDriveMounted, + "Drive mounted successfully to %s", target, + ) + drive.Status.Status = directpvtypes.DriveStatusReady + } + + _, err = client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()}) + return err + } + + return retry.RetryOnConflict(retry.DefaultRetry, updateFunc) +} + +// Repair runs `xfs_repair` command on specified drive +func Repair(ctx context.Context, drive *types.Drive, force, disablePrefetch, dryRun bool) error { + return repair(ctx, drive, force, disablePrefetch, dryRun, + sys.GetDeviceByFSUUID, + func() (deviceMap map[string]utils.StringSet, err error) { + _, deviceMap, _, _, err = sys.GetMounts(false) + return + }, + func(mountPoint string) error { + return sys.Unmount(mountPoint, true, true, false) + }, + xfs.Repair, + xfs.Mount, + ) +} diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index d93fc874..1dd58c0d 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -322,3 +322,29 @@ func ParseTolerations(values []string) ([]corev1.Toleration, error) { return tolerations, nil } + +// NewHostPathVolume - creates volume for given name and host path. +func NewHostPathVolume(name, path string) corev1.Volume { + hostPathType := corev1.HostPathDirectoryOrCreate + volumeSource := corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: path, + Type: &hostPathType, + }, + } + + return corev1.Volume{ + Name: name, + VolumeSource: volumeSource, + } +} + +// NewVolumeMount - creates volume mount for given name, path, mount propagation and read only flag. +func NewVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount { + return corev1.VolumeMount{ + Name: name, + ReadOnly: readOnly, + MountPath: path, + MountPropagation: &mountPropogation, + } +} diff --git a/pkg/legacy/converter/sys.go b/pkg/legacy/converter/sys.go index 84a51718..7103f043 100644 --- a/pkg/legacy/converter/sys.go +++ b/pkg/legacy/converter/sys.go @@ -16,12 +16,14 @@ package converter +import "github.com/minio/directpv/pkg/consts" + const ( // HostDevRoot is "/dev" directory. HostDevRoot = "/dev" // DirectCSIDevRoot is "/var/lib/direct-csi/devices" directory. - DirectCSIDevRoot = "/var/lib/direct-csi/devices" + DirectCSIDevRoot = consts.LegacyAppRootDir + "/devices" // DirectCSIPartitionInfix is partition infix value. DirectCSIPartitionInfix = "-part-" diff --git a/pkg/xfs/repair.go b/pkg/xfs/repair.go new file mode 100644 index 00000000..db83b1c7 --- /dev/null +++ b/pkg/xfs/repair.go @@ -0,0 +1,27 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package xfs + +import ( + "context" + "io" +) + +// Repair is a utility function to repair XFS on a device +func Repair(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error { + return repair(ctx, device, force, disablePrefetch, dryRun, output) +} diff --git a/pkg/xfs/repair_linux.go b/pkg/xfs/repair_linux.go new file mode 100644 index 00000000..51fd0276 --- /dev/null +++ b/pkg/xfs/repair_linux.go @@ -0,0 +1,48 @@ +//go:build linux + +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package xfs + +import ( + "context" + "fmt" + "io" + "os/exec" +) + +func repair(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error { + args := []string{device, "-v"} + if force { + args = append(args, "-L") + } + if disablePrefetch { + args = append(args, "-P") + } + if dryRun { + args = append(args, "-n") + } + + cmd := exec.CommandContext(ctx, "xfs_repair", args...) + cmd.Stdout = output + cmd.Stderr = output + if err := cmd.Run(); err != nil { + return fmt.Errorf("unable to run xfs_repair on device %v; %w", device, err) + } + + return nil +} diff --git a/pkg/xfs/repair_other.go b/pkg/xfs/repair_other.go new file mode 100644 index 00000000..0d3887bc --- /dev/null +++ b/pkg/xfs/repair_other.go @@ -0,0 +1,30 @@ +//go:build !linux + +// This file is part of MinIO DirectPV +// Copyright (c) 2024 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package xfs + +import ( + "context" + "fmt" + "io" + "runtime" +) + +func repair(_ context.Context, _ string, _, _, _ bool, _ io.Writer) error { + return fmt.Errorf("unsupported operating system %v", runtime.GOOS) +}