Skip to content

Commit

Permalink
Merge branch 'master' into issue-10106
Browse files Browse the repository at this point in the history
  • Loading branch information
derekbit authored Jan 11, 2025
2 parents 3c3080b + c3f39a6 commit ae68f6e
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 69 deletions.
26 changes: 19 additions & 7 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
if err != nil {
return nil, err
}
defer func() {
if err != nil {
engineClientProxy.Close()
}
}()

// Get the storage class of the PVC bound to the volume.
kubernetesStatus := &volume.Status.KubernetesStatus
Expand Down Expand Up @@ -830,11 +835,13 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the engine %v to be running before enabling backup monitor", engine.Name)
err = fmt.Errorf("failed waiting for the engine %v to be running before enabling backup monitor", engine.Name)
return nil, err
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForEngineMessage, engine.Name)
return nil, fmt.Errorf("waiting for the engine %v to be running before enabling backup monitor", engine.Name)
err = fmt.Errorf("waiting for the engine %v to be running before enabling backup monitor", engine.Name)
return nil, err
}

snapshot, err := bc.ds.GetSnapshotRO(backup.Spec.SnapshotName)
Expand All @@ -850,7 +857,8 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
}
return nil, errors.Wrapf(err, "failed to get the snapshot %v before enabling backup monitor", backup.Spec.SnapshotName)
err = errors.Wrapf(err, "failed to get the snapshot %v before enabling backup monitor", backup.Spec.SnapshotName)
return nil, err
}
if snapshot != nil {
if !snapshot.Status.ReadyToUse {
Expand All @@ -860,23 +868,27 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
bc.creationRetryCounter.DeleteEntry(backup.Name)
return nil, fmt.Errorf("failed waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
err = fmt.Errorf("failed waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
return nil, err
}
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForSnapshotMessage, backup.Spec.SnapshotName)
return nil, fmt.Errorf("waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
err = fmt.Errorf("waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
return nil, err
}
}

clusterBackups, err := bc.ds.ListBackupsWithBackupVolumeName(backupTarget.Name, volume.Name)
if err != nil {
return nil, errors.Wrapf(err, "failed to list backups in the cluster")
err = errors.Wrapf(err, "failed to list backups in the cluster")
return nil, err
}
for _, b := range clusterBackups {
if b.Status.State == longhorn.BackupStateDeleting {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForBackupDeletionIsCompleteMessage, b.Name)
return nil, fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
err = fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
return nil, err
}
}

Expand Down
6 changes: 3 additions & 3 deletions controller/share_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager
string(secret.Data[types.CryptoPBKDF]))
}

manifest := c.createPodManifest(sm, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret,
manifest := c.createPodManifest(sm, volume.Spec.DataEngine, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret,
priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams, nfsConfig)

storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
Expand Down Expand Up @@ -1411,13 +1411,13 @@ func (c *ShareManagerController) createLeaseManifest(sm *longhorn.ShareManager)
return lease
}

