Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow extending volume size with inUse status #1174

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions pkg/api/controllers/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,35 @@ func (v *VolumeAttachmentPortal) CreateVolumeAttachment() {
return
}

if vol.Status == model.VolumeAvailable {
db.UpdateVolumeStatus(ctx, db.C, vol.Id, model.VolumeAttaching)
} else if vol.Status == model.VolumeInUse {
if vol.MultiAttach {
db.UpdateVolumeStatus(ctx, db.C, vol.Id, model.VolumeAttaching)
} else {
errMsg := "volume is already attached to one of the host. If you want to attach to multiple host, volume multiattach must be true"
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}
} else {
errMsg := "status of volume is available. It can be attached to host"
if vol.Status != model.VolumeAvailable {
errMsg := fmt.Sprintf("volume:%s status:%s should be available when mapping volume.", vol.Name, vol.Status)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

if (vol.Attached != nil && *vol.Attached) && !vol.MultiAttach {
errMsg := fmt.Sprintf("volume:%s is already attached to host:%s. If you want to attach to multiple host, volume multiattach must be true", vol.Name, host.HostName)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

// Check volume attachment with host id and volume id
attachments, err := db.C.ListVolumeAttachmentsWithFilter(ctx,
map[string][]string{"hostId": []string{attachment.HostId},
"volumeId": []string{attachment.VolumeId}})
if err != nil {
errMsg := fmt.Sprintf("check volume attachment failed, host:%s, volume:%s, err:%v", host.HostName, vol.Name, err)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}
if len(attachments) > 0 {
errMsg := fmt.Sprintf("cannot attach volume:%s to same host:%s twice", vol.Name, host.HostName)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

// TODO: should we set volume status with VolumeAttaching??

// Set AccessProtocol
pol, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
Expand Down Expand Up @@ -146,14 +159,6 @@ func (v *VolumeAttachmentPortal) CreateVolumeAttachment() {
}
defer v.CtrClient.Close()

// // Note: In some protocols, there is no related initiator
// var initiatorPort = ""
// for _, e := range host.Initiators {
// if e.Protocol == protocol {
// initiatorPort = e.PortName
// break
// }
// }
var initiators []*pb.Initiator
for _, e := range host.Initiators {
initiator := pb.Initiator{
Expand Down Expand Up @@ -291,16 +296,10 @@ func (v *VolumeAttachmentPortal) DeleteVolumeAttachment() {
return
}

// If volume id is invalid, it would mean that volume attachment creation failed before the create method
// in storage driver was called, and delete its db entry directly.
vol, err := db.C.GetVolume(ctx, attachment.VolumeId)
if err != nil {
if err := db.C.DeleteVolumeAttachment(ctx, attachment.Id); err != nil {
errMsg := fmt.Sprintf("failed to delete volume attachment: %s", err.Error())
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}
v.SuccessHandle(StatusAccepted, nil)
errMsg := fmt.Sprintf("get volume failed in delete volume attachment method: %v", err)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

Expand Down
35 changes: 34 additions & 1 deletion pkg/api/controllers/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,36 @@ func (v *VolumePortal) ExtendVolume() {
return
}

if volume.Status != model.VolumeAvailable && volume.Status != model.VolumeErrorExtending {
errMsg := "the status of the volume to be extended must be available or error extending!"
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

if extendRequestBody.NewSize <= volume.Size {
errMsg := fmt.Sprintf("new size for extend must be greater than current size."+
"(current: %d GB, extended: %d GB).", volume.Size, extendRequestBody.NewSize)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

pool, err := db.C.GetPool(ctx, volume.PoolId)
if err != nil {
errMsg := fmt.Sprintf("pool %s not found: %s", id, err.Error())
v.ErrorHandle(model.ErrorNotFound, errMsg)
return
}
if pool.FreeCapacity < extendRequestBody.NewSize-volume.Size {
errMsg := fmt.Sprintf("pool:%s capacity:%v is not enough", pool.Name, pool.FreeCapacity)
v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}
if pool.Status != model.PoolAvailable {
errMsg := fmt.Sprintf("pool:%s with status:%s can not support extending volume", pool.Name, pool.Status)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the message be

Suggested change
errMsg := fmt.Sprintf("pool:%s with status:%s can not support extending volume", pool.Name, pool.Status)
errMsg := fmt.Sprintf("pool:%s has status:%s. Can not extend volume", pool.Name, pool.Status)

v.ErrorHandle(model.ErrorBadRequest, errMsg)
return
}

prf, err := db.C.GetProfile(ctx, volume.ProfileId)
if err != nil {
errMsg := fmt.Sprintf("extend volume failed: %v", err.Error())
Expand All @@ -248,7 +278,8 @@ func (v *VolumePortal) ExtendVolume() {

// NOTE:It will update the the status of the volume waiting for expansion in
// the database to "extending" and return the result immediately.
result, err := util.ExtendVolumeDBEntry(ctx, id, &extendRequestBody)
volume.Status = model.VolumeExtending
result, err := db.C.ExtendVolume(ctx, volume)
if err != nil {
errMsg := fmt.Sprintf("extend volume failed: %s", err.Error())
v.ErrorHandle(model.ErrorBadRequest, errMsg)
Expand All @@ -271,6 +302,8 @@ func (v *VolumePortal) ExtendVolume() {
opt := &pb.ExtendVolumeOpts{
Id: id,
Size: extendRequestBody.NewSize,
PoolId: pool.Id,
PoolName: pool.Name,
Metadata: result.Metadata,
Context: ctx.ToJson(),
Profile: prf.ToJson(),
Expand Down
1 change: 1 addition & 0 deletions pkg/api/controllers/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func TestExtendVolume(t *testing.T) {
mockClient.On("GetVolume", c.NewAdminContext(), "bd5b12a8-a101-11e7-941e-d77981b584d8").Return(&SampleVolumes[0], nil)
mockClient.On("ExtendVolume", c.NewAdminContext(), &expected).Return(&expected, nil)
mockClient.On("GetProfile", c.NewAdminContext(), SampleReplications[0].ProfileId).Return(&SampleProfiles[0], nil)
mockClient.On("GetPool", c.NewAdminContext(), SampleVolumes[0].PoolId).Return(&SamplePools[0], nil)
db.C = mockClient

r, _ := http.NewRequest("POST", "/v1beta/block/volumes/bd5b12a8-a101-11e7-941e-d77981b584d8/resize", bytes.NewBuffer(jsonStr))
Expand Down
18 changes: 9 additions & 9 deletions pkg/api/util/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ func CreateVolumeSnapshotDBEntry(ctx *c.Context, in *model.VolumeSnapshotSpec) (
log.Error("get volume failed in create volume snapshot method: ", err)
return nil, err
}
if vol.Status != model.VolumeAvailable && vol.Status != model.VolumeInUse {
errMsg := "only the status of volume is available or in-use, the snapshot can be created"
if vol.Status != model.VolumeAvailable {
errMsg := "only the status of volume is available, the snapshot can be created"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is these messages correct?

log.Error(errMsg)
return nil, errors.New(errMsg)
}
Expand Down Expand Up @@ -504,8 +504,8 @@ func CreateReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) (*model
log.Error("get primary volume failed in create volume replication method: ", err)
return nil, err
}
if pVol.Status != model.VolumeAvailable && pVol.Status != model.VolumeInUse {
var errMsg = fmt.Errorf("only the status of primary volume is available or in-use, the replicaiton can be created")
if pVol.Status != model.VolumeAvailable {
var errMsg = fmt.Errorf("only the status of primary volume is available, the replicaiton can be created")
log.Error(errMsg)
return nil, errMsg
}
Expand All @@ -514,8 +514,8 @@ func CreateReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) (*model
log.Error("get secondary volume failed in create volume replication method: ", err)
return nil, err
}
if sVol.Status != model.VolumeAvailable && sVol.Status != model.VolumeInUse {
var errMsg = fmt.Errorf("only the status of secondary volume is available or in-use, the replicaiton can be created")
if sVol.Status != model.VolumeAvailable {
var errMsg = fmt.Errorf("only the status of secondary volume is available, the replicaiton can be created")
log.Error(errMsg)
return nil, errMsg
}
Expand Down Expand Up @@ -800,7 +800,7 @@ func ValidateAddVolumes(ctx *c.Context, volumes []*model.VolumeSpec, addVolumes
if !utils.Contained(addVolRef.ProfileId, vg.Profiles) {
return nil, fmt.Errorf("cannot add volume %s to group %s , volume profile is not supported by the group.", addVolRef.Id, vg.Id)
}
if addVolRef.Status != model.VolumeAvailable && addVolRef.Status != model.VolumeInUse {
if addVolRef.Status != model.VolumeAvailable {
return nil, fmt.Errorf("cannot add volume %s to group %s because volume is in invalid status %s", addVolRef.Id, vg.Id, addVolRef.Status)
}
if addVolRef.PoolId != vg.PoolId {
Expand All @@ -818,7 +818,7 @@ func ValidateRemoveVolumes(ctx *c.Context, volumes []*model.VolumeSpec, removeVo
for _, v := range removeVolumes {
for _, volume := range volumes {
if v == volume.Id {
if volume.Status != model.VolumeAvailable && volume.Status != model.VolumeInUse && volume.Status != model.VolumeError && volume.Status != model.VolumeErrorDeleting {
if volume.Status != model.VolumeAvailable && volume.Status != model.VolumeError && volume.Status != model.VolumeErrorDeleting {
return nil, fmt.Errorf("cannot remove volume %s from group %s, volume is in invalid status %s", volume.Id, vg.Id, volume.Status)
}
break
Expand Down Expand Up @@ -890,7 +890,7 @@ func DeleteVolumeGroupDBEntry(ctx *c.Context, volumeGroupId string) error {

var volumesUpdate []*model.VolumeSpec
for _, value := range volumes {
if value.AttachStatus == model.VolumeAttached {
if *value.Attached {
msg := fmt.Sprintf("volume %s in group %s is attached. Need to deach first.", value.Id, vg.Id)
log.Error(msg)
return errors.New(msg)
Expand Down
78 changes: 41 additions & 37 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package controller
import (
"context"
"encoding/json"
"errors"
"fmt"

log "github.com/golang/glog"
Expand Down Expand Up @@ -216,7 +215,7 @@ func (c *Controller) ExtendVolume(contx context.Context, opt *pb.ExtendVolumeOpt
log.Info("Controller server receive extend volume request, vr =", opt)

ctx := osdsCtx.NewContextFromJson(opt.GetContext())
vol, err := db.C.GetVolume(ctx, opt.Id)
vol, err := db.C.GetVolume(ctx, opt.GetId())
if err != nil {
log.Error("get volume failed in extend volume method: ", err.Error())
return pb.GenericResponseError(err), err
Expand All @@ -226,33 +225,17 @@ func (c *Controller) ExtendVolume(contx context.Context, opt *pb.ExtendVolumeOpt
var rollBack = false
defer func() {
if rollBack {
db.UpdateVolumeStatus(ctx, db.C, opt.Id, model.VolumeAvailable)
vol.Status = model.VolumeErrorExtending
db.C.UpdateVolume(ctx, vol)
}
}()

pool, err := db.C.GetPool(ctx, vol.PoolId)
if nil != err {
log.Error("get pool failed in extend volume method: ", err.Error())
rollBack = true
return pb.GenericResponseError(err), err
}

var newSize = opt.GetSize()
if pool.FreeCapacity <= (newSize - vol.Size) {
reason := fmt.Sprintf("pool free capacity(%d) < new size(%d) - old size(%d)",
pool.FreeCapacity, newSize, vol.Size)
rollBack = true
return pb.GenericResponseError(reason), errors.New(reason)
}
opt.PoolId = pool.Id
opt.PoolName = pool.Name
prf := model.NewProfileFromJson(opt.Profile)

// Select the storage tag according to the lifecycle flag.
prf := model.NewProfileFromJson(opt.Profile)
c.policyController = policy.NewController(prf)
c.policyController.Setup(EXTEND_LIFECIRCLE_FLAG)

dockInfo, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
dockInfo, err := db.C.GetDockByPoolId(ctx, opt.PoolId)
if err != nil {
log.Error("when search dock in db by pool id: ", err.Error())
rollBack = true
Expand All @@ -263,19 +246,21 @@ func (c *Controller) ExtendVolume(contx context.Context, opt *pb.ExtendVolumeOpt
c.volumeController.SetDock(dockInfo)
opt.DriverName = dockInfo.DriverName

result, err := c.volumeController.ExtendVolume(opt)
if err != nil {
if _, err := c.volumeController.ExtendVolume(opt); err != nil {
log.Error("extend volume failed: ", err.Error())
rollBack = true
return pb.GenericResponseError(err), err
}

// Update the volume data in database.
result.Size = newSize
result.PoolId, result.ProfileId = opt.GetPoolId(), opt.GetProfileId()
db.C.UpdateStatus(ctx, result, model.VolumeAvailable)
vol.Size = opt.GetSize()
vol.Status = model.VolumeAvailable
if _, err := db.C.UpdateVolume(ctx, vol); err != nil {
log.Errorf("failed to update volume size and status: %v", err.Error())
return pb.GenericResponseError(err), err
}

volBody, _ := json.Marshal(result)
volBody, _ := json.Marshal(vol)
var errChan = make(chan error, 1)
defer close(errChan)
go c.policyController.ExecuteAsyncPolicy(opt, string(volBody), errChan)
Expand All @@ -285,7 +270,7 @@ func (c *Controller) ExtendVolume(contx context.Context, opt *pb.ExtendVolumeOpt
return pb.GenericResponseError(err), err
}

return pb.GenericResponseResult(result), nil
return pb.GenericResponseResult(vol), nil
}

// CreateVolumeAttachment implements pb.ControllerServer.CreateVolumeAttachment
Expand All @@ -306,7 +291,6 @@ func (c *Controller) CreateVolumeAttachment(contx context.Context, opt *pb.Creat
result, err := c.volumeController.CreateVolumeAttachment(opt)
if err != nil {
db.UpdateVolumeAttachmentStatus(ctx, db.C, opt.Id, model.VolumeAttachError)
db.UpdateVolumeStatus(ctx, db.C, opt.VolumeId, model.VolumeAvailable)
msg := fmt.Sprintf("create volume attachment failed: %v", err)
log.Error(msg)
return pb.GenericResponseError(msg), err
Expand All @@ -317,14 +301,14 @@ func (c *Controller) CreateVolumeAttachment(contx context.Context, opt *pb.Creat
log.Error("get volume failed in CreateVolumeAttachment method: ", err.Error())
return pb.GenericResponseError(err), err
}

if vol.Status == model.VolumeAttaching {
db.UpdateVolumeStatus(ctx, db.C, vol.Id, model.VolumeInUse)
} else {
msg := fmt.Sprintf("wrong volume status when volume attachment creation completed")
vol.Attached = new(bool)
*vol.Attached = true
if _, err := db.C.UpdateVolume(ctx, vol); err != nil {
msg := fmt.Sprintf("failed to set volume:%s attached to true", vol.Name)
log.Error(msg)
return pb.GenericResponseError(msg), err
}

result.AccessProtocol = opt.AccessProtocol
result.Status = model.VolumeAttachAvailable

Expand Down Expand Up @@ -354,7 +338,7 @@ func (c *Controller) DeleteVolumeAttachment(contx context.Context, opt *pb.Delet
if err = c.volumeController.DeleteVolumeAttachment(opt); err != nil {
msg := fmt.Sprintf("delete volume attachment failed: %v", err)
log.Error(msg)
db.C.DeleteVolumeAttachment(ctx, opt.Id)
db.UpdateVolumeAttachmentStatus(ctx, db.C, opt.Id, model.VolumeAttachErrorDeleting)
return pb.GenericResponseError(msg), err
}

Expand All @@ -364,8 +348,28 @@ func (c *Controller) DeleteVolumeAttachment(contx context.Context, opt *pb.Delet
return pb.GenericResponseError(msg), err
}

db.UpdateVolumeStatus(ctx, db.C, opt.VolumeId, model.VolumeAvailable)
// Check and update volume attached status
attachments, err := db.C.ListVolumeAttachmentsWithFilter(ctx, map[string][]string{"volumeId": []string{opt.VolumeId}})
if err != nil {
msg := fmt.Sprintf("list volume attachment failed in controller.DeleteAttachment: %v", err)
log.Error(msg)
return pb.GenericResponseError(msg), err
}
if len(attachments) == 0 {
vol, err := db.C.GetVolume(ctx, opt.VolumeId)
if err != nil {
msg := fmt.Sprintf("get volume failed in controller.DeleteAttachment: %v", err)
log.Error(msg)
return pb.GenericResponseError(msg), err
}
vol.Attached = new(bool)
if _, err := db.C.UpdateVolume(ctx, vol); err != nil {
msg := fmt.Sprintf("set volume attached to false failed in controller.DeleteAttachment: %v", err)
log.Error(msg)
return pb.GenericResponseError(msg), err
}

}
return pb.GenericResponseResult(nil), nil
}

Expand Down
Loading