Skip to content

Commit

Permalink
fix: handle kubelet restart
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmyagkov committed Oct 8, 2024
1 parent a7716dc commit 5496f67
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 3 deletions.
51 changes: 51 additions & 0 deletions cloud/blockstore/tests/csi_driver/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,54 @@ def test_publish_volume_twice_on_the_same_node():
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)


def test_restart_kubelet_with_old_format_endpoint():
env, run = init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)

# skip stage to create endpoint with old format
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id1, volume_name)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)


def test_restart_kubelet_with_new_format_endpoint():
env, run = init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id1, volume_name)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)
59 changes: 56 additions & 3 deletions cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,65 @@ func (s *nodeService) nodePublishDiskAsFilesystem(
return nil
}

func (s *nodeService) IsMountConflictError(err error) bool {
if err != nil {
var clientErr *nbsclient.ClientError
if errors.As(err, &clientErr) {
if clientErr.Code == nbsclient.E_MOUNT_CONFLICT {
return true
}
}
}

return false
}

func (s *nodeService) hasDeprecatedEndpoint(
ctx context.Context,
volumeId string) (bool, error) {

listEndpointsResp, err := s.nbsClient.ListEndpoints(
ctx, &nbsapi.TListEndpointsRequest{},
)
if err != nil {
log.Printf("List endpoints failed %v", err)
return false, err
}

if len(listEndpointsResp.Endpoints) == 0 {
return false, nil
}

for _, endpoint := range listEndpointsResp.Endpoints {
if endpoint.DiskId == volumeId {
pathList := filepath.SplitList(endpoint.UnixSocketPath)
for _, path := range pathList {
if path == "v2" {
return false, nil
}
}
}
}

return true, nil
}

func (s *nodeService) nodeStageDiskAsFilesystem(
ctx context.Context,
req *csi.NodeStageVolumeRequest) error {

resp, err := s.startNbsEndpointForNBD(ctx, s.nodeID, req.VolumeId, req.VolumeContext)
instanceId := filepath.Join("v2", s.nodeID)
resp, err := s.startNbsEndpointForNBD(ctx, instanceId, req.VolumeId, req.VolumeContext)
if err != nil {
if s.IsMountConflictError(err) {
deprecatedEndpoint, err := s.hasDeprecatedEndpoint(ctx, req.VolumeId)
if err != nil {
return err
}
if deprecatedEndpoint {
return nil
}
}
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

Expand Down Expand Up @@ -855,7 +908,7 @@ func (s *nodeService) nodeUnstageVolume(
return err
}

endpointDir := s.getEndpointDir(s.nodeID, req.VolumeId)
endpointDir := s.getEndpointDir(filepath.Join("v2", s.nodeID), req.VolumeId)
if s.nbsClient != nil {
_, err := s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
Expand Down Expand Up @@ -1300,7 +1353,7 @@ func (s *nodeService) NodeExpandVolume(
endpointDirOld := s.getEndpointDir(podId, req.VolumeId)
unixSocketPathOld := filepath.Join(endpointDirOld, nbsSocketName)

endpointDirNew := s.getEndpointDir(s.nodeID, req.VolumeId)
endpointDirNew := s.getEndpointDir(filepath.Join("v2", s.nodeID), req.VolumeId)
unixSocketPathNew := filepath.Join(endpointDirNew, nbsSocketName)

listEndpointsResp, err := s.nbsClient.ListEndpoints(
Expand Down

0 comments on commit 5496f67

Please sign in to comment.