Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Add Modify volume rpc support #4916

Draft
wants to merge 1 commit into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,67 @@ func (cs *ControllerServer) ControllerExpandVolume(
}, nil
}

// ControllerExpandVolume expands CephFS Volumes on demand based on resizer request.
func (cs *ControllerServer) ControllerModifyVolume(
ctx context.Context,
req *csi.ControllerModifyVolumeRequest,
) (*csi.ControllerModifyVolumeResponse, error) {
if err := cs.validateModifyVolumeRequest(req); err != nil {
log.ErrorLog(ctx, "ControllerModifyVolumeRequest validation failed: %v", err)

return nil, err
}

volID := req.GetVolumeId()
secret := req.GetSecrets()
params := req.GetMutableParameters()

// lock out parallel delete operations
if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer cs.VolumeLocks.Release(volID)

// lock out volumeID for clone, delete, expand and restore operation
if err := cs.OperationLocks.GetModifyLock(volID); err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseModifyLock(volID)

cr, err := util.NewAdminCredentials(secret)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()

volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, params, secret,
cs.ClusterName, cs.SetMetadata)
if err != nil {
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)

return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer volOptions.Destroy()

if volOptions.BackingSnapshot {
return nil, status.Error(codes.InvalidArgument, "cannot modify snapshot-backed volume")
}

volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if err = volClient.CreateVolume(ctx); err != nil {
log.ErrorLog(ctx, "failed to modify volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err)

return nil, status.Error(codes.Internal, err.Error())
}

return &csi.ControllerModifyVolumeResponse{}, nil
}

// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
//
Expand Down
1 change: 1 addition & 0 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (fs *Driver) Run(conf *util.Config) {
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
})

fs.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
Expand Down
2 changes: 1 addition & 1 deletion internal/cephfs/store/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error {
optName, actual, expected)
}

// getVolumeOptions validates the basic required basic options provided in the
// getVolumeOptions validates the required basic options provided in the
// volume parameters and extract the volumeOptions from volume parameters.
// It contains the following checks:
// - clusterID must be set
Expand Down
18 changes: 18 additions & 0 deletions internal/cephfs/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,21 @@ func (cs *ControllerServer) validateExpandVolumeRequest(req *csi.ControllerExpan

return nil
}

// validateModifyVolumeRequest validates the Controller ModifyVolume request.
func (cs *ControllerServer) validateModifyVolumeRequest(req *csi.ControllerModifyVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_MODIFY_VOLUME); err != nil {
return fmt.Errorf("invalid ModifyVolumeRequest: %w", err)
}

if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
}

mutableParam := req.GetMutableParameters()
if mutableParam == nil {
return status.Error(codes.InvalidArgument, "MutableParameters cannot be empty")
}

return nil
}
42 changes: 40 additions & 2 deletions internal/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
cloneOpt operation = "clone"
restoreOp operation = "restore"
expandOp operation = "expand"
modifyOp operation = "modify"
)

// OperationLock implements a map with atomic operations.
Expand Down Expand Up @@ -102,6 +103,7 @@ func NewOperationLock() *OperationLock {
lock[cloneOpt] = make(map[string]int)
lock[restoreOp] = make(map[string]int)
lock[expandOp] = make(map[string]int)
lock[modifyOp] = make(map[string]int)

return &OperationLock{
locks: lock,
Expand Down Expand Up @@ -165,12 +167,37 @@ func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
}
// check any delete operation is going on for given volume ID
// check any create operation is going on for given volume ID
if _, ok := ol.locks[createOp][volumeID]; ok {
return fmt.Errorf("a Create operation with given id %s already exists", volumeID)
}

ol.locks[expandOp][volumeID] = 1
case modifyOp:
// During modify operation the volume should not be deleted, cloned,
// resized and restored and there should not be a create operation also.
// check any delete operation is going on for given volume ID
if _, ok := ol.locks[deleteOp][volumeID]; ok {
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
}
// check any clone operation is going on for given volume ID
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
}
// check any expand operation is going on for given volume ID
if _, ok := ol.locks[expandOp][volumeID]; ok {
return fmt.Errorf("a Expand operation with given id %s already exists", volumeID)
}
// check any restore operation is going on for given volume ID
if _, ok := ol.locks[restoreOp][volumeID]; ok {
return fmt.Errorf("a Restore operation with given id %s already exists", volumeID)
}
// check any create operation is going on for given volume ID
if _, ok := ol.locks[createOp][volumeID]; ok {
return fmt.Errorf("a Expand operation with given id %s already exists", volumeID)
}

ol.locks[modifyOp][volumeID] = 1
default:
return fmt.Errorf("%v operation not supported", op)
}
Expand Down Expand Up @@ -206,6 +233,12 @@ func (ol *OperationLock) GetExpandLock(volumeID string) error {
return ol.tryAcquire(expandOp, volumeID)
}

// GetModifyLock gets the modify lock on given volumeID,ensures that there is
// no create, delete, clone, expand, and restore operation on given volumeID.
func (ol *OperationLock) GetModifyLock(volumeID string) error {
return ol.tryAcquire(modifyOp, volumeID)
}

// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) {
ol.release(createOp, volumeID)
Expand All @@ -231,12 +264,17 @@ func (ol *OperationLock) ReleaseExpandLock(volumeID string) {
ol.release(expandOp, volumeID)
}

// ReleaseModifyLock releases the modify lock on given volumeID.
func (ol *OperationLock) ReleaseModifyLock(volumeID string) {
ol.release(modifyOp, volumeID)
}

// release deletes the lock on volumeID.
func (ol *OperationLock) release(op operation, volumeID string) {
ol.mux.Lock()
defer ol.mux.Unlock()
switch op {
case cloneOpt, createOp, expandOp, restoreOp, deleteOp:
case cloneOpt, createOp, expandOp, restoreOp, deleteOp, modifyOp:
if val, ok := ol.locks[op][volumeID]; ok {
// decrement the counter for operation
ol.locks[op][volumeID] = val - 1
Expand Down
Loading