Skip to content

Commit

Permalink
Do not delete bdev on update after changing a namespace load balancing group
Browse files Browse the repository at this point in the history

Fixes #730

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jun 23, 2024
1 parent 2145b4b commit 404b0ec
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 77 deletions.
2 changes: 1 addition & 1 deletion control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ def store_async(self, conn, data, cmd_id):
self_conn.recv_async = True
self_conn.async_cmd_id = cmd_id

def _state_notify_update(self, update, is_add_req):
def _state_notify_update(self, update, is_add_req, is_changed):
"""Notify and reply async event."""

should_send_async_event = False
Expand Down
138 changes: 75 additions & 63 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,20 +251,20 @@ def _alloc_cluster(self, anagrp: int) -> str:
self.cluster_nonce[name] = nonce
return name

def _grpc_function_with_lock(self, func, request, context):
def _grpc_function_with_lock(self, func, request, context, extra):
with self.rpc_lock:
rc = func(request, context)
rc = func(request, context, extra)
if not self.omap_lock.omap_file_disable_unlock:
assert not self.omap_lock.locked(), f"OMAP is still locked when we're out of function {func}"
return rc

def execute_grpc_function(self, func, request, context):
def execute_grpc_function(self, func, request, context, extra=None):
"""This functions handles RPC lock by wrapping 'func' with
self._grpc_function_with_lock, and assumes (?!) the function 'func'
called might take OMAP lock internally, however does NOT ensure
taking OMAP lock in any way.
"""
return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)
return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context, extra)

def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""):
"""Creates a bdev from an RBD image."""
Expand Down Expand Up @@ -500,7 +500,7 @@ def get_peer_message(self, context) -> str:

return ""

def create_subsystem_safe(self, request, context):
def create_subsystem_safe(self, request, context, extra):
"""Creates a subsystem."""

create_subsystem_error_prefix = f"Failure creating subsystem {request.subsystem_nqn}"
Expand Down Expand Up @@ -651,7 +651,7 @@ def remove_subsystem_from_state(self, nqn, context):
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
return pb2.req_status(status=0, error_message=os.strerror(0))

def delete_subsystem_safe(self, request, context):
def delete_subsystem_safe(self, request, context, extra):
"""Deletes a subsystem."""

delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}"
Expand Down Expand Up @@ -820,7 +820,7 @@ def get_ns_id_message(self, nsid, uuid):
def set_ana_state(self, request, context=None):
return self.execute_grpc_function(self.set_ana_state_safe, request, context)

def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
def set_ana_state_safe(self, ana_info: pb2.ana_info, context, extra):
peer_msg = self.get_peer_message(context)
"""Sets ana state for this gateway."""
self.logger.info(f"Received request to set ana states {ana_info.states}, {peer_msg}")
Expand Down Expand Up @@ -911,14 +911,15 @@ def choose_anagrpid_for_namespace(self, nsid) ->int:
self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
return chosen_ana_group

def namespace_add_safe(self, request, context):
def namespace_add_safe(self, request, context, should_create_bdev):
"""Adds a namespace to a subsystem."""

grps_list = []
anagrp = 0
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
bdev_msg = "a new bdev will be created" if should_create_bdev else "no new bdev will be created"
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid}, {bdev_msg} context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand Down Expand Up @@ -959,26 +960,27 @@ def namespace_add_safe(self, request, context):
request.anagrpid = anagrp

anagrp = request.anagrpid
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
if ret_bdev.status != 0:
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
self.logger.error(errmsg)
# Delete the bdev just to be on the safe side
ns_bdev = self.get_bdev_info(bdev_name, False)
if ns_bdev != None:
try:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
self.logger.debug(f"delete_bdev({bdev_name}): {ret_del.status}")
except AssertionError:
self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
raise
except Exception:
self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
return pb2.nsid_status(status=ret_bdev.status, error_message=errmsg)
if should_create_bdev:
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
if ret_bdev.status != 0:
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
self.logger.error(errmsg)
# Delete the bdev just to be on the safe side
ns_bdev = self.get_bdev_info(bdev_name, False)
if ns_bdev != None:
try:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
self.logger.debug(f"delete_bdev({bdev_name}): {ret_del.status}")
except AssertionError:
self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
raise
except Exception:
self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
return pb2.nsid_status(status=ret_bdev.status, error_message=errmsg)

if ret_bdev.bdev_name != bdev_name:
self.logger.warning(f"Returned bdev name {ret_bdev.bdev_name} differs from requested one {bdev_name}")
if ret_bdev.bdev_name != bdev_name:
self.logger.warning(f"Returned bdev name {ret_bdev.bdev_name} differs from requested one {bdev_name}")

