Skip to content

Commit

Permalink
Implement stage/unstage/publish/unpublish volume according csi spec f…
Browse files Browse the repository at this point in the history
…or block mode
  • Loading branch information
antonmyagkov committed Oct 16, 2024
1 parent 080c2ce commit 61eaf8b
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 96 deletions.
85 changes: 54 additions & 31 deletions cloud/blockstore/tests/csi_driver/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ def create_volume(self, name: str, size: int):
def delete_volume(self, name: str):
return self._controller_run("deletevolume", "--id", name)

def stage_volume(self, volume_id: str):
def stage_volume(self, volume_id: str, access_type: str):
return self._node_run(
"stagevolume",
"--volume-id",
volume_id,
"--access-type",
access_type,
)

def unstage_volume(self, volume_id: str):
Expand All @@ -205,7 +207,13 @@ def unstage_volume(self, volume_id: str):
volume_id,
)

def publish_volume(self, pod_id: str, volume_id: str, pod_name: str, fs_type: str = ""):
def publish_volume(
self,
pod_id: str,
volume_id: str,
pod_name: str,
access_type: str,
fs_type: str = ""):
return self._node_run(
"publishvolume",
"--pod-id",
Expand All @@ -216,15 +224,19 @@ def publish_volume(self, pod_id: str, volume_id: str, pod_name: str, fs_type: st
pod_name,
"--fs-type",
fs_type,
"--access-type",
access_type,
)

def unpublish_volume(self, pod_id: str, volume_id: str):
def unpublish_volume(self, pod_id: str, volume_id: str, access_type: str):
return self._node_run(
"unpublishvolume",
"--pod-id",
pod_id,
"--volume-id",
volume_id,
"--access-type",
access_type,
)

def stop(self):
Expand All @@ -244,7 +256,7 @@ def volumestats(self, pod_id: str, volume_id: str):
)
return json.loads(ret)

def expand_volume(self, pod_id: str, volume_id: str, size: int):
def expand_volume(self, pod_id: str, volume_id: str, size: int, access_type: str):
return self._node_run(
"expandvolume",
"--pod-id",
Expand All @@ -253,6 +265,8 @@ def expand_volume(self, pod_id: str, volume_id: str, size: int):
volume_id,
"--size",
str(size),
"--access-type",
access_type,
)


Expand Down Expand Up @@ -287,9 +301,10 @@ def test_nbs_csi_driver_mounted_disk_protected_from_deletion():
volume_size = 10 * 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id, volume_name, pod_name)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type)
result = run(
"destroyvolume",
"--disk-id",
Expand All @@ -303,7 +318,7 @@ def test_nbs_csi_driver_mounted_disk_protected_from_deletion():
raise AssertionError("Destroyvolume must return exit code 1")
assert "E_REJECTED" in result.stdout
with called_process_error_logged():
env.csi.unpublish_volume(pod_id, volume_name)
env.csi.unpublish_volume(pod_id, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
Expand All @@ -329,9 +344,10 @@ def test_nbs_csi_driver_volume_stat():
volume_size = 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id, volume_name, pod_name)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type)
stats1 = env.csi.volumestats(pod_id, volume_name)

assert "usage" in stats1
Expand Down Expand Up @@ -368,7 +384,7 @@ def test_nbs_csi_driver_volume_stat():
assert 2 == nodesUsage2["used"] - nodesUsage1["used"]

with called_process_error_logged():
env.csi.unpublish_volume(pod_id, volume_name)
env.csi.unpublish_volume(pod_id, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
Expand Down Expand Up @@ -434,12 +450,16 @@ def test_node_volume_expand(fs_type):
volume_size = 1024 ** 3
pod_name = "example-pod"
pod_id = "deadbeef"
access_type = "mount"
env.csi.create_volume(name=volume_name, size=volume_size)

env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id, volume_name, pod_name, fs_type)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id, volume_name, pod_name, access_type, fs_type)

new_volume_size = 2 * volume_size
env.csi.expand_volume(pod_id, volume_name, new_volume_size)
env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)

stats = env.csi.volumestats(pod_id, volume_name)
assert "usage" in stats
Expand All @@ -451,22 +471,22 @@ def test_node_volume_expand(fs_type):
assert bytes_usage["total"] // 1000 ** 3 == 2

# check that expand_volume is idempotent method
env.csi.expand_volume(pod_id, volume_name, new_volume_size)
env.csi.expand_volume(pod_id, volume_name, new_volume_size, access_type)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id, volume_name)
env.csi.unpublish_volume(pod_id, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)


@pytest.mark.parametrize('vm_mode', [True, False])
def test_publish_volume_twice_on_the_same_node(vm_mode):
@pytest.mark.parametrize('access_type,vm_mode', [("mount", True), ("mount", False), ("block", True)])
def test_publish_volume_twice_on_the_same_node(access_type, vm_mode):
env, run = init(vm_mode=vm_mode)
try:
volume_name = "example-disk"
Expand All @@ -476,25 +496,27 @@ def test_publish_volume_twice_on_the_same_node(vm_mode):
pod_id1 = "deadbeef1"
pod_id2 = "deadbeef2"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.publish_volume(pod_id2, volume_name, pod_name2)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
env.csi.publish_volume(pod_id2, volume_name, pod_name2, access_type)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id1, volume_name)
env.csi.unpublish_volume(pod_id1, volume_name, access_type)
with called_process_error_logged():
env.csi.unpublish_volume(pod_id2, volume_name)
env.csi.unpublish_volume(pod_id2, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)


def test_restart_kubelet_with_old_format_endpoint():
# test can be removed after migration of all endpoints to the new format
@pytest.mark.parametrize('access_type', ["mount", "block"])
def test_restart_kubelet_with_old_format_endpoint(access_type):
env, run = init()
try:
volume_name = "example-disk"
Expand All @@ -503,42 +525,43 @@ def test_restart_kubelet_with_old_format_endpoint():
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)
# skip stage to create endpoint with old format
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
# run stage/publish again to simulate kubelet restart
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id1, volume_name)
env.csi.unpublish_volume(pod_id1, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
env.csi.delete_volume(volume_name)
cleanup_after_test(env)


def test_restart_kubelet_with_new_format_endpoint():
@pytest.mark.parametrize('access_type', ["mount", "block"])
def test_restart_kubelet_with_new_format_endpoint(access_type):
env, run = init()
try:
volume_name = "example-disk"
volume_size = 1024 ** 3
pod_name1 = "example-pod-1"
pod_id1 = "deadbeef1"
env.csi.create_volume(name=volume_name, size=volume_size)
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
# run stage/publish again to simulate kubelet restart
env.csi.stage_volume(volume_name)
env.csi.publish_volume(pod_id1, volume_name, pod_name1)
env.csi.stage_volume(volume_name, access_type)
env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type)
except subprocess.CalledProcessError as e:
log_called_process_error(e)
raise
finally:
with called_process_error_logged():
env.csi.unpublish_volume(pod_id1, volume_name)
env.csi.unpublish_volume(pod_id1, volume_name, access_type)
with called_process_error_logged():
env.csi.unstage_volume(volume_name)
with called_process_error_logged():
Expand Down
Loading

0 comments on commit 61eaf8b

Please sign in to comment.