func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, annotations map[string]string, tolerations []corev1.Toleration,
func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, dataEngine longhorn.DataEngineType, annotations map[string]string, tolerations []corev1.Toleration,
affinity *corev1.Affinity, pullPolicy corev1.PullPolicy, resourceReq *corev1.ResourceRequirements, registrySecret, priorityClass string,
nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams,
nfsConfig *nfsServerConfig) *corev1.Pod {

// command args for the share-manager
args := []string{"--debug", "daemon", "--volume", sm.Name}
args := []string{"--debug", "daemon", "--volume", sm.Name, "--data-engine", string(dataEngine)}

if len(fsType) > 0 {
args = append(args, "--fs", fsType)
Expand Down
47 changes: 34 additions & 13 deletions csi/crypto/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/longhorn/longhorn-manager/types"

lhns "github.com/longhorn/go-common-libs/ns"
lhtypes "github.com/longhorn/go-common-libs/types"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

const (
mapperFilePathPrefix = "/dev/mapper"
mapperV2VolumeSuffix = "-encrypted"

CryptoKeyDefaultCipher = "aes-xts-plain64"
CryptoKeyDefaultHash = "sha256"
Expand Down Expand Up @@ -69,7 +73,13 @@ func (cp *EncryptParams) GetPBKDF() string {
}

// VolumeMapper returns the path for mapped encrypted device.
func VolumeMapper(volume string) string {
func VolumeMapper(volume, dataEngine string) string {
if types.IsDataEngineV2(longhorn.DataEngineType(dataEngine)) {
// By default, a v2 volume uses a dm device to control the IO path when attaching.
// That dm device is created with the same name as the volume.
// To avoid a naming conflict, the encrypted device is named after the volume with the "-encrypted" suffix appended.
return path.Join(mapperFilePathPrefix, getEncryptVolumeName(volume, dataEngine))
}
return path.Join(mapperFilePathPrefix, volume)
}

Expand All @@ -95,9 +105,10 @@ func EncryptVolume(devicePath, passphrase string, cryptoParams *EncryptParams) e
}

// OpenVolume opens volume so that it can be used by the client.
func OpenVolume(volume, devicePath, passphrase string) error {
if isOpen, _ := IsDeviceOpen(VolumeMapper(volume)); isOpen {
logrus.Infof("Device %s is already opened at %s", devicePath, VolumeMapper(volume))
// devicePath is the path of the volume on the host that will be opened, for instance '/dev/longhorn/volume1'.
func OpenVolume(volume, dataEngine, devicePath, passphrase string) error {
if isOpen, _ := IsDeviceOpen(VolumeMapper(volume, dataEngine)); isOpen {
logrus.Infof("Device %s is already opened at %s", devicePath, VolumeMapper(volume, dataEngine))
return nil
}

Expand All @@ -107,29 +118,38 @@ func OpenVolume(volume, devicePath, passphrase string) error {
return err
}

logrus.Infof("Opening device %s with LUKS on %s", devicePath, volume)
_, err = nsexec.LuksOpen(volume, devicePath, passphrase, lhtypes.LuksTimeout)
encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
logrus.Infof("Opening device %s with LUKS on %s", devicePath, encryptVolumeName)
_, err = nsexec.LuksOpen(encryptVolumeName, devicePath, passphrase, lhtypes.LuksTimeout)
if err != nil {
logrus.WithError(err).Warnf("Failed to open LUKS device %s", devicePath)
logrus.WithError(err).Warnf("Failed to open LUKS device %s to %s", devicePath, encryptVolumeName)
}
return err
}

func getEncryptVolumeName(volume, dataEngine string) string {
if types.IsDataEngineV2(longhorn.DataEngineType(dataEngine)) {
return volume + mapperV2VolumeSuffix
}
return volume
}

// CloseVolume closes encrypted volume so it can be detached.
func CloseVolume(volume string) error {
func CloseVolume(volume, dataEngine string) error {
namespaces := []lhtypes.Namespace{lhtypes.NamespaceMnt, lhtypes.NamespaceIpc}
nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces)
if err != nil {
return err
}

logrus.Infof("Closing LUKS device %s", volume)
_, err = nsexec.LuksClose(volume, lhtypes.LuksTimeout)
encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
logrus.Infof("Closing LUKS device %s", encryptVolumeName)
_, err = nsexec.LuksClose(encryptVolumeName, lhtypes.LuksTimeout)
return err
}

func ResizeEncryptoDevice(volume, passphrase string) error {
if isOpen, err := IsDeviceOpen(VolumeMapper(volume)); err != nil {
func ResizeEncryptoDevice(volume, dataEngine, passphrase string) error {
if isOpen, err := IsDeviceOpen(VolumeMapper(volume, dataEngine)); err != nil {
return err
} else if !isOpen {
return fmt.Errorf("volume %v encrypto device is closed for resizing", volume)
Expand All @@ -141,7 +161,8 @@ func ResizeEncryptoDevice(volume, passphrase string) error {
return err
}

_, err = nsexec.LuksResize(volume, passphrase, lhtypes.LuksTimeout)
encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
_, err = nsexec.LuksResize(encryptVolumeName, passphrase, lhtypes.LuksTimeout)
return err
}

Expand Down
39 changes: 21 additions & 18 deletions csi/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.Internal, "failed to evaluate device filesystem %v format: %v", devicePath, err)
}

log.Infof("Volume %v device %v contains filesystem of format %v", volumeID, devicePath, diskFormat)
dataEngine := volume.DataEngine
log.Infof("Volume %v (%v) device %v contains filesystem of format %v", volumeID, dataEngine, devicePath, diskFormat)

if volume.Encrypted {
secrets := req.GetSecrets()
Expand All @@ -515,7 +516,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}

cryptoDevice := crypto.VolumeMapper(volumeID)
cryptoDevice := crypto.VolumeMapper(volumeID, dataEngine)
log.Infof("Volume %s requires crypto device %s", volumeID, cryptoDevice)

// check if the crypto device is open at the null path.
Expand All @@ -525,12 +526,12 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.Internal, "failed to check if the crypto device %s for volume %s is mapped to the null path: %v", cryptoDevice, volumeID, err.Error())
} else if mappedToNullPath {
log.Warnf("Closing active crypto device %s for volume %s since the volume is not closed properly before", cryptoDevice, volumeID)
if err := crypto.CloseVolume(volumeID); err != nil {
if err := crypto.CloseVolume(volumeID, dataEngine); err != nil {
return nil, status.Errorf(codes.Internal, "failed to close active crypto device %s for volume %s: %v ", cryptoDevice, volumeID, err.Error())
}
}

if err := crypto.OpenVolume(volumeID, devicePath, passphrase); err != nil {
if err := crypto.OpenVolume(volumeID, dataEngine, devicePath, passphrase); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -628,19 +629,20 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// optionally try to retrieve the volume and check if it's an RWX volume
// if it is we let the share-manager clean up the crypto device
volume, _ := ns.apiClient.Volume.ById(volumeID)
if volume == nil || types.IsDataEngineV1(longhorn.DataEngineType(volume.DataEngine)) {
// Currently, only "RWO v1 volumes" and "block device with v1 volume.Migratable is true" supports encryption.
sharedAccess := requiresSharedAccess(volume, nil)
cleanupCryptoDevice := !sharedAccess || (sharedAccess && volume.Migratable)
if cleanupCryptoDevice {
cryptoDevice := crypto.VolumeMapper(volumeID)
if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil {
dataEngine := string(longhorn.DataEngineTypeV1)
if volume != nil {
dataEngine = volume.DataEngine
}
sharedAccess := requiresSharedAccess(volume, nil)
cleanupCryptoDevice := !sharedAccess || (sharedAccess && volume.Migratable)
if cleanupCryptoDevice {
cryptoDevice := crypto.VolumeMapper(volumeID, dataEngine)
if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if isOpen {
log.Infof("Volume %s closing active crypto device %s", volumeID, cryptoDevice)
if err := crypto.CloseVolume(volumeID, dataEngine); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if isOpen {
log.Infof("Volume %s closing active crypto device %s", volumeID, cryptoDevice)
if err := crypto.CloseVolume(volumeID); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
}
Expand Down Expand Up @@ -820,14 +822,15 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return nil, fmt.Errorf("unknown filesystem type for volume %v node expansion", volumeID)
}

dataEngine := volume.DataEngine
devicePath, err = func() (string, error) {
if !volume.Encrypted {
return devicePath, nil
}
if diskFormat != "crypto_LUKS" {
return "", status.Errorf(codes.InvalidArgument, "unsupported disk encryption format %v", diskFormat)
}
devicePath = crypto.VolumeMapper(volumeID)
devicePath = crypto.VolumeMapper(volumeID, dataEngine)

// Need to enable feature gate in v1.25:
// https://github.com/kubernetes/enhancements/issues/3107
Expand All @@ -847,7 +850,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}

// blindly resize the encrypto device
if err := crypto.ResizeEncryptoDevice(volumeID, passphrase); err != nil {
if err := crypto.ResizeEncryptoDevice(volumeID, dataEngine, passphrase); err != nil {
return "", status.Errorf(codes.InvalidArgument, "failed to resize crypto device %v for volume %v node expansion: %v", devicePath, volumeID, err)
}

Expand Down
29 changes: 4 additions & 25 deletions upgrade/v17xto180/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,8 @@ func upgradeBackups(namespace string, lhClient *lhclientset.Clientset, resourceM
}

for _, b := range backupMap {
backupTargetName := types.DefaultBackupTargetName
vol, err := lhClient.LonghornV1beta2().Volumes(namespace).Get(context.TODO(), b.Status.VolumeName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get volume %v of backup %v", b.Status.VolumeName, b.Name)
}
} else {
if vol.Spec.BackupTargetName != "" {
backupTargetName = vol.Spec.BackupTargetName
}
}
b.Labels = addLabel(b.Labels, types.LonghornLabelBackupTarget, backupTargetName)
// All backups from v1.7.x should have only one backup target: the default backup target `default`.
b.Labels = addLabel(b.Labels, types.LonghornLabelBackupTarget, types.DefaultBackupTargetName)
}

return nil
Expand Down Expand Up @@ -281,19 +271,8 @@ func upgradeBackupStatus(namespace string, lhClient *lhclientset.Clientset, reso
}

for _, b := range backupMap {
backupTargetName := types.DefaultBackupTargetName
vol, err := lhClient.LonghornV1beta2().Volumes(namespace).Get(context.TODO(), b.Status.VolumeName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get volume %v of backup %v", b.Status.VolumeName, b.Name)
}
} else {
if vol.Spec.BackupTargetName != "" {
backupTargetName = vol.Spec.BackupTargetName
}
}

b.Status.BackupTargetName = backupTargetName
// All backups from v1.7.x should have only one backup target: the default backup target `default`.
b.Status.BackupTargetName = types.DefaultBackupTargetName
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions webhook/resources/volume/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ func (v *volumeValidator) Create(request *admission.Request, newObj runtime.Obje

// TODO: remove this check when we support the following features for SPDK volumes
if types.IsDataEngineV2(volume.Spec.DataEngine) {
if volume.Spec.Encrypted {
return werror.NewInvalidError("encrypted volume is not supported for data engine v2", "")
}
if types.IsDataFromVolume(volume.Spec.DataSource) {
return werror.NewInvalidError("clone is not supported for data engine v2", "")
}
Expand Down

0 comments on commit ae68f6e

Please sign in to comment.