ret_ns = self.create_namespace(request.subsystem_nqn, bdev_name, request.nsid, anagrp, request.uuid, context)

Expand All @@ -989,15 +991,16 @@ def namespace_add_safe(self, request, context):
ret_ns.error_message = errmsg

if ret_ns.status != 0:
try:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
if ret_del.status != 0:
self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}")
except AssertionError:
self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
raise
except Exception:
self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
if should_create_bdev:
try:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
if ret_del.status != 0:
self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}")
except AssertionError:
self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
raise
except Exception:
self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
return pb2.nsid_status(status=ret_ns.status, error_message=errmsg)
Expand All @@ -1019,9 +1022,13 @@ def namespace_add_safe(self, request, context):

def namespace_add(self, request, context=None):
"""Adds a namespace to a subsystem."""
return self.execute_grpc_function(self.namespace_add_safe, request, context)
return self.execute_grpc_function(self.namespace_add_safe, request, context, True)

def namespace_add_no_bdev(self, request, context=None):
"""Adds a namespace to a subsystem without creating a new bdev."""
return self.execute_grpc_function(self.namespace_add_safe, request, context, False)

def namespace_change_load_balancing_group_safe(self, request, context):
def namespace_change_load_balancing_group_safe(self, request, context, extra):
"""Changes a namespace load balancing group."""

grps_list = []
Expand Down Expand Up @@ -1051,13 +1058,15 @@ def namespace_change_load_balancing_group_safe(self, request, context):
try:
nsid = find_ret[0]["nsid"]
except KeyError:
nsid = 0
self.logger.warning(f"Can't get NSID value for namespace {nsid_msg}in {request.subsystem_nqn}:\n{find_ret[0]}")
errmsg = f"Can't get NSID value for namespace {nsid_msg}in {request.subsystem_nqn}:\n{find_ret[0]}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

if request.nsid and request.nsid != nsid:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Returned NSID {nsid} differs from requested one {request.nsid}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
request.nsid = nsid

if request.uuid and request.uuid != uuid:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Returned UUID {uuid} differs from requested one {request.uuid}"
Expand Down Expand Up @@ -1092,13 +1101,11 @@ def namespace_change_load_balancing_group_safe(self, request, context):
self.logger.error(errmsg)
return pb2.req_status(status=ret_del.status, error_message=errmsg)

if nsid:
self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)

ret_ns = self.create_namespace(request.subsystem_nqn, bdev_name, nsid, request.anagrpid, uuid, context)
if ret_ns.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
return pb2.req_status(status=ret_ns.status, error_message=errmsg)

if context:
Expand Down Expand Up @@ -1424,7 +1431,7 @@ def get_qos_limits_string(self, request):

return limits_to_set

def namespace_set_qos_limits_safe(self, request, context):
def namespace_set_qos_limits_safe(self, request, context, extra):
"""Set namespace's qos limits."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -1605,12 +1612,13 @@ def namespace_resize(self, request, context=None):

return pb2.req_status(status=ret.status, error_message=errmsg)

def namespace_delete_safe(self, request, context):
def namespace_delete_safe(self, request, context, del_also_bdev):
"""Delete a namespace."""

peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}")
bdev_msg = "and its bdev" if del_also_bdev else "but not its bdev"
self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn} {bdev_msg}, context: {context}{peer_msg}")

omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
Expand All @@ -1621,7 +1629,7 @@ def namespace_delete_safe(self, request, context):
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret[1]
if not bdev_name:
if del_also_bdev and not bdev_name:
self.logger.warning(f"Can't find namespace's bdev name, will try to delete namespace anyway")

ns = find_ret[0]
Expand All @@ -1631,7 +1639,7 @@ def namespace_delete_safe(self, request, context):
return ret

self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
if bdev_name:
if del_also_bdev and bdev_name:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
if ret_del.status != 0:
errmsg = f"Failure deleting namespace {nsid_msg}from {request.subsystem_nqn}: {ret_del.error_message}"
Expand All @@ -1642,7 +1650,11 @@ def namespace_delete_safe(self, request, context):

def namespace_delete(self, request, context=None):
"""Delete a namespace."""
return self.execute_grpc_function(self.namespace_delete_safe, request, context)
return self.execute_grpc_function(self.namespace_delete_safe, request, context, True)

def namespace_delete_no_bdev(self, request, context=None):
"""Delete a namespace but not its bdev."""
return self.execute_grpc_function(self.namespace_delete_safe, request, context, False)

def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
if not context:
Expand All @@ -1654,7 +1666,7 @@ def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
else:
return False

def add_host_safe(self, request, context):
def add_host_safe(self, request, context, extra):
"""Adds a host to a subsystem."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -1782,7 +1794,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context):
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
return pb2.req_status(status=0, error_message=os.strerror(0))

