Skip to content

Commit

Permalink
Fix ControllerVolumeUnpublish with deleted nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
clintonk authored Mar 2, 2022
1 parent a357c9e commit 7a1104d
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Fixed issue where CSI storage class parameters weren't allowed. (Issue [#598](https://github.com/NetApp/trident/issues/598))
- Fixed duplicate key declaration in Trident CRD. (Issue [#671](https://github.com/NetApp/trident/issues/671))
- Fixed inaccurate CSI Snapshot logs. (Issue [#629](https://github.com/NetApp/trident/issues/629))
- Fixed issue with unpublishing volumes on deleted nodes. (Issue [#691](https://github.com/NetApp/trident/issues/691))

**Enhancements**

Expand Down
122 changes: 105 additions & 17 deletions core/orchestrator_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2174,8 +2174,9 @@ func (o *TridentOrchestrator) GetVolumeExternal(
}

// GetVolumeByInternalName returns a volume by the given internal name
func (o *TridentOrchestrator) GetVolumeByInternalName(volumeInternal string, ctx context.Context) (volume string,
err error) {
func (o *TridentOrchestrator) GetVolumeByInternalName(
volumeInternal string, _ context.Context,
) (volume string, err error) {

defer recordTiming("volume_internal_get", &err)()

Expand Down Expand Up @@ -3031,9 +3032,7 @@ func (o *TridentOrchestrator) PublishVolume(
return nil
}

func (o *TridentOrchestrator) UnpublishVolume(
ctx context.Context, volumeName string, publishInfo *utils.VolumePublishInfo,
) (err error) {
func (o *TridentOrchestrator) UnpublishVolume(ctx context.Context, volumeName, nodeName string) (err error) {

if o.bootstrapError != nil {
return o.bootstrapError
Expand All @@ -3049,19 +3048,56 @@ func (o *TridentOrchestrator) UnpublishVolume(
return utils.NotFoundError(fmt.Sprintf("volume %s not found", volumeName))
}

nodes := make([]*utils.Node, 0)
for _, node := range o.nodes {
nodes = append(nodes, node)
node, ok := o.nodes[nodeName]
if !ok {
return utils.NotFoundError(fmt.Sprintf("node %s not found", nodeName))
}
publishInfo.Nodes = nodes
publishInfo.BackendUUID = volume.BackendUUID

// Build list of nodes to which the volume remains published
nodeMap := make(map[string]*utils.Node)
for _, pub := range o.listVolumePublicationsForVolume(ctx, volumeName) {
if pub.NodeName == nodeName {
continue
}
if n, found := o.nodes[pub.NodeName]; !found {
Logc(ctx).WithField("node", pub.NodeName).Warning("Node not found during volume unpublish.")
continue
} else {
nodeMap[pub.NodeName] = n
}
}
nodes := make([]*utils.Node, 0, len(nodeMap))
for _, n := range nodeMap {
nodes = append(nodes, n)
}

backend, ok := o.backends[volume.BackendUUID]
if !ok {
// Not a not found error because this is not user input
return fmt.Errorf("backend %s not found", volume.BackendUUID)
}

return backend.UnpublishVolume(ctx, volume.Config, publishInfo)
// Unpublish the volume
if err = backend.UnpublishVolume(ctx, volume.Config, nodes); err != nil {
return err
}

// Check for publications remaining on the node, not counting the one we just unpublished.
nodePubFound := false
for _, pub := range o.listVolumePublicationsForNode(ctx, nodeName) {
if pub.VolumeName == volumeName {
continue
}
nodePubFound = true
break
}

// If the node is known to be gone, and if we just unpublished the last volume on that node, delete the node.
if !nodePubFound && node.Deleted {
return o.deleteNode(ctx, nodeName)
}

return nil
}

// AttachVolume mounts a volume to the local host. This method is currently only used by Docker,
Expand Down Expand Up @@ -4263,7 +4299,7 @@ func (o *TridentOrchestrator) ListNodes(context.Context) (nodes []*utils.Node, e
return nodes, nil
}

func (o *TridentOrchestrator) DeleteNode(ctx context.Context, nName string) (err error) {
func (o *TridentOrchestrator) DeleteNode(ctx context.Context, nodeName string) (err error) {
if o.bootstrapError != nil {
return o.bootstrapError
}
Expand All @@ -4274,14 +4310,50 @@ func (o *TridentOrchestrator) DeleteNode(ctx context.Context, nName string) (err
defer o.mutex.Unlock()
defer o.updateMetrics()

node, found := o.nodes[nName]
node, found := o.nodes[nodeName]
if !found {
return utils.NotFoundError(fmt.Sprintf("node %s not found", nName))
return utils.NotFoundError(fmt.Sprintf("node %s not found", nodeName))
}

publicationCount := len(o.listVolumePublicationsForNode(ctx, nodeName))
logFields := log.Fields{
"name": nodeName,
"publications": publicationCount,
}

if publicationCount == 0 {

// No volumes are published to this node, so delete the CR and update backends that support node-level access.
Logc(ctx).WithFields(logFields).Debug("No volumes publications remain for node, removing node CR.")

return o.deleteNode(ctx, nodeName)

} else {

// There are still volumes published to this node, so this is a sudden node removal. Preserve the node CR
// with a Deleted flag so we can handle the eventual unpublish calls for the affected volumes.
Logc(ctx).WithFields(logFields).Debug("Volume publications remain for node, marking node CR as deleted.")

node.Deleted = true
if err = o.storeClient.AddOrUpdateNode(ctx, node); err != nil {
return err
}
}

return nil
}

func (o *TridentOrchestrator) deleteNode(ctx context.Context, nodeName string) (err error) {

node, found := o.nodes[nodeName]
if !found {
return utils.NotFoundError(fmt.Sprintf("node %s not found", nodeName))
}

if err = o.storeClient.DeleteNode(ctx, node); err != nil {
return err
}
delete(o.nodes, nName)
delete(o.nodes, nodeName)
o.invalidateAllBackendNodeAccess()
return o.reconcileNodeAccessOnAllBackends(ctx)
}
Expand Down Expand Up @@ -4365,11 +4437,19 @@ func (o *TridentOrchestrator) ListVolumePublicationsForVolume(
o.mutex.Lock()
defer o.mutex.Unlock()

publications = o.listVolumePublicationsForVolume(ctx, volumeName)
return
}

func (o *TridentOrchestrator) listVolumePublicationsForVolume(
_ context.Context, volumeName string,
) (publications []*utils.VolumePublication) {

publications = []*utils.VolumePublication{}
for _, pub := range o.volumePublications[volumeName] {
publications = append(publications, pub)
}
return publications, nil
return publications
}

// ListVolumePublicationsForNode returns a list of all volume publications for a given node
Expand All @@ -4385,13 +4465,21 @@ func (o *TridentOrchestrator) ListVolumePublicationsForNode(
o.mutex.Lock()
defer o.mutex.Unlock()

publications = o.listVolumePublicationsForNode(ctx, nodeName)
return
}

func (o *TridentOrchestrator) listVolumePublicationsForNode(
_ context.Context, nodeName string,
) (publications []*utils.VolumePublication) {

publications = []*utils.VolumePublication{}
for _, pubs := range o.volumePublications {
if pubs[nodeName] != nil {
publications = append(publications, pubs[nodeName])
}
}
return publications, nil
return
}

// DeleteVolumePublication deletes the record of the volume publication for a given volume/node pair
Expand Down
24 changes: 15 additions & 9 deletions core/orchestrator_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3260,6 +3260,7 @@ func TestAddNode(t *testing.T) {
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
TopologyLabels: map[string]string{"topology.kubernetes.io/region": "Region1"},
Deleted: false,
}
orchestrator := getOrchestrator(t)
if err := orchestrator.AddNode(ctx(), node, nil); err != nil {
Expand All @@ -3274,12 +3275,14 @@ func TestGetNode(t *testing.T) {
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
TopologyLabels: map[string]string{"topology.kubernetes.io/region": "Region1"},
Deleted: false,
}
unexpectedNode := &utils.Node{
Name: "testNode2",
IQN: "myOtherIQN",
IPs: []string{"3.3.3.3", "4.4.4.4"},
TopologyLabels: map[string]string{"topology.kubernetes.io/region": "Region2"},
Deleted: false,
}
initialNodes := map[string]*utils.Node{}
initialNodes[expectedNode.Name] = expectedNode
Expand All @@ -3299,14 +3302,16 @@ func TestGetNode(t *testing.T) {
func TestListNodes(t *testing.T) {
orchestrator := getOrchestrator(t)
expectedNode1 := &utils.Node{
Name: "testNode",
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
Name: "testNode",
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
Deleted: false,
}
expectedNode2 := &utils.Node{
Name: "testNode2",
IQN: "myOtherIQN",
IPs: []string{"3.3.3.3", "4.4.4.4"},
Name: "testNode2",
IQN: "myOtherIQN",
IPs: []string{"3.3.3.3", "4.4.4.4"},
Deleted: false,
}
initialNodes := map[string]*utils.Node{}
initialNodes[expectedNode1.Name] = expectedNode1
Expand Down Expand Up @@ -3350,9 +3355,10 @@ func unorderedNodeSlicesEqual(x, y []*utils.Node) bool {
func TestDeleteNode(t *testing.T) {
orchestrator := getOrchestrator(t)
initialNode := &utils.Node{
Name: "testNode",
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
Name: "testNode",
IQN: "myIQN",
IPs: []string{"1.1.1.1", "2.2.2.2"},
Deleted: false,
}
initialNodes := map[string]*utils.Node{}
initialNodes[initialNode.Name] = initialNode
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Orchestrator interface {
ListVolumes(ctx context.Context) ([]*storage.VolumeExternal, error)
ListVolumesByPlugin(ctx context.Context, pluginName string) ([]*storage.VolumeExternal, error)
PublishVolume(ctx context.Context, volumeName string, publishInfo *utils.VolumePublishInfo) error
UnpublishVolume(ctx context.Context, volumeName string, publishInfo *utils.VolumePublishInfo) error
UnpublishVolume(ctx context.Context, volumeName, nodeName string) error
ResizeVolume(ctx context.Context, volumeName, newSize string) error
SetVolumeState(ctx context.Context, volumeName string, state storage.VolumeState) error

Expand Down
50 changes: 13 additions & 37 deletions frontend/csi/controller_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,48 +480,24 @@ func (p *Plugin) ControllerUnpublishVolume(
return nil, status.Error(codes.InvalidArgument, "no node ID provided")
}

// Check if volume exists. If not, return success.
volume, err := p.orchestrator.GetVolume(ctx, volumeID)
if err != nil {
if utils.IsNotFoundError(err) {
return &csi.ControllerUnpublishVolumeResponse{}, nil
} else {
return nil, p.getCSIErrorForOrchestratorError(err)
}
}
logFields := log.Fields{"volume": req.GetVolumeId(), "node": req.GetNodeId()}

// Get node attributes from the node ID
nodeInfo, err := p.orchestrator.GetNode(ctx, nodeID)
if err != nil {
Logc(ctx).WithField("node", nodeID).Error("Node info not found.")
return nil, status.Error(codes.NotFound, err.Error())
}

// Set up volume publish info with what we know about the node
volumePublishInfo := &utils.VolumePublishInfo{
Localhost: false,
HostIQN: []string{nodeInfo.IQN},
HostIP: nodeInfo.IPs,
HostName: nodeInfo.Name,
Unmanaged: volume.Config.ImportNotManaged,
}

// Optionally update NFS export rules, remove node IQN from igroup, etc.
if err = p.orchestrator.UnpublishVolume(ctx, volume.Config.Name, volumePublishInfo); err != nil {
return nil, status.Error(codes.Internal, err.Error())
// Unpublish the volume by updating NFS export rules, removing node IQN from igroup, etc.
if err := p.orchestrator.UnpublishVolume(ctx, volumeID, nodeID); err != nil {
if !utils.IsNotFoundError(err) {
Logc(ctx).WithFields(logFields).WithError(err).Error("Could not unpublish volume.")
return nil, status.Error(codes.Internal, err.Error())
}
Logc(ctx).WithFields(logFields).WithError(err).Warning("Unpublish targets not found, continuing.")
}

if err = p.orchestrator.DeleteVolumePublication(ctx, req.GetVolumeId(), req.GetNodeId()); err != nil {
// Remove the stateful publish tracking record.
if err := p.orchestrator.DeleteVolumePublication(ctx, req.GetVolumeId(), req.GetNodeId()); err != nil {
if !utils.IsNotFoundError(err) {
msg := "error removing volume publication record"
Logc(ctx).WithError(err).Error(msg)
return nil, status.Error(codes.Internal, msg)
} else {
Logc(ctx).WithFields(log.Fields{
"VolumeID": req.GetVolumeId(),
"NodeID": req.GetNodeId(),
}).Warn("Volume publication record not found.")
Logc(ctx).WithFields(logFields).WithError(err).Error("Could not remove volume publication record.")
return nil, status.Error(codes.Internal, err.Error())
}
Logc(ctx).WithFields(logFields).Warning("Volume publication record not found, returning success.")
}

return &csi.ControllerUnpublishVolumeResponse{}, nil
Expand Down
Loading

0 comments on commit 7a1104d

Please sign in to comment.