Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: include trashed parent images while calculating the clone depth #4029

Draft
wants to merge 8 commits into
base: devel
Choose a base branch
from
26 changes: 18 additions & 8 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
@@ -559,7 +559,9 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
rbdVol.Monitors,
rbdVol.RbdImageName,
cr)
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
} else if err != nil {
return status.Error(codes.Internal, err.Error())
}

@@ -995,7 +997,7 @@ func cleanupRBDImage(ctx context.Context,
if inUse {
log.ErrorLog(ctx, "rbd %s is still being used", rbdVol)

return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName)
return nil, status.Errorf(codes.FailedPrecondition, "rbd %s is still being used", rbdVol.RbdImageName)
}

// delete the temporary rbd image created as part of volume clone during
@@ -1017,9 +1019,10 @@ func cleanupRBDImage(ctx context.Context,
}
}

// Deleting rbd image
// Deleting rbd image, it isn't a failure if the image was deleted already.
log.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName)
if err = rbdVol.deleteImage(ctx); err != nil {
err = rbdVol.deleteImage(ctx)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v",
rbdVol, err)

@@ -1166,13 +1169,18 @@ func (cs *ControllerServer) CreateSnapshot(
}
}()

readyToUse := true

vol, err := cs.doSnapshotClone(ctx, rbdVol, rbdSnap, cr)
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
readyToUse = false
} else if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Update the metadata on snapshot not on the original image
rbdVol.RbdImageName = rbdSnap.RbdSnapName
rbdVol.ImageID = vol.ImageID
rbdVol.ClusterName = cs.ClusterName

defer func() {
@@ -1203,7 +1211,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: vol.VolID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: vol.CreatedAt,
ReadyToUse: true,
ReadyToUse: readyToUse,
},
}, nil
}
@@ -1234,10 +1242,12 @@ func cloneFromSnapshot(
return nil, status.Error(codes.Internal, err.Error())
}

readyToUse := true

