diff --git a/.golangci.yml b/.golangci.yml
index 8b0e532b..7d738763 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -1,6 +1,6 @@
linters-settings:
- gofumpt:
- lang-version: "1.22"
+run:
+ go: "1.22"
misspell:
locale: US
diff --git a/cmd/directpv/main.go b/cmd/directpv/main.go
index d7cf3db1..b733ebe2 100644
--- a/cmd/directpv/main.go
+++ b/cmd/directpv/main.go
@@ -128,6 +128,7 @@ func init() {
mainCmd.AddCommand(legacyControllerCmd)
mainCmd.AddCommand(legacyNodeServerCmd)
mainCmd.AddCommand(nodeControllerCmd)
+ mainCmd.AddCommand(repairCmd)
}
func main() {
diff --git a/cmd/directpv/repair.go b/cmd/directpv/repair.go
new file mode 100644
index 00000000..77edb9e3
--- /dev/null
+++ b/cmd/directpv/repair.go
@@ -0,0 +1,80 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "context"
+ "errors"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ drivepkg "github.com/minio/directpv/pkg/drive"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// Options of the `repair` command; init() binds each one to a CLI flag.
+var (
+	forceFlag, disablePrefetchFlag bool // --force, --disable-prefetch
+	dryRunFlag                     bool // --dry-run
+)
+
+// repairCmd is the `repair` sub-command; it needs exactly one DRIVE-ID and hands it to startRepair.
+var repairCmd = &cobra.Command{
+	Use:           "repair <DRIVE-ID>",
+	Short:         "Start drive repair.",
+	SilenceUsage:  true,
+	SilenceErrors: true,
+	RunE: func(c *cobra.Command, args []string) error {
+		if len(args) == 0 {
+			return errors.New("DRIVE-ID must be provided")
+		}
+		if len(args) > 1 {
+			return errors.New("only one DRIVE-ID must be provided")
+		}
+		return startRepair(c.Context(), args[0])
+	},
+}
+
+func init() { // Bind the repair options to command flags; every option defaults to false.
+	repairCmd.PersistentFlags().BoolVar(&forceFlag, "force", false, "Force log zeroing")
+	repairCmd.PersistentFlags().BoolVar(&disablePrefetchFlag, "disable-prefetch", false, "Disable prefetching of inode and directory blocks")
+	repairCmd.PersistentFlags().BoolVar(&dryRunFlag, "dry-run", false, "No modify mode")
+}
+
+func startRepair(ctx context.Context, driveID string) error {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithCancel(ctx)
+ defer cancel()
+
+ drive, err := client.DriveClient().Get(ctx, driveID, metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+
+ if drive.Status.Status != directpvtypes.DriveStatusRepairing {
+ drive.Status.Status = directpvtypes.DriveStatusRepairing
+ }
+
+ updatedDrive, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()})
+ if err != nil {
+ return err
+ }
+
+ return drivepkg.Repair(ctx, updatedDrive, forceFlag, disablePrefetchFlag, dryRunFlag)
+}
diff --git a/cmd/kubectl-directpv/main.go b/cmd/kubectl-directpv/main.go
index 505fd85e..089455c8 100644
--- a/cmd/kubectl-directpv/main.go
+++ b/cmd/kubectl-directpv/main.go
@@ -165,6 +165,7 @@ Use "{{.CommandPath}} [command] --help" for more information about this command.
mainCmd.AddCommand(cleanCmd)
mainCmd.AddCommand(suspendCmd)
mainCmd.AddCommand(resumeCmd)
+ mainCmd.AddCommand(repairCmd)
mainCmd.AddCommand(removeCmd)
mainCmd.AddCommand(uninstallCmd)
mainCmd.SetHelpCommand(&cobra.Command{
diff --git a/cmd/kubectl-directpv/repair.go b/cmd/kubectl-directpv/repair.go
new file mode 100644
index 00000000..bd5deaaa
--- /dev/null
+++ b/cmd/kubectl-directpv/repair.go
@@ -0,0 +1,92 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "context"
+ "errors"
+ "os"
+ "strings"
+
+ "github.com/minio/directpv/pkg/admin"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/spf13/cobra"
+)
+
+// Options specific to the `repair` command; init() binds them to CLI flags.
+var (
+	forceFlag, disablePrefetchFlag bool // --force, --disable-prefetch
+)
+
+var repairCmd = &cobra.Command{ // `repair` sub-command: validates the arguments, then calls repairMain
+ Use: "repair DRIVE ...",
+ Short: "Repair filesystem of drives",
+ SilenceUsage: true,
+ SilenceErrors: true,
+ Example: strings.ReplaceAll( // {PLUGIN_NAME} in the example is replaced by consts.AppName
+ `1. Repair drives
+ $ kubectl {PLUGIN_NAME} repair 3b562992-f752-4a41-8be4-4e688ae8cd4c`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ Run: func(c *cobra.Command, args []string) {
+ driveIDArgs = args // validateRepairCmd reads this package-level variable
+ if err := validateRepairCmd(); err != nil {
+ eprintf(true, "%v\n", err)
+ os.Exit(-1) // invalid arguments: exit before contacting the cluster
+ }
+
+ repairMain(c.Context())
+ },
+}
+
+func init() {
+	setFlagOpts(repairCmd)
+	addDryRunFlag(repairCmd, "Repair drives with no modify mode")
+	flags := repairCmd.PersistentFlags() // options specific to this command
+	flags.BoolVar(&forceFlag, "force", forceFlag, "Force log zeroing")
+	flags.BoolVar(&disablePrefetchFlag, "disable-prefetch", disablePrefetchFlag, "Disable prefetching of inode and directory blocks")
+}
+
+// validateRepairCmd returns the error of validateDriveIDArgs, if any, and
+// otherwise requires driveIDArgs to contain at least one entry.
+func validateRepairCmd() error {
+	switch err := validateDriveIDArgs(); {
+	case err != nil:
+		return err
+	case len(driveIDArgs) == 0:
+		return errors.New("no drive provided to repair")
+	}
+	return nil
+}
+
+// repairMain calls adminClient.Repair with the selected drives and the command
+// options; on error it prints the error and exits with status 1.
+func repairMain(ctx context.Context) {
+	args := admin.RepairArgs{
+		DriveIDs:            driveIDSelectors,
+		DryRun:              dryRunFlag,
+		ForceFlag:           forceFlag,
+		DisablePrefetchFlag: disablePrefetchFlag,
+	}
+
+	if _, err := adminClient.Repair(ctx, args, logFunc); err != nil {
+		// eprintf's first argument is false only for ErrNoMatchingResourcesFound.
+		eprintf(!errors.Is(err, admin.ErrNoMatchingResourcesFound), "%v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/docs/command-reference.md b/docs/command-reference.md
index 0e558b04..f105dcb3 100644
--- a/docs/command-reference.md
+++ b/docs/command-reference.md
@@ -722,6 +722,28 @@ EXAMPLES:
$ kubectl directpv resume volumes pvc-0700b8c7-85b2-4894-b83a-274484f220d0
```
+## `repair` command
+```
+Repair filesystem of drives
+
+USAGE:
+ directpv repair DRIVE ... [flags]
+
+FLAGS:
+ --dry-run Repair drives with no modify mode
+ --force Force log zeroing
+ --disable-prefetch Disable prefetching of inode and directory blocks
+ -h, --help help for repair
+
+GLOBAL FLAGS:
+ --kubeconfig string Path to the kubeconfig file to use for CLI requests
+ --quiet Suppress printing error messages
+
+EXAMPLES:
+1. Repair drives
+ $ kubectl directpv repair 3b562992-f752-4a41-8be4-4e688ae8cd4c
+```
+
## `remove` command
```
Remove unused drives from DirectPV
diff --git a/docs/drive-management.md b/docs/drive-management.md
index 3e2780cc..f5b1948d 100644
--- a/docs/drive-management.md
+++ b/docs/drive-management.md
@@ -118,11 +118,27 @@ Refer [remove command](./command-reference.md#remove-command) for more informati
By Kubernetes design, [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) workload is active only if all of its pods are in running state. Any faulty drive(s) will prevent the statefulset from starting up. DirectPV provides a workaround to suspend failed drives which will mount the respective volumes on empty `/var/lib/directpv/tmp` directory with read-only access. This can be done by executing the `suspend drives` command. Below is an example:
```sh
-> kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
+$ kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
```
Suspended drives can be resumed once they are fixed. Upon resuming, the corresponding volumes will resume using the respective allocated drives. This can be done by using the `resume drives` command. Below is an example:
```sh
-> kubectl directpv resume drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
+$ kubectl directpv resume drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
+```
+
+## Repair drives
+
+***CAUTION: THIS IS A DANGEROUS OPERATION WHICH LEADS TO DATA LOSS***
+
+In rare situations, the filesystem on faulty drives can be repaired to make them usable again. As a first step, faulty drives must be suspended; then the `repair` command should be run for them. The `repair` command creates a one-time Kubernetes `Job` named `repair-<DRIVE-ID>` for each drive, and these jobs are automatically removed five minutes after their completion. Progress and status of the drive repair can be viewed using the `kubectl logs` command. Below is an example:
+
+```sh
+# Suspend faulty drives
+$ kubectl directpv suspend drives af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
+
+# Restart volume consumer pods and make sure associated volumes are unbound
+
+# Run repair command on suspended drives
+$ kubectl directpv repair af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
```
diff --git a/docs/tools/repair.sh b/docs/tools/repair.sh
new file mode 100644
index 00000000..1f904790
--- /dev/null
+++ b/docs/tools/repair.sh
@@ -0,0 +1,145 @@
+#!/usr/bin/env bash
+#
+# This file is part of MinIO DirectPV
+# Copyright (c) 2024 MinIO, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+#
+# This script repairs faulty drives
+#
+
+set -e
+
+ME=$(basename "$0"); export ME
+
+declare -a drive_ids
+
+# usage: is_uuid <value>; succeeds only if <value> is a lower-case hexadecimal UUID
+function is_uuid() {
+ [[ "$1" =~ ^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$ ]]
+}
+
+# usage: get_suspend_value
+function get_suspend_value() {
+ # shellcheck disable=SC2016
+ kubectl get directpvvolumes "${1}" \
+ -o go-template='{{range $k,$v := .metadata.labels}}{{if eq $k "directpv.min.io/suspend"}}{{$v}}{{end}}{{end}}'
+}
+
+# usage: is_suspended
+function is_suspended() {
+ value=$(get_suspend_value "${1}")
+ [[ "${value,,}" = "true" ]]
+}
+
+# usage: get_volumes <drive-id>; prints the space-separated names of the volumes labeled with the drive
+function get_volumes() {
+ kubectl get directpvvolumes \
+ --selector="directpv.min.io/drive=${1}" \
+ -o go-template='{{range .items}}{{.metadata.name}}{{ " " | print }}{{end}}'
+}
+
+# usage: get_pod_name <volume>; prints the value of the volume's directpv.min.io/pod.name label
+function get_pod_name() {
+ # shellcheck disable=SC2016
+ kubectl get directpvvolumes "${1}" \
+ -o go-template='{{range $k,$v := .metadata.labels}}{{if eq $k "directpv.min.io/pod.name"}}{{$v}}{{end}}{{end}}'
+}
+
+# usage: get_pod_namespace <volume>; prints the value of the volume's directpv.min.io/pod.namespace label
+function get_pod_namespace() {
+ # shellcheck disable=SC2016
+ kubectl get directpvvolumes "${1}" \
+ -o go-template='{{range $k,$v := .metadata.labels}}{{if eq $k "directpv.min.io/pod.namespace"}}{{$v}}{{end}}{{end}}'
+}
+
+# usage: init <DRIVE-ID> ...; prints usage without arguments, checks required tools, fills drive_ids
+function init() {
+    if [[ $# -eq 0 ]]; then
+        cat <<EOF
+NAME:
+  ${ME} - Repair faulty drives.
+
+USAGE:
+  ${ME} <DRIVE-ID> ...
+
+ARGUMENTS:
+  DRIVE-ID      Faulty drive ID.
+
+EXAMPLE:
+  # Repair drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0.
+  $ ${ME} af3b8b4c-73b4-4a74-84b7-1ec30492a6f0
+EOF
+        exit 255
+    fi
+
+    if ! command -v kubectl >/dev/null 2>&1; then
+        echo "kubectl not found; please install"
+        exit 255
+    fi
+    if ! kubectl directpv --version >/dev/null 2>&1; then
+        echo "kubectl directpv not found; please install"
+        exit 255
+    fi
+
+    for drive in "$@"; do
+        if ! is_uuid "${drive}"; then
+            echo "invalid drive ID ${drive}"
+            exit 255
+        fi
+        if [[ ! ${drive_ids[*]} =~ ${drive} ]]; then
+            drive_ids+=( "${drive}" ) # keep each drive ID only once
+        fi
+    done
+}
+
+# usage: repair
+function repair() {
+ drive_id="$1"
+
+ pods_deleted=true
+ if ! is_suspended "${drive_id}"; then
+ kubectl directpv suspend "${drive_id}"
+
+ # shellcheck disable=SC2207
+ volumes=( $(get_volumes "${drive_id}") )
+ for volume in "${volumes[@]}"; do
+ pod_name=$(get_pod_name "${volume}")
+ pod_namespace=$(get_pod_namespace "${volume}")
+
+ if ! kubectl delete pod "${pod_name}" --namespace "${pod_namespace}"; then
+ echo "unable to delete pod '${pod_name}' using volume '${volume}'; please delete the pod manually"
+ pods_deleted=false
+ fi
+ done
+ else
+ echo "drive ${drive_id} already suspended"
+ fi
+
+ if [ "${pods_deleted}" == "true" ]; then
+ kubectl directpv repair "${drive_id}"
+ else
+ echo "delete pods manually and retry again for drive ${drive_id}"
+ fi
+}
+
+# Runs repair for every drive ID collected in drive_ids, in order.
+function main() {
+    local id
+    for id in "${drive_ids[@]}"; do repair "${id}"; done
+}
+
+init "$@"
+main "$@"
diff --git a/docs/tools/replace.sh b/docs/tools/replace.sh
index 8fe117b4..b971a37f 100755
--- a/docs/tools/replace.sh
+++ b/docs/tools/replace.sh
@@ -61,7 +61,7 @@ function get_node_name() {
-o go-template='{{range $k,$v := .metadata.labels}}{{if eq $k "directpv.min.io/node"}}{{$v}}{{end}}{{end}}'
}
-# usage: is_uuid input
+# usage: is_uuid <value>
function is_uuid() {
[[ "$1" =~ ^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$ ]]
}
@@ -179,9 +179,9 @@ function main() {
exit 1
fi
- mapfile -t volumes < <(get_volumes "${src_drive_id}")
- IFS=' ' read -r -a volumes_arr <<< "${volumes[@]}"
- for volume in "${volumes_arr[@]}"; do
+ # shellcheck disable=SC2207
+ volumes=( $(get_volumes "${src_drive_id}") )
+ for volume in "${volumes[@]}"; do
pod_name=$(get_pod_name "${volume}")
pod_namespace=$(get_pod_namespace "${volume}")
@@ -190,9 +190,9 @@ function main() {
fi
done
- if [ "${#volumes_arr[@]}" -gt 0 ]; then
+ if [ "${#volumes[@]}" -gt 0 ]; then
# Wait for associated DirectPV volumes to be unbound
- while kubectl directpv list volumes --no-headers "${volumes_arr[@]}" | grep -q Bounded; do
+ while kubectl directpv list volumes --no-headers "${volumes[@]}" | grep -q Bounded; do
echo "...waiting for volumes to be unbound"
sleep 10
done
diff --git a/pkg/admin/installer/consts.go b/pkg/admin/installer/consts.go
index 31a18369..5640f006 100644
--- a/pkg/admin/installer/consts.go
+++ b/pkg/admin/installer/consts.go
@@ -25,9 +25,8 @@ const (
namespace = consts.AppName
healthZContainerPortName = "healthz"
healthZContainerPort = 9898
- volumePathSysDir = "/sys"
- volumeNameSocketDir = "socket-dir"
- socketDir = "/csi"
+ csiDirVolumeName = "socket-dir"
+ csiDirVolumePath = "/csi"
selectorKey = "selector." + consts.GroupName
kubeNodeNameEnvVarName = "KUBE_NODE_NAME"
csiEndpointEnvVarName = "CSI_ENDPOINT"
diff --git a/pkg/admin/installer/daemonset.go b/pkg/admin/installer/daemonset.go
index 4ca0fbae..627b1122 100644
--- a/pkg/admin/installer/daemonset.go
+++ b/pkg/admin/installer/daemonset.go
@@ -23,6 +23,7 @@ import (
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
legacyclient "github.com/minio/directpv/pkg/legacy/client"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -32,20 +33,11 @@ import (
)
const (
- volumeNameMountpointDir = "mountpoint-dir"
- volumeNameRegistrationDir = "registration-dir"
- volumeNamePluginDir = "plugins-dir"
- volumeNameAppRootDir = consts.AppName + "-common-root"
- volumeNameLegacyAppRootDir = "direct-csi-common-root"
- appRootDir = consts.AppRootDir + "/"
- legacyAppRootDir = "/var/lib/direct-csi/"
- volumeNameSysDir = "sysfs"
- volumeNameDevDir = "devfs"
- volumePathDevDir = "/dev"
- volumeNameRunUdevData = "run-udev-data-dir"
- volumePathRunUdevData = consts.UdevDataDir
- socketFile = "/csi.sock"
- totalDaemonsetSteps = 2
+ kubeletPodsDirVolumeName = "mountpoint-dir"
+ registrationDirVolumeName = "registration-dir"
+ kubeletPluginsDirVolumeName = "plugins-dir"
+ socketFile = "/csi.sock"
+ totalDaemonsetSteps = 2
)
type daemonsetTask struct {
@@ -97,25 +89,25 @@ func newSecurityContext(seccompProfile string) *corev1.SecurityContext {
func getVolumesAndMounts(pluginSocketDir string) (volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) {
volumes = []corev1.Volume{
- newHostPathVolume(volumeNameSocketDir, pluginSocketDir),
- newHostPathVolume(volumeNameMountpointDir, kubeletDirPath+"/pods"),
- newHostPathVolume(volumeNameRegistrationDir, kubeletDirPath+"/plugins_registry"),
- newHostPathVolume(volumeNamePluginDir, kubeletDirPath+"/plugins"),
- newHostPathVolume(volumeNameAppRootDir, appRootDir),
- newHostPathVolume(volumeNameSysDir, volumePathSysDir),
- newHostPathVolume(volumeNameDevDir, volumePathDevDir),
- newHostPathVolume(volumeNameRunUdevData, volumePathRunUdevData),
- newHostPathVolume(volumeNameLegacyAppRootDir, legacyAppRootDir),
+ k8s.NewHostPathVolume(csiDirVolumeName, pluginSocketDir),
+ k8s.NewHostPathVolume(kubeletPodsDirVolumeName, kubeletDirPath+"/pods"),
+ k8s.NewHostPathVolume(registrationDirVolumeName, kubeletDirPath+"/plugins_registry"),
+ k8s.NewHostPathVolume(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins"),
+ k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
+ k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
+ k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
}
volumeMounts = []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
- newVolumeMount(volumeNameMountpointDir, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNamePluginDir, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameAppRootDir, appRootDir, corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameSysDir, volumePathSysDir, corev1.MountPropagationBidirectional, false),
- newVolumeMount(volumeNameDevDir, volumePathDevDir, corev1.MountPropagationHostToContainer, true),
- newVolumeMount(volumeNameRunUdevData, volumePathRunUdevData, corev1.MountPropagationBidirectional, true),
- newVolumeMount(volumeNameLegacyAppRootDir, legacyAppRootDir, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(kubeletPodsDirVolumeName, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
+ k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
}
return
@@ -132,8 +124,8 @@ func nodeDriverRegistrarContainer(image, pluginSocketDir string) corev1.Containe
},
Env: []corev1.EnvVar{kubeNodeNameEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
- newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(registrationDirVolumeName, "/registration", corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/driver-registrar-termination-log",
@@ -195,13 +187,13 @@ func livenessProbeContainer(image string) corev1.Container {
Name: "liveness-probe",
Image: image,
Args: []string{
- fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile),
+ fmt.Sprintf("--csi-address=%v%v", csiDirVolumePath, socketFile),
fmt.Sprintf("--health-port=%v", healthZContainerPort),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/driver-liveness-termination-log",
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
}
}
diff --git a/pkg/admin/installer/deployment.go b/pkg/admin/installer/deployment.go
index b6d74d44..429993a5 100644
--- a/pkg/admin/installer/deployment.go
+++ b/pkg/admin/installer/deployment.go
@@ -23,6 +23,7 @@ import (
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -96,8 +97,8 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega
podSpec := corev1.PodSpec{
ServiceAccountName: consts.Identity,
Volumes: []corev1.Volume{
- newHostPathVolume(
- volumeNameSocketDir,
+ k8s.NewHostPathVolume(
+ csiDirVolumeName,
newPluginsSocketDir(kubeletDirPath, fmt.Sprintf("%s-controller", consts.ControllerServerName)),
),
},
@@ -116,7 +117,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega
},
Env: []corev1.EnvVar{csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/controller-provisioner-termination-log",
@@ -148,7 +149,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega
},
Env: []corev1.EnvVar{csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
TerminationMessagePath: "/var/log/controller-csi-resizer-termination-log",
@@ -173,7 +174,7 @@ func (t deploymentTask) doCreateDeployment(ctx context.Context, args *Args, lega
},
Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar},
VolumeMounts: []corev1.VolumeMount{
- newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+ k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false),
},
},
},
diff --git a/pkg/admin/installer/psp.go b/pkg/admin/installer/psp.go
index 9efa3e77..bbaf984f 100644
--- a/pkg/admin/installer/psp.go
+++ b/pkg/admin/installer/psp.go
@@ -153,10 +153,10 @@ func (t pspTask) createPodSecurityPolicy(ctx context.Context, args *Args) (err e
Volumes: []policy.FSType{policy.HostPath},
AllowedHostPaths: []policy.AllowedHostPath{
{PathPrefix: "/proc", ReadOnly: true},
- {PathPrefix: volumePathSysDir},
+ {PathPrefix: consts.SysDirVolumePath},
{PathPrefix: consts.UdevDataDir, ReadOnly: true},
{PathPrefix: consts.AppRootDir},
- {PathPrefix: socketDir},
+ {PathPrefix: csiDirVolumePath},
{PathPrefix: kubeletDirPath},
},
SELinux: policy.SELinuxStrategyOptions{
diff --git a/pkg/admin/installer/utils.go b/pkg/admin/installer/utils.go
index c37ecaab..8bc82d7c 100644
--- a/pkg/admin/installer/utils.go
+++ b/pkg/admin/installer/utils.go
@@ -24,38 +24,13 @@ import (
"strings"
"github.com/minio/directpv/pkg/k8s"
- corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
-func newHostPathVolume(name, path string) corev1.Volume {
- hostPathType := corev1.HostPathDirectoryOrCreate
- volumeSource := corev1.VolumeSource{
- HostPath: &corev1.HostPathVolumeSource{
- Path: path,
- Type: &hostPathType,
- },
- }
-
- return corev1.Volume{
- Name: name,
- VolumeSource: volumeSource,
- }
-}
-
func newPluginsSocketDir(kubeletDir, name string) string {
return path.Join(kubeletDir, "plugins", k8s.SanitizeResourceName(name))
}
-func newVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount {
- return corev1.VolumeMount{
- Name: name,
- ReadOnly: readOnly,
- MountPath: path,
- MountPropagation: &mountPropogation,
- }
-}
-
func getRandSuffix() string {
b := make([]byte, 5)
if _, err := rand.Read(b); err != nil {
diff --git a/pkg/admin/repair.go b/pkg/admin/repair.go
new file mode 100644
index 00000000..94c47e20
--- /dev/null
+++ b/pkg/admin/repair.go
@@ -0,0 +1,237 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package admin
+
+import (
+ "context"
+ "fmt"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+ ttlSecondsAfterFinished = int32(5 * 60) // 5 Minutes; used as TTLSecondsAfterFinished of repair jobs
+ backOffLimit = int32(1) // used as BackoffLimit of repair jobs
+
+ repairJobVolumes = []corev1.Volume{ // host path volumes of the repair job's pod spec
+ k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
+ k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
+ k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
+ k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
+ }
+
+ repairJobVolumeMounts = []corev1.VolumeMount{ // mounts of the repair container, one per volume above
+ k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
+ k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
+ k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
+ }
+)
+
+// RepairArgs represents the arguments of Client.Repair.
+type RepairArgs struct {
+ DriveIDs []directpvtypes.DriveID // drives to repair; Repair returns immediately if this is empty
+ DryRun bool // adds --dry-run to the repair container command
+ ForceFlag bool // adds --force to the repair container command
+ DisablePrefetchFlag bool // adds --disable-prefetch to the repair container command
+}
+
+// RepairResult identifies the repair job created for a drive.
+type RepairResult struct {
+ JobName string // name of the created job: "repair-" followed by the drive name
+ DriveID directpvtypes.DriveID // ID of the drive the job was created for
+}
+
+type repairContainerParams struct { // values getContainerParams copies from the consts.NodeServerName daemonset
+ containerImage string // image of the container named consts.NodeServerName
+ imagePullSecrets []corev1.LocalObjectReference // ImagePullSecrets of the daemonset's pod spec
+ tolerations []corev1.Toleration // Tolerations of the daemonset's pod spec
+ annotations map[string]string // Annotations of the daemonset's pod template
+ securityContext *corev1.SecurityContext // security context of the container named consts.NodeServerName
+}
+
+// getContainerParams returns the image, security context, pull secrets, tolerations and
+// pod-template annotations of the node-server daemonset; it fails if they are unavailable.
+func (client *Client) getContainerParams(ctx context.Context) (params repairContainerParams, err error) {
+	daemonSet, err := client.Kube().AppsV1().DaemonSets(consts.AppName).Get(
+		ctx, consts.NodeServerName, metav1.GetOptions{},
+	)
+	if err != nil {
+		return params, err // includes NotFound, so the caller sees the real cause
+	}
+	if daemonSet == nil || daemonSet.UID == "" {
+		return params, fmt.Errorf("invalid daemonset found")
+	}
+	for _, container := range daemonSet.Spec.Template.Spec.Containers {
+		if container.Name == consts.NodeServerName {
+			params.containerImage = container.Image
+			params.securityContext = container.SecurityContext
+			break
+		}
+	}
+	if params.containerImage == "" { // a job without an image could never start
+		return params, fmt.Errorf("container %v not found in daemonset", consts.NodeServerName)
+	}
+	params.imagePullSecrets = daemonSet.Spec.Template.Spec.ImagePullSecrets
+	params.tolerations = daemonSet.Spec.Template.Spec.Tolerations
+	params.annotations = daemonSet.Spec.Template.Annotations
+	return params, nil
+}
+
+// Repair creates one Kubernetes Job per drive matching args.DriveIDs. Each job
+// runs "/directpv repair <drive-name>" with the image and pod settings taken
+// from the node-server daemonset (see getContainerParams).
+//
+// Per-drive failures (job already exists, job lookup or creation failed) are
+// reported through log and the drive is skipped; results lists only the jobs
+// created. Drive listing errors are returned together with the results so
+// far, and ErrNoMatchingResourcesFound is returned if no drive matched.
+func (client *Client) Repair(ctx context.Context, args RepairArgs, log LogFunc) (results []RepairResult, err error) {
+	if len(args.DriveIDs) == 0 {
+		return
+	}
+	if log == nil {
+		log = nullLogger
+	}
+
+	ctx, cancelFunc := context.WithCancel(ctx)
+	defer cancelFunc()
+
+	params, err := client.getContainerParams(ctx)
+	if err != nil {
+		log(LogMessage{
+			Type:             ErrorLogType,
+			Err:              err,
+			Message:          "unable to get container parameters from daemonset for drive repair",
+			Values:           map[string]any{"namespace": consts.AppName, "daemonSet": consts.NodeServerName},
+			FormattedMessage: fmt.Sprintf("unable to get container parameters from daemonset; %v\n", err),
+		})
+		return nil, err
+	}
+
+	jobs := client.Kube().BatchV1().Jobs(consts.AppName)
+	matched := false // becomes true once the lister yields a drive
+	resultCh := client.NewDriveLister().
+		DriveIDSelector(args.DriveIDs).
+		IgnoreNotFound(true).
+		List(ctx)
+	for result := range resultCh {
+		if result.Err != nil {
+			return results, result.Err
+		}
+		matched = true
+
+		jobName := "repair-" + result.Drive.Name
+		_, getErr := jobs.Get(ctx, jobName, metav1.GetOptions{})
+		switch {
+		case getErr == nil:
+			// The lookup succeeded, so there is no error value to attach.
+			log(LogMessage{
+				Type:             ErrorLogType,
+				Message:          "job already exists",
+				Values:           map[string]any{"jobName": jobName},
+				FormattedMessage: fmt.Sprintf("job %v already exists\n", jobName),
+			})
+			continue
+		case !apierrors.IsNotFound(getErr):
+			log(LogMessage{
+				Type:             ErrorLogType,
+				Err:              getErr,
+				Message:          "unable to get repair job",
+				Values:           map[string]any{"jobName": jobName},
+				FormattedMessage: fmt.Sprintf("unable to get repair job %v; %v\n", jobName, getErr),
+			})
+			continue
+		}
+
+		// The drive's node ID is used both as node selector value and as CLI flag.
+		nodeID := string(result.Drive.GetNodeID())
+		containerArgs := []string{"/directpv", "repair", result.Drive.Name, "--kube-node-name=" + nodeID}
+		if args.ForceFlag {
+			containerArgs = append(containerArgs, "--force")
+		}
+		if args.DisablePrefetchFlag {
+			containerArgs = append(containerArgs, "--disable-prefetch")
+		}
+		if args.DryRun {
+			containerArgs = append(containerArgs, "--dry-run")
+		}
+
+		job := batchv1.Job{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:        jobName,
+				Namespace:   consts.AppName,
+				Annotations: params.annotations,
+			},
+			Spec: batchv1.JobSpec{
+				BackoffLimit:            &backOffLimit,
+				TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						NodeSelector:       map[string]string{string(directpvtypes.NodeLabelKey): nodeID},
+						ServiceAccountName: consts.Identity,
+						Tolerations:        params.tolerations,
+						ImagePullSecrets:   params.imagePullSecrets,
+						Volumes:            repairJobVolumes,
+						Containers: []corev1.Container{
+							{
+								Name:                     jobName,
+								Image:                    params.containerImage,
+								Command:                  containerArgs,
+								SecurityContext:          params.securityContext,
+								VolumeMounts:             repairJobVolumeMounts,
+								TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
+								TerminationMessagePath:   "/var/log/repair-termination-log",
+							},
+						},
+						RestartPolicy: corev1.RestartPolicyNever,
+					},
+				},
+			},
+		}
+		if _, err := jobs.Create(ctx, &job, metav1.CreateOptions{}); err != nil {
+			log(LogMessage{
+				Type:             ErrorLogType,
+				Err:              err,
+				Message:          "unable to create repair job",
+				Values:           map[string]any{"jobName": jobName, "driveName": result.Drive.Name},
+				FormattedMessage: fmt.Sprintf("unable to create repair job %v; %v\n", jobName, err),
+			})
+			continue
+		}
+		log(LogMessage{
+			Type:             InfoLogType,
+			Message:          "repair job created",
+			Values:           map[string]any{"jobName": jobName, "driveName": result.Drive.Name},
+			FormattedMessage: fmt.Sprintf("repair job %v for drive %v is created\n", jobName, result.Drive.Name),
+		})
+		results = append(results, RepairResult{JobName: jobName, DriveID: result.Drive.GetDriveID()})
+	}
+
+	if !matched {
+		return nil, ErrNoMatchingResourcesFound
+	}
+	return results, nil
+}
diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go
index 5c1d06f1..9f031b78 100644
--- a/pkg/apis/directpv.min.io/types/types.go
+++ b/pkg/apis/directpv.min.io/types/types.go
@@ -48,6 +48,9 @@ const (
// DriveStatusMoving denotes drive is moving volumes.
DriveStatusMoving DriveStatus = "Moving"
+
+ // DriveStatusRepairing denotes drive is repairing its filesystem.
+ DriveStatusRepairing DriveStatus = "Repairing"
)
// ToDriveStatus converts string value to DriveStatus.
diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index 9bee57e0..b90cb0b4 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -97,4 +97,22 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // LegacyAppRootDir is legacy application root directory.
+ LegacyAppRootDir = "/var/lib/direct-csi"
+
+ AppRootDirVolumeName = AppName + "-common-root"
+ AppRootDirVolumePath = AppRootDir + "/"
+
+ LegacyAppRootDirVolumeName = "direct-csi-common-root"
+ LegacyAppRootDirVolumePath = LegacyAppRootDir + "/"
+
+ SysDirVolumeName = "sysfs"
+ SysDirVolumePath = "/sys"
+
+ DevDirVolumeName = "devfs"
+ DevDirVolumePath = "/dev"
+
+ RunUdevDataVolumeName = "run-udev-data-dir"
+ RunUdevDataVolumePath = UdevDataDir
)
diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in
index bf50b159..b4fa6bf5 100644
--- a/pkg/consts/consts.go.in
+++ b/pkg/consts/consts.go.in
@@ -95,4 +95,22 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // LegacyAppRootDir is legacy application root directory.
+ LegacyAppRootDir = "/var/lib/direct-csi"
+
+ AppRootDirVolumeName = AppName + "-common-root"
+ AppRootDirVolumePath = AppRootDir + "/"
+
+ LegacyAppRootDirVolumeName = "direct-csi-common-root"
+ LegacyAppRootDirVolumePath = LegacyAppRootDir + "/"
+
+ SysDirVolumeName = "sysfs"
+ SysDirVolumePath = "/sys"
+
+ DevDirVolumeName = "devfs"
+ DevDirVolumePath = "/dev"
+
+ RunUdevDataVolumeName = "run-udev-data-dir"
+ RunUdevDataVolumePath = UdevDataDir
)
diff --git a/pkg/drive/event.go b/pkg/drive/event.go
index 6c106346..7936e5e1 100644
--- a/pkg/drive/event.go
+++ b/pkg/drive/event.go
@@ -183,7 +183,7 @@ func newDriveEventHandler(nodeID directpvtypes.NodeID) *driveEventHandler {
if err = os.Remove(driveMountPoint); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
- driveMountPoint = path.Join("/var/lib/direct-csi/mnt", fsuuid)
+ driveMountPoint = path.Join(consts.LegacyAppRootDir, "mnt", fsuuid)
if err = os.Remove(driveMountPoint); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
@@ -221,7 +221,7 @@ func (handler *driveEventHandler) unmountDrive(drive *types.Drive, skipDriveMoun
devices, found := mountPointMap[driveMountPoint]
if !found {
// Check for legacy mount for backward compatibility.
- driveMountPoint = path.Join("/var/lib/direct-csi/mnt", drive.Status.FSUUID)
+ driveMountPoint = path.Join(consts.LegacyAppRootDir, "mnt", drive.Status.FSUUID)
if devices, found = mountPointMap[driveMountPoint]; !found {
return nil // Device already umounted
}
diff --git a/pkg/drive/repair.go b/pkg/drive/repair.go
new file mode 100644
index 00000000..5c23bf13
--- /dev/null
+++ b/pkg/drive/repair.go
@@ -0,0 +1,209 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package drive
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "path"
+ "sync"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/sys"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/utils"
+ "github.com/minio/directpv/pkg/xfs"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/util/retry"
+ "k8s.io/klog/v2"
+)
+
+// logWriter is a write sink that forwards the data written to it to klog,
+// one log record per newline-terminated line. Bytes that do not yet end in
+// a newline are held back until the rest of the line arrives or Close is
+// called.
+type logWriter struct {
+ // buffer holds written bytes that have not been logged yet, i.e. the
+ // data following the last newline seen so far.
+ buffer []byte
+ // closed is set by Close; once set, Write logs its input directly and
+ // reports io.EOF.
+ closed bool
+ // mutex is held by Write and Close while they touch buffer and closed.
+ mutex sync.Mutex
+}
+
+// Write appends data to the pending buffer and emits every complete,
+// newline-terminated line found in it as a separate klog info record
+// (the newline is included in the logged text). Any trailing partial line
+// stays buffered for a later Write or Close.
+//
+// After Close has been called the data is logged as-is and io.EOF is
+// returned together with len(data).
+func (w *logWriter) Write(data []byte) (n int, err error) {
+	w.mutex.Lock()
+	defer w.mutex.Unlock()
+
+	n = len(data)
+	if w.closed {
+		klog.Info(string(data))
+		return n, io.EOF
+	}
+
+	w.buffer = append(w.buffer, data...)
+	// Drain the buffer line by line; stop at the first position where no
+	// newline is left.
+	for end := bytes.IndexByte(w.buffer, '\n'); end >= 0; end = bytes.IndexByte(w.buffer, '\n') {
+		klog.Info(string(w.buffer[:end+1]))
+		w.buffer = w.buffer[end+1:]
+	}
+
+	return n, nil
+}
+
+// Close flushes any buffered partial line to klog and marks the writer as
+// closed. It always returns nil.
+//
+// Nothing is logged when the buffer is empty, so a command whose output
+// ended with a newline (or produced no output at all) does not leave an
+// empty log record behind.
+func (w *logWriter) Close() (err error) {
+	w.mutex.Lock()
+	defer w.mutex.Unlock()
+
+	if len(w.buffer) > 0 {
+		klog.Info(string(w.buffer))
+	}
+	w.buffer = nil
+	w.closed = true
+	return nil
+}
+
+func repair(ctx context.Context, drive *types.Drive, force, disablePrefetch, dryRun bool,
+ getDeviceByFSUUID func(fsuuid string) (string, error),
+ getMounts func() (deviceMap map[string]utils.StringSet, err error),
+ unmount func(mountPoint string) error,
+ repair func(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error,
+ mount func(device, target string) (err error),
+) error {
+ device, err := getDeviceByFSUUID(drive.Status.FSUUID)
+ if err != nil {
+ klog.ErrorS(
+ err,
+ "unable to find device by FSUUID; "+
+ "either device is removed or run command "+
+ "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
+ "on the host to reload",
+ "FSUUID", drive.Status.FSUUID)
+ client.Eventf(
+ drive, client.EventTypeWarning, client.EventReasonStageVolume,
+ "unable to find device by FSUUID %v; "+
+ "either device is removed or run command "+
+ "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
+ " on the host to reload", drive.Status.FSUUID)
+ return fmt.Errorf("unable to find device by FSUUID %v; %w", drive.Status.FSUUID, err)
+ }
+
+ target := types.GetDriveMountDir(drive.Status.FSUUID)
+ legacyTarget := path.Join(consts.LegacyAppRootDir, "mnt", drive.Status.FSUUID)
+
+ deviceMap, err := getMounts()
+ if err != nil {
+ return err
+ }
+
+ mountPoints := deviceMap[device]
+ getCount := func() int {
+ count := len(mountPoints)
+ if mountPoints.Exist(target) {
+ count--
+ }
+ if mountPoints.Exist(legacyTarget) {
+ count--
+ }
+ return count
+ }
+
+ switch len(mountPoints) {
+ case 0:
+ case 1:
+ if mountPoints.Exist(target) {
+ if err = unmount(target); err != nil {
+ return err
+ }
+ }
+ if mountPoints.Exist(legacyTarget) {
+ if err = unmount(legacyTarget); err != nil {
+ return err
+ }
+ }
+ case 2:
+ if mountPoints.Exist(target) && mountPoints.Exist(legacyTarget) {
+ if err = unmount(target); err != nil {
+ return err
+ }
+ if err = unmount(legacyTarget); err != nil {
+ return err
+ }
+ } else {
+ return fmt.Errorf("unable to run xfs repair; %v volume mounts still mounted", getCount())
+ }
+ default:
+ return fmt.Errorf("unable to run xfs repair; %v volume mounts still mounted", getCount())
+ }
+
+ logWriter := &logWriter{}
+ err = repair(ctx, device, force, disablePrefetch, dryRun, logWriter)
+ logWriter.Close()
+ if err != nil {
+ return err
+ }
+
+ merr := mount(device, target)
+ if merr != nil {
+ klog.ErrorS(err, "unable to mount the drive", "Source", device, "Target", target)
+ }
+
+ driveID := drive.GetDriveID()
+ updateFunc := func() error {
+ drive, err := client.DriveClient().Get(ctx, string(driveID), metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+
+ if merr != nil {
+ drive.SetMountErrorCondition(fmt.Sprintf("unable to mount; %v", err))
+ client.Eventf(drive,
+ client.EventTypeWarning,
+ client.EventReasonDriveMountError,
+ "unable to mount the drive; %v", err,
+ )
+ drive.Status.Status = directpvtypes.DriveStatusError
+ } else {
+ client.Eventf(
+ drive,
+ client.EventTypeNormal,
+ client.EventReasonDriveMounted,
+ "Drive mounted successfully to %s", target,
+ )
+ drive.Status.Status = directpvtypes.DriveStatusReady
+ }
+
+ _, err = client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()})
+ return err
+ }
+
+ return retry.RetryOnConflict(retry.DefaultRetry, updateFunc)
+}
+
+// Repair runs `xfs_repair` on the device of the specified drive. It calls
+// repair with the real implementations: device lookup and mount table from
+// package sys, and the repair and mount operations from package xfs.
+func Repair(ctx context.Context, drive *types.Drive, force, disablePrefetch, dryRun bool) error {
+	// Only the device-to-mount-points map of sys.GetMounts is needed here.
+	getMounts := func() (map[string]utils.StringSet, error) {
+		_, deviceMap, _, _, err := sys.GetMounts(false)
+		return deviceMap, err
+	}
+
+	unmount := func(mountPoint string) error {
+		return sys.Unmount(mountPoint, true, true, false)
+	}
+
+	return repair(
+		ctx, drive, force, disablePrefetch, dryRun,
+		sys.GetDeviceByFSUUID, getMounts, unmount, xfs.Repair, xfs.Mount,
+	)
+}
diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go
index d93fc874..1dd58c0d 100644
--- a/pkg/k8s/k8s.go
+++ b/pkg/k8s/k8s.go
@@ -322,3 +322,29 @@ func ParseTolerations(values []string) ([]corev1.Toleration, error) {
return tolerations, nil
}
+
+// NewHostPathVolume returns a volume called name that is backed by the host
+// path given in path. The host path type is DirectoryOrCreate.
+func NewHostPathVolume(name, path string) corev1.Volume {
+	// The API takes the host path type by pointer, so it needs an
+	// addressable copy.
+	dirOrCreate := corev1.HostPathDirectoryOrCreate
+
+	return corev1.Volume{
+		Name: name,
+		VolumeSource: corev1.VolumeSource{
+			HostPath: &corev1.HostPathVolumeSource{
+				Path: path,
+				Type: &dirOrCreate,
+			},
+		},
+	}
+}
+
+// NewVolumeMount returns a volume mount that mounts the volume called name
+// at path with the given mount propagation mode; readOnly controls whether
+// the mount is read-only.
+func NewVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount {
+	volumeMount := corev1.VolumeMount{
+		Name:      name,
+		MountPath: path,
+		ReadOnly:  readOnly,
+	}
+	// The propagation mode is referenced by pointer in the API type.
+	volumeMount.MountPropagation = &mountPropogation
+	return volumeMount
+}
diff --git a/pkg/legacy/converter/sys.go b/pkg/legacy/converter/sys.go
index 84a51718..7103f043 100644
--- a/pkg/legacy/converter/sys.go
+++ b/pkg/legacy/converter/sys.go
@@ -16,12 +16,14 @@
package converter
+import "github.com/minio/directpv/pkg/consts"
+
const (
// HostDevRoot is "/dev" directory.
HostDevRoot = "/dev"
// DirectCSIDevRoot is "/var/lib/direct-csi/devices" directory.
- DirectCSIDevRoot = "/var/lib/direct-csi/devices"
+ DirectCSIDevRoot = consts.LegacyAppRootDir + "/devices"
// DirectCSIPartitionInfix is partition infix value.
DirectCSIPartitionInfix = "-part-"
diff --git a/pkg/xfs/repair.go b/pkg/xfs/repair.go
new file mode 100644
index 00000000..db83b1c7
--- /dev/null
+++ b/pkg/xfs/repair.go
@@ -0,0 +1,27 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package xfs
+
+import (
+ "context"
+ "io"
+)
+
+// Repair is a utility function to repair XFS on a device. It forwards all
+// of its arguments unchanged to the build-specific repair function: the
+// Linux build runs the `xfs_repair` command and sends the command's stdout
+// and stderr to output, every other build returns an "unsupported operating
+// system" error.
+func Repair(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error {
+ return repair(ctx, device, force, disablePrefetch, dryRun, output)
+}
diff --git a/pkg/xfs/repair_linux.go b/pkg/xfs/repair_linux.go
new file mode 100644
index 00000000..51fd0276
--- /dev/null
+++ b/pkg/xfs/repair_linux.go
@@ -0,0 +1,48 @@
+//go:build linux
+
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package xfs
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os/exec"
+)
+
+// repair runs `xfs_repair -v` on device and streams the command's stdout and
+// stderr to output. The command is bound to ctx.
+//
+// The boolean parameters add the following xfs_repair(8) options:
+//   - force:           -L (force log zeroing)
+//   - disablePrefetch: -P (disable prefetching)
+//   - dryRun:          -n (no modify mode)
+//
+// The returned error wraps the command failure together with the device name.
+func repair(ctx context.Context, device string, force, disablePrefetch, dryRun bool, output io.Writer) error {
+	// Options go before the device operand: getopt implementations that do
+	// not permute arguments stop parsing options at the first operand.
+	args := make([]string, 0, 5)
+	args = append(args, "-v")
+	if force {
+		args = append(args, "-L")
+	}
+	if disablePrefetch {
+		args = append(args, "-P")
+	}
+	if dryRun {
+		args = append(args, "-n")
+	}
+	args = append(args, device)
+
+	cmd := exec.CommandContext(ctx, "xfs_repair", args...)
+	cmd.Stdout = output
+	cmd.Stderr = output
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("unable to run xfs_repair on device %v; %w", device, err)
+	}
+
+	return nil
+}
diff --git a/pkg/xfs/repair_other.go b/pkg/xfs/repair_other.go
new file mode 100644
index 00000000..0d3887bc
--- /dev/null
+++ b/pkg/xfs/repair_other.go
@@ -0,0 +1,30 @@
+//go:build !linux
+
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package xfs
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "runtime"
+)
+
+func repair(_ context.Context, _ string, _, _, _ bool, _ io.Writer) error {
+ return fmt.Errorf("unsupported operating system %v", runtime.GOOS)
+}