def remove_host_safe(self, request, context):
def remove_host_safe(self, request, context, extra):
"""Removes a host from a subsystem."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -1862,7 +1874,7 @@ def remove_host_safe(self, request, context):
def remove_host(self, request, context=None):
return self.execute_grpc_function(self.remove_host_safe, request, context)

def list_hosts_safe(self, request, context):
def list_hosts_safe(self, request, context, extra):
"""List hosts."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -1908,7 +1920,7 @@ def list_hosts_safe(self, request, context):
def list_hosts(self, request, context=None):
return self.execute_grpc_function(self.list_hosts_safe, request, context)

def list_connections_safe(self, request, context):
def list_connections_safe(self, request, context, extra):
"""List connections."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -2052,7 +2064,7 @@ def get_subsystem_ha_status(self, nqn) -> bool:
self.logger.debug(f"Subsystem {nqn} enable_ha: {enable_ha}")
return enable_ha

def create_listener_safe(self, request, context):
def create_listener_safe(self, request, context, extra):
"""Creates a listener for a subsystem at a given IP/Port."""

ret = True
Expand Down Expand Up @@ -2235,7 +2247,7 @@ def remove_listener_from_state(self, nqn, host_name, traddr, port, context):

return req_status

def delete_listener_safe(self, request, context):
def delete_listener_safe(self, request, context, extra):
"""Deletes a listener from a subsystem at a given IP/Port."""

ret = True
Expand Down Expand Up @@ -2268,7 +2280,7 @@ def delete_listener_safe(self, request, context):

if not request.force:
list_conn_req = pb2.list_connections_req(subsystem=request.nqn)
list_conn_ret = self.list_connections_safe(list_conn_req, context)
list_conn_ret = self.list_connections_safe(list_conn_req, context, extra)
if list_conn_ret.status != 0:
errmsg=f"{delete_listener_error_prefix}. Can't verify there are no active connections for this address"
self.logger.error(errmsg)
Expand Down Expand Up @@ -2330,7 +2342,7 @@ def delete_listener_safe(self, request, context):
def delete_listener(self, request, context=None):
return self.execute_grpc_function(self.delete_listener_safe, request, context)

def list_listeners_safe(self, request, context):
def list_listeners_safe(self, request, context, extra):
"""List listeners."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -2365,7 +2377,7 @@ def list_listeners_safe(self, request, context):
def list_listeners(self, request, context=None):
return self.execute_grpc_function(self.list_listeners_safe, request, context)

def list_subsystems_safe(self, request, context):
def list_subsystems_safe(self, request, context, extra):
"""List subsystems."""

peer_msg = self.get_peer_message(context)
Expand Down Expand Up @@ -2459,7 +2471,7 @@ def get_subsystems(self, request, context):
def list_subsystems(self, request, context=None):
return self.execute_grpc_function(self.list_subsystems_safe, request, context)

def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
def get_spdk_nvmf_log_flags_and_level_safe(self, request, context, extra):
"""Gets spdk nvmf log flags, log level and log print level"""
peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}")
Expand Down Expand Up @@ -2498,7 +2510,7 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
def get_spdk_nvmf_log_flags_and_level(self, request, context=None):
return self.execute_grpc_function(self.get_spdk_nvmf_log_flags_and_level_safe, request, context)

def set_spdk_nvmf_logs_safe(self, request, context):
def set_spdk_nvmf_logs_safe(self, request, context, extra):
"""Enables spdk nvmf logs"""
log_level = None
print_level = None
Expand Down Expand Up @@ -2566,7 +2578,7 @@ def set_spdk_nvmf_logs_safe(self, request, context):
def set_spdk_nvmf_logs(self, request, context=None):
return self.execute_grpc_function(self.set_spdk_nvmf_logs_safe, request, context)

def disable_spdk_nvmf_logs_safe(self, request, context):
def disable_spdk_nvmf_logs_safe(self, request, context, extra):
"""Disables spdk nvmf logs"""
peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}")
Expand Down Expand Up @@ -2616,7 +2628,7 @@ def parse_version(self, version):
return None
return (v1, v2, v3)

def get_gateway_info_safe(self, request, context):
def get_gateway_info_safe(self, request, context, extra):
"""Get gateway's info"""

peer_msg = self.get_peer_message(context)
Expand Down
Loading

0 comments on commit 404b0ec

Please sign in to comment.