diff --git a/.golangci.yml b/.golangci.yml
index 8b0e532b..7d738763 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -1,6 +1,6 @@
 linters-settings:
-  gofumpt:
-    lang-version: "1.22"
+run:
+  go: "1.22"
   misspell:
     locale: US
diff --git a/cmd/kubectl-directpv/repair.go b/cmd/kubectl-directpv/repair.go
index 34639c4d..1f14230a 100644
--- a/cmd/kubectl-directpv/repair.go
+++ b/cmd/kubectl-directpv/repair.go
@@ -22,15 +22,9 @@ import (
 	"os"
 	"strings"
 
-	directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+	"github.com/minio/directpv/pkg/admin"
 	"github.com/minio/directpv/pkg/consts"
-	"github.com/minio/directpv/pkg/k8s"
-	"github.com/minio/directpv/pkg/utils"
 	"github.com/spf13/cobra"
-	batchv1 "k8s.io/api/batch/v1"
-	corev1 "k8s.io/api/core/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 var (
@@ -52,7 +46,7 @@ var repairCmd = &cobra.Command{
 	Run: func(c *cobra.Command, args []string) {
 		driveIDArgs = args
 		if err := validateRepairCmd(); err != nil {
-			utils.Eprintf(quietFlag, true, "%v\n", err)
+			eprintf(true, "%v\n", err)
 			os.Exit(-1)
 		}
 
@@ -81,111 +75,18 @@ func validateRepairCmd() error {
 }
 
 func repairMain(ctx context.Context) {
-	var failed bool
-
-	ctx, cancelFunc := context.WithCancel(ctx)
-	defer cancelFunc()
-
-	containerImage, imagePullSecrets, tolerations, err := getContainerParams(ctx)
+	_, err := adminClient.Repair(
+		ctx,
+		admin.RepairArgs{
+			DriveIDs:            driveIDSelectors,
+			DryRun:              dryRunFlag,
+			ForceFlag:           forceFlag,
+			DisablePrefetchFlag: disablePrefetchFlag,
+		},
+		logFunc,
+	)
 	if err != nil {
-		utils.Eprintf(quietFlag, true, "unable to container parameters from daemonset; %v\n", err)
-		os.Exit(1)
-	}
-
-	resultCh := adminClient.NewDriveLister().
-		DriveIDSelector(driveIDSelectors).
-		IgnoreNotFound(true).
-		List(ctx)
-	for result := range resultCh {
-		if result.Err != nil {
-			utils.Eprintf(quietFlag, true, "%v\n", result.Err)
-			os.Exit(1)
-		}
-
-		jobName := "repair-" + result.Drive.Name
-		if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppName).Get(ctx, jobName, metav1.GetOptions{}); err != nil {
-			if !apierrors.IsNotFound(err) {
-				utils.Eprintf(quietFlag, true, "unable to get repair job %v; %v\n", jobName, err)
-				failed = true
-				continue
-			}
-		} else {
-			utils.Eprintf(quietFlag, true, "job %v already exists\n", jobName)
-			continue
-		}
-
-		nodeID := string(result.Drive.GetNodeID())
-
-		containerArgs := []string{"/directpv", "repair", result.Drive.Name, "--kube-node-name=" + nodeID}
-		if forceFlag {
-			containerArgs = append(containerArgs, "--force")
-		}
-		if disablePrefetchFlag {
-			containerArgs = append(containerArgs, "--disable-prefetch")
-		}
-		if dryRunFlag {
-			containerArgs = append(containerArgs, "--dry-run")
-		}
-
-		backOffLimit := int32(1)
-
-		volumes := []corev1.Volume{
-			k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
-			k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
-			k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
-			k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
-			k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
-		}
-
-		volumeMounts := []corev1.VolumeMount{
-			k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
-			k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
-			k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
-			k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
-			k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
-		}
-
-		privileged := true
-
-		job := batchv1.Job{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      jobName,
-				Namespace: consts.AppName,
-			},
-			Spec: batchv1.JobSpec{
-				BackoffLimit: &backOffLimit,
-				Template: corev1.PodTemplateSpec{
-					Spec: corev1.PodSpec{
-						NodeSelector:       map[string]string{string(directpvtypes.NodeLabelKey): nodeID},
-						ServiceAccountName: consts.Identity,
-						Tolerations:        tolerations,
-						ImagePullSecrets:   imagePullSecrets,
-						Volumes:            volumes,
-						Containers: []corev1.Container{
-							{
-								Name:                     jobName,
-								Image:                    containerImage,
-								Command:                  containerArgs,
-								SecurityContext:          &corev1.SecurityContext{Privileged: &privileged},
-								VolumeMounts:             volumeMounts,
-								TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
-								TerminationMessagePath:   "/var/log/repair-termination-log",
-							},
-						},
-						RestartPolicy: corev1.RestartPolicyNever,
-					},
-				},
-			},
-		}
-
-		if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppName).Create(ctx, &job, metav1.CreateOptions{}); err != nil {
-			utils.Eprintf(quietFlag, true, "unable to create repair job %v; %v\n", jobName, err)
-		} else {
-			utils.Eprintf(quietFlag, false, "repair job %v for drive %v is created\n", jobName, result.Drive.Name)
-		}
-	}
-
-	if failed {
+		eprintf(!errors.Is(err, admin.ErrNoMatchingResourcesFound), "%v\n", err)
 		os.Exit(1)
 	}
 }