err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errors.Is(err, ErrFlattenInProgress) {
// if flattening is in progress, return error and do not cleanup
return nil, status.Errorf(codes.Internal, err.Error())
readyToUse = false
} else if err != nil {
uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr)
if uErr != nil {
@@ -1263,7 +1273,7 @@ func cloneFromSnapshot(
SnapshotId: rbdSnap.VolID,
SourceVolumeId: rbdSnap.SourceVolumeID,
CreationTime: rbdSnap.CreatedAt,
ReadyToUse: true,
ReadyToUse: readyToUse,
},
}, nil
}
2 changes: 1 addition & 1 deletion internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
@@ -595,7 +595,7 @@ func flattenImageBeforeMapping(
if err != nil {
return err
}
depth, err = volOptions.getCloneDepth(ctx)
depth, err = volOptions.getCloneDepth(rbdHardMaxCloneDepth + 1)
if err != nil {
return err
}
131 changes: 94 additions & 37 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
@@ -114,6 +114,8 @@ type rbdImage struct {
NamePrefix string
// ParentName represents the parent image name of the image.
ParentName string
// ParentID represents the parent image ID of the image.
ParentID string
// Parent Pool is the pool that contains the parent image.
ParentPool string
// Cluster name
@@ -505,7 +507,14 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
return nil, err
}

image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
var image *librbd.Image

// try to open by id, that works for images in trash too
if ri.ImageID != "" {
image, err = librbd.OpenImageById(ri.ioctx, ri.ImageID, librbd.NoSnapshot)
} else {
image, err = librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
}
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
err = util.JoinErrors(ErrImageNotFound, err)
@@ -697,47 +706,80 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error {
return nil
}

func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
var depth uint
vol := rbdVolume{}
// getCloneDepth walks the parents of the image and returns the number of
// images in the chain. The `maxDepth` argument to the function is used to
// specify a limit of the depth to check. When `maxDepth` depth is reached, no
// further traversing of the depth is required.
//
nixpanic marked this conversation as resolved.
Show resolved Hide resolved
// This function re-uses the ioctx of the image to open all images in the
// chain. There is no need to open new ioctx's for every image.
func (ri *rbdImage) getCloneDepth(maxDepth uint) (uint, error) {
var (
depth uint
info *librbd.ParentInfo
)

vol.Pool = ri.Pool
vol.Monitors = ri.Monitors
vol.RbdImageName = ri.RbdImageName
vol.RadosNamespace = ri.RadosNamespace
vol.conn = ri.conn.Copy()
image, err := ri.open()
if err != nil {
return 0, fmt.Errorf("failed to open image %s: %w", ri, err)
}

// get the librbd.ImageSpec of the parent
info, err = image.GetParent()

// Close this image, it is not used anymore. Using defer to close it
// and replacing the image with an other image can result in resource
// leaks according to golangci-lint.
image.Close()

for {
if vol.RbdImageName == "" {
return depth, nil
if errors.Is(err, librbd.ErrNotFound) {
// image does not have a parent
break
} else if err != nil {
return 0, fmt.Errorf("failed to get parent of image %s at depth %d: %w", ri, depth, err)
}
err := vol.openIoctx()
if err != nil {
return depth, err

// if the parent is in trash, return maxDepth to trigger
// flattening
if info.Image.Trash {
return maxDepth, nil
}

err = vol.getImageInfo()
// FIXME: create and destroy the vol inside the loop.
// see https://github.com/ceph/ceph-csi/pull/1838#discussion_r598530807
vol.ioctx.Destroy()
vol.ioctx = nil
if err != nil {
// if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case
// return the found depth
if errors.Is(err, ErrImageNotFound) {
return depth, nil
}
log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err)
// if there is a parent, count it to the depth
depth++

return depth, err
// if maxDepth (usually hard-limit) is reached, further
// traversing parents is not needed, some action (flattening)
// will be triggered regardless of deeper depth
if depth == maxDepth {
break
}
if vol.ParentName != "" {
depth++

// open the parent image, so that the for-loop can continue
// with checking for the parent of the parent
image, err = librbd.OpenImageByIdReadOnly(ri.ioctx, info.Image.ImageID, librbd.NoSnapshot)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
// parent image does not exist, no parent after all
break
} else if err != nil {
imageSpec := info.Image.ImageID
if info.Snap.SnapName != librbd.NoSnapshot {
imageSpec += "@" + info.Snap.SnapName
}

return 0, fmt.Errorf("failed to open parent image ID %q, at depth %d: %w", imageSpec, depth, err)
}
vol.RbdImageName = vol.ParentName
vol.Pool = vol.ParentPool

info, err = image.GetParent()
// info and err checking is done in the top of the for loop

// Using defer in a for loop seems to be problematic. Just
// always close the image.
image.Close()
}

return depth, nil
}

type trashSnapInfo struct {
@@ -803,7 +845,7 @@ func (ri *rbdImage) flattenRbdImage(

// skip clone depth check if request is for force flatten
if !forceFlatten {
depth, err = ri.getCloneDepth(ctx)
depth, err = ri.getCloneDepth(hardlimit)
if err != nil {
return err
}
@@ -830,7 +872,9 @@ func (ri *rbdImage) flattenRbdImage(
_, err = ta.AddFlatten(admin.NewImageSpec(ri.Pool, ri.RadosNamespace, ri.RbdImageName))
rbdCephMgrSupported := isCephMgrSupported(ctx, ri.ClusterID, err)
if rbdCephMgrSupported {
if err != nil {
// it is not possible to flatten an image that is in trash (ErrNotFound)
switch {
case err != nil && !errors.Is(err, rados.ErrNotFound):
// discard flattening error if the image does not have any parent
rbdFlattenNoParent := fmt.Sprintf("Image %s/%s does not have a parent", ri.Pool, ri.RbdImageName)
if strings.Contains(err.Error(), rbdFlattenNoParent) {
@@ -839,11 +883,11 @@ func (ri *rbdImage) flattenRbdImage(
log.ErrorLog(ctx, "failed to add task flatten for %s : %v", ri, err)

return err
}
if forceFlatten || depth >= hardlimit {
case forceFlatten || depth >= hardlimit:
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName)
default:
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
if !rbdCephMgrSupported {
log.ErrorLog(
@@ -875,6 +919,9 @@ func (ri *rbdImage) getParentName() (string, error) {
return "", err
}

ri.ParentName = parentInfo.Image.ImageName
ri.ParentID = parentInfo.Image.ImageID

return parentInfo.Image.ImageName, nil
}

@@ -1588,6 +1635,11 @@ func (ri *rbdImage) getImageInfo() error {
// TODO: can rv.VolSize not be a uint64? Or initialize it to -1?
ri.VolSize = int64(imageInfo.Size)

ri.ImageID, err = image.GetId()
if err != nil {
return err
}

features, err := image.GetFeatures()
if err != nil {
return err
@@ -1601,11 +1653,13 @@ func (ri *rbdImage) getImageInfo() error {
// the parent is an error or not.
if errors.Is(err, librbd.ErrNotFound) {
ri.ParentName = ""
ri.ParentID = ""
} else {
return err
}
} else {
ri.ParentName = parentInfo.Image.ImageName
ri.ParentID = parentInfo.Image.ImageID
ri.ParentPool = parentInfo.Image.PoolName
}
// Get image creation time
@@ -1636,6 +1690,7 @@ func (ri *rbdImage) getParent() (*rbdImage, error) {
parentImage.Pool = ri.ParentPool
parentImage.RadosNamespace = ri.RadosNamespace
parentImage.RbdImageName = ri.ParentName
parentImage.ImageID = ri.ParentID

err = parentImage.getImageInfo()
if err != nil {
@@ -1692,6 +1747,7 @@ type rbdImageMetadataStash struct {
Pool string `json:"pool"`
RadosNamespace string `json:"radosNamespace"`
ImageName string `json:"image"`
ImageID string `json:"imageID"`
UnmapOptions string `json:"unmapOptions"`
NbdAccess bool `json:"accessType"`
Encrypted bool `json:"encrypted"`
@@ -1721,6 +1777,7 @@ func stashRBDImageMetadata(volOptions *rbdVolume, metaDataPath string) error {
Pool: volOptions.Pool,
RadosNamespace: volOptions.RadosNamespace,
ImageName: volOptions.RbdImageName,
ImageID: volOptions.ImageID,
Encrypted: volOptions.isBlockEncrypted(),
UnmapOptions: volOptions.UnmapOptions,
}
12 changes: 11 additions & 1 deletion internal/rbd/snapshot.go
Original file line number Diff line number Diff line change
@@ -107,7 +107,17 @@ func generateVolFromSnap(rbdSnap *rbdSnapshot) *rbdVolume {
vol.JournalPool = rbdSnap.JournalPool
vol.RadosNamespace = rbdSnap.RadosNamespace
vol.RbdImageName = rbdSnap.RbdSnapName
vol.ImageID = rbdSnap.ImageID
vol.ParentName = rbdSnap.ParentName
vol.ParentID = rbdSnap.ParentID

// /!\ WARNING /!\
//
// Do not set the ImageID to the ID of the snapshot, a new image will
// be created based on the returned rbdVolume. If the ImageID is set to
// the ID of the snapshot, accessing the new image by ID will actually
// access the snapshot!
// vol.ImageID = rbdSnap.ImageID

// copyEncryptionConfig cannot be used here because the volume and the
// snapshot will have the same volumeID which cases the panic in
// copyEncryptionConfig function.