diff --git a/cmd/kubectl-directpv/utils.go b/cmd/kubectl-directpv/utils.go
index 6e66833f..355c678e 100644
--- a/cmd/kubectl-directpv/utils.go
+++ b/cmd/kubectl-directpv/utils.go
@@ -17,7 +17,6 @@
 package main
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"os"
@@ -27,12 +26,8 @@ import (
 	"github.com/jedib0t/go-pretty/v6/table"
 	"github.com/minio/directpv/pkg/admin"
 	"github.com/minio/directpv/pkg/consts"
-	"github.com/minio/directpv/pkg/k8s"
 	"github.com/minio/directpv/pkg/utils"
 	"github.com/mitchellh/go-homedir"
-	corev1 "k8s.io/api/core/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 )
 
@@ -133,27 +128,3 @@ func eprintf(isErr bool, format string, args ...any) {
 func logFunc(log admin.LogMessage) {
 	eprintf(log.Type == admin.ErrorLogType, log.FormattedMessage)
 }
-
-func getContainerParams(ctx context.Context) (string, []corev1.LocalObjectReference, []corev1.Toleration, error) {
-	daemonSet, err := k8s.KubeClient().AppsV1().DaemonSets(consts.AppName).Get(
-		ctx, consts.NodeServerName, metav1.GetOptions{},
-	)
-
-	if err != nil && !apierrors.IsNotFound(err) {
-		return "", nil, nil, err
-	}
-
-	if daemonSet == nil || daemonSet.UID == "" {
-		return "", nil, nil, fmt.Errorf("invalid daemonset found")
-	}
-
-	var containerImage string
-	for _, container := range daemonSet.Spec.Template.Spec.Containers {
-		if container.Name == consts.NodeServerName {
-			containerImage = container.Image
-			break
-		}
-	}
-
-	return containerImage, daemonSet.Spec.Template.Spec.ImagePullSecrets, daemonSet.Spec.Template.Spec.Tolerations, nil
-}
diff --git a/pkg/admin/repair.go b/pkg/admin/repair.go
new file mode 100644
index 00000000..deb98013
--- /dev/null
+++ b/pkg/admin/repair.go
@@ -0,0 +1,218 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2024 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package admin
+
+import (
+	"context"
+	"fmt"
+
+	directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+	"github.com/minio/directpv/pkg/consts"
+	"github.com/minio/directpv/pkg/k8s"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// RepairArgs represents the arguments to repair a drive
+type RepairArgs struct {
+	DriveIDs            []directpvtypes.DriveID
+	DryRun              bool
+	ForceFlag           bool
+	DisablePrefetchFlag bool
+}
+
+// RepairResult represents result of repaired drive
+type RepairResult struct {
+	JobName string
+	DriveID directpvtypes.DriveID
+}
+
+func getContainerParams(ctx context.Context) (string, []corev1.LocalObjectReference, []corev1.Toleration, error) {
+	daemonSet, err := k8s.KubeClient().AppsV1().DaemonSets(consts.AppName).Get(
+		ctx, consts.NodeServerName, metav1.GetOptions{},
+	)
+
+	if err != nil && !apierrors.IsNotFound(err) {
+		return "", nil, nil, err
+	}
+
+	if daemonSet == nil || daemonSet.UID == "" {
+		return "", nil, nil, fmt.Errorf("invalid daemonset found")
+	}
+
+	var containerImage string
+	for _, container := range daemonSet.Spec.Template.Spec.Containers {
+		if container.Name == consts.NodeServerName {
+			containerImage = container.Image
+			break
+		}
+	}
+
+	return containerImage, daemonSet.Spec.Template.Spec.ImagePullSecrets, daemonSet.Spec.Template.Spec.Tolerations, nil
+}
+
+// Repair repairs added drives
+func (client *Client) Repair(ctx context.Context, args RepairArgs, log LogFunc) (results []RepairResult, err error) {
+	if log == nil {
+		log = nullLogger
+	}
+
+	ctx, cancelFunc := context.WithCancel(ctx)
+	defer cancelFunc()
+
+	containerImage, imagePullSecrets, tolerations, err := getContainerParams(ctx)
+	if err != nil {
+		log(
+			LogMessage{
+				Type:             ErrorLogType,
+				Err:              err,
+				Message:          "unable to get container parameters from daemonset for drive repair",
+				Values:           map[string]any{"namespace": consts.AppName, "daemonSet": consts.NodeServerName},
+				FormattedMessage: fmt.Sprintf("unable to get container parameters from daemonset; %v\n", err),
+			},
+		)
+		return nil, err
+	}
+
+	resultCh := client.NewDriveLister().
+		DriveIDSelector(args.DriveIDs).
+		IgnoreNotFound(true).
+		List(ctx)
+	for result := range resultCh {
+		if result.Err != nil {
+			return results, result.Err
+		}
+
+		jobName := "repair-" + result.Drive.Name
+		if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppName).Get(ctx, jobName, metav1.GetOptions{}); err != nil {
+			if !apierrors.IsNotFound(err) {
+				log(
+					LogMessage{
+						Type:             ErrorLogType,
+						Err:              err,
+						Message:          "unable to get repair job",
+						Values:           map[string]any{"jobName": jobName},
+						FormattedMessage: fmt.Sprintf("unable to get repair job %v; %v\n", jobName, err),
+					},
+				)
+				continue
+			}
+		} else {
+			log(
+				LogMessage{
+					Type:             ErrorLogType,
+					Err:              err,
+					Message:          "job already exists",
+					Values:           map[string]any{"jobName": jobName},
+					FormattedMessage: fmt.Sprintf("job %v already exists\n", jobName),
+				},
+			)
+			continue
+		}
+
+		nodeID := string(result.Drive.GetNodeID())
+
+		containerArgs := []string{"/directpv", "repair", result.Drive.Name, "--kube-node-name=" + nodeID}
+		if args.ForceFlag {
+			containerArgs = append(containerArgs, "--force")
+		}
+		if args.DisablePrefetchFlag {
+			containerArgs = append(containerArgs, "--disable-prefetch")
+		}
+		if args.DryRun {
+			containerArgs = append(containerArgs, "--dry-run")
+		}
+
+		backOffLimit := int32(1)
+
+		volumes := []corev1.Volume{
+			k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath),
+			k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath),
+			k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath),
+			k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath),
+			k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath),
+		}
+
+		volumeMounts := []corev1.VolumeMount{
+			k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+			k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false),
+			k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false),
+			k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true),
+			k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true),
+		}
+
+		privileged := true
+
+		job := batchv1.Job{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      jobName,
+				Namespace: consts.AppName,
+			},
+			Spec: batchv1.JobSpec{
+				BackoffLimit: &backOffLimit,
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						NodeSelector:       map[string]string{string(directpvtypes.NodeLabelKey): nodeID},
+						ServiceAccountName: consts.Identity,
+						Tolerations:        tolerations,
+						ImagePullSecrets:   imagePullSecrets,
+						Volumes:            volumes,
+						Containers: []corev1.Container{
+							{
+								Name:                     jobName,
+								Image:                    containerImage,
+								Command:                  containerArgs,
+								SecurityContext:          &corev1.SecurityContext{Privileged: &privileged},
+								VolumeMounts:             volumeMounts,
+								TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
+								TerminationMessagePath:   "/var/log/repair-termination-log",
+							},
+						},
+						RestartPolicy: corev1.RestartPolicyNever,
+					},
+				},
+			},
+		}
+
+		if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppName).Create(ctx, &job, metav1.CreateOptions{}); err != nil {
+			log(
+				LogMessage{
+					Type:             ErrorLogType,
+					Err:              err,
+					Message:          "unable to create repair job",
+					Values:           map[string]any{"jobName": jobName, "driveName": result.Drive.Name},
+					FormattedMessage: fmt.Sprintf("unable to create repair job %v; %v\n", jobName, err),
+				},
+			)
+		} else {
+			log(
+				LogMessage{
+					Type:             InfoLogType,
+					Message:          "repair job created",
+					Values:           map[string]any{"jobName": jobName, "driveName": result.Drive.Name},
+					FormattedMessage: fmt.Sprintf("repair job %v for drive %v is created\n", jobName, result.Drive.Name),
+				},
+			)
+
+			results = append(results, RepairResult{JobName: jobName, DriveID: result.Drive.GetDriveID()})
+		}
+	}
+
+	return
+}