From 404b0ecfea5f1cc87520b2b84aee07da18f5aec7 Mon Sep 17 00:00:00 2001
From: Gil Bregman
Date: Sun, 23 Jun 2024 15:44:48 +0300
Subject: [PATCH] Do not delete bdev on update after changing a namespace load balancing group

Fixes #730

Signed-off-by: Gil Bregman
---
 control/discovery.py |   2 +-
 control/grpc.py      | 138 +++++++++++++++++++++++--------
 control/server.py    |  12 +++-
 control/state.py     |  20 ++++---
 tests/test_state.py  |   2 +-
 5 files changed, 97 insertions(+), 77 deletions(-)

diff --git a/control/discovery.py b/control/discovery.py
index cecb703b6..a06a654ce 100644
--- a/control/discovery.py
+++ b/control/discovery.py
@@ -882,7 +882,7 @@ def store_async(self, conn, data, cmd_id):
         self_conn.recv_async = True
         self_conn.async_cmd_id = cmd_id

-    def _state_notify_update(self, update, is_add_req):
+    def _state_notify_update(self, update, is_add_req, is_changed):
         """Notify and reply async event."""

         should_send_async_event = False
diff --git a/control/grpc.py b/control/grpc.py
index 58fc956fc..06e6ad1b2 100644
--- a/control/grpc.py
+++ b/control/grpc.py
@@ -251,20 +251,20 @@ def _alloc_cluster(self, anagrp: int) -> str:
         self.cluster_nonce[name] = nonce
         return name

-    def _grpc_function_with_lock(self, func, request, context):
+    def _grpc_function_with_lock(self, func, request, context, extra):
         with self.rpc_lock:
-            rc = func(request, context)
+            rc = func(request, context, extra)
             if not self.omap_lock.omap_file_disable_unlock:
                 assert not self.omap_lock.locked(), f"OMAP is still locked when we're out of function {func}"
             return rc

-    def execute_grpc_function(self, func, request, context):
+    def execute_grpc_function(self, func, request, context, extra=None):
        """This functions handles RPC lock by wrapping 'func' with
        self._grpc_function_with_lock, and assumes (?!) the function 'func' called
        might take OMAP lock internally, however does NOT ensure taking OMAP lock
        in any way.
        """
-        return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)
+        return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context, extra)

     def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""):
         """Creates a bdev from an RBD image."""
@@ -500,7 +500,7 @@ def get_peer_message(self, context) -> str:

         return ""

-    def create_subsystem_safe(self, request, context):
+    def create_subsystem_safe(self, request, context, extra):
         """Creates a subsystem."""

         create_subsystem_error_prefix = f"Failure creating subsystem {request.subsystem_nqn}"
@@ -651,7 +651,7 @@ def remove_subsystem_from_state(self, nqn, context):
             return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
         return pb2.req_status(status=0, error_message=os.strerror(0))

-    def delete_subsystem_safe(self, request, context):
+    def delete_subsystem_safe(self, request, context, extra):
         """Deletes a subsystem."""

         delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}"
@@ -820,7 +820,7 @@ def get_ns_id_message(self, nsid, uuid):
     def set_ana_state(self, request, context=None):
         return self.execute_grpc_function(self.set_ana_state_safe, request, context)

-    def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
+    def set_ana_state_safe(self, ana_info: pb2.ana_info, context, extra):
         peer_msg = self.get_peer_message(context)
         """Sets ana state for this gateway."""
         self.logger.info(f"Received request to set ana states {ana_info.states}, {peer_msg}")
@@ -911,14 +911,15 @@ def choose_anagrpid_for_namespace(self, nsid) ->int:
                 self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
                 return chosen_ana_group

-    def namespace_add_safe(self, request, context):
+    def namespace_add_safe(self, request, context, should_create_bdev):
         """Adds a namespace to a subsystem."""

         grps_list = []
         anagrp = 0
         peer_msg = self.get_peer_message(context)
         nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
-        self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
+        bdev_msg = "a new bdev will be created" if should_create_bdev else "no new bdev will be created"
+        self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid}, {bdev_msg} context: {context}{peer_msg}")

         if not request.uuid:
             request.uuid = str(uuid.uuid4())
@@ -959,26 +960,27 @@ def namespace_add_safe(self, request, context):
                 request.anagrpid = anagrp

             anagrp = request.anagrpid
-            ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
-                                        request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
-            if ret_bdev.status != 0:
-                errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
-                self.logger.error(errmsg)
-                # Delete the bdev just to be on the safe side
-                ns_bdev = self.get_bdev_info(bdev_name, False)
-                if ns_bdev != None:
-                    try:
-                        ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
-                        self.logger.debug(f"delete_bdev({bdev_name}): {ret_del.status}")
-                    except AssertionError:
-                        self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
-                        raise
-                    except Exception:
-                        self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
-                return pb2.nsid_status(status=ret_bdev.status, error_message=errmsg)
+            if should_create_bdev:
+                ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
+                                            request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
+                if ret_bdev.status != 0:
+                    errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
+                    self.logger.error(errmsg)
+                    # Delete the bdev just to be on the safe side
+                    ns_bdev = self.get_bdev_info(bdev_name, False)
+                    if ns_bdev != None:
+                        try:
+                            ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
+                            self.logger.debug(f"delete_bdev({bdev_name}): {ret_del.status}")
+                        except AssertionError:
+                            self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
+                            raise
+                        except Exception:
+                            self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
+                    return pb2.nsid_status(status=ret_bdev.status, error_message=errmsg)

-            if ret_bdev.bdev_name != bdev_name:
-                self.logger.warning(f"Returned bdev name {ret_bdev.bdev_name} differs from requested one {bdev_name}")
+                if ret_bdev.bdev_name != bdev_name:
+                    self.logger.warning(f"Returned bdev name {ret_bdev.bdev_name} differs from requested one {bdev_name}")

             ret_ns = self.create_namespace(request.subsystem_nqn, bdev_name, request.nsid, anagrp, request.uuid, context)
@@ -989,15 +991,16 @@ def namespace_add_safe(self, request, context):
                 ret_ns.error_message = errmsg

         if ret_ns.status != 0:
-            try:
-                ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
-                if ret_del.status != 0:
-                    self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}")
-            except AssertionError:
-                self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
-                raise
-            except Exception:
-                self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
+            if should_create_bdev:
+                try:
+                    ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
+                    if ret_del.status != 0:
+                        self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}")
+                except AssertionError:
+                    self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}")
+                    raise
+                except Exception:
+                    self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}")
             errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}:{ret_ns.error_message}"
             self.logger.error(errmsg)
             return pb2.nsid_status(status=ret_ns.status, error_message=errmsg)
@@ -1019,9 +1022,13 @@ def namespace_add_safe(self, request, context):

     def namespace_add(self, request, context=None):
         """Adds a namespace to a subsystem."""
-        return self.execute_grpc_function(self.namespace_add_safe, request, context)
+        return self.execute_grpc_function(self.namespace_add_safe, request, context, True)
+
+    def namespace_add_no_bdev(self, request, context=None):
+        """Adds a namespace to a subsystem without creating a new bdev."""
+        return self.execute_grpc_function(self.namespace_add_safe, request, context, False)

-    def namespace_change_load_balancing_group_safe(self, request, context):
+    def namespace_change_load_balancing_group_safe(self, request, context, extra):
         """Changes a namespace load balancing group."""

         grps_list = []
@@ -1051,13 +1058,15 @@ def namespace_change_load_balancing_group_safe(self, request, context):
             try:
                 nsid = find_ret[0]["nsid"]
             except KeyError:
-                nsid = 0
-                self.logger.warning(f"Can't get NSID value for namespace {nsid_msg}in {request.subsystem_nqn}:\n{find_ret[0]}")
+                errmsg = f"Can't get NSID value for namespace {nsid_msg}in {request.subsystem_nqn}:\n{find_ret[0]}"
+                self.logger.error(errmsg)
+                return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

             if request.nsid and request.nsid != nsid:
                 errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Returned NSID {nsid} differs from requested one {request.nsid}"
                 self.logger.error(errmsg)
                 return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
+            request.nsid = nsid

             if request.uuid and request.uuid != uuid:
                 errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Returned UUID {uuid} differs from requested one {request.uuid}"
@@ -1092,13 +1101,11 @@ def namespace_change_load_balancing_group_safe(self, request, context):
                     self.logger.error(errmsg)
                     return pb2.req_status(status=ret_del.status, error_message=errmsg)

-            if nsid:
-                self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
-
             ret_ns = self.create_namespace(request.subsystem_nqn, bdev_name, nsid, request.anagrpid, uuid, context)
             if ret_ns.status != 0:
                 errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
                 self.logger.error(errmsg)
+                self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
                 return pb2.req_status(status=ret_ns.status, error_message=errmsg)

             if context:
@@ -1424,7 +1431,7 @@ def get_qos_limits_string(self, request):

         return limits_to_set

-    def namespace_set_qos_limits_safe(self, request, context):
+    def namespace_set_qos_limits_safe(self, request, context, extra):
         """Set namespace's qos limits."""

         peer_msg = self.get_peer_message(context)
@@ -1605,12 +1612,13 @@ def namespace_resize(self, request, context=None):

         return pb2.req_status(status=ret.status, error_message=errmsg)

-    def namespace_delete_safe(self, request, context):
+    def namespace_delete_safe(self, request, context, del_also_bdev):
         """Delete a namespace."""

         peer_msg = self.get_peer_message(context)
         nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
-        self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}")
+        bdev_msg = "and its bdev" if del_also_bdev else "but not its bdev"
+        self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn} {bdev_msg}, context: {context}{peer_msg}")

         omap_lock = self.omap_lock.get_omap_lock_to_use(context)
         with omap_lock:
@@ -1621,7 +1629,7 @@ def namespace_delete_safe(self, request, context):
                 self.logger.error(errmsg)
                 return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
             bdev_name = find_ret[1]
-            if not bdev_name:
+            if del_also_bdev and not bdev_name:
                 self.logger.warning(f"Can't find namespace's bdev name, will try to delete namespace anyway")
             ns = find_ret[0]
@@ -1631,7 +1639,7 @@ def namespace_delete_safe(self, request, context):
                 return ret

             self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
-            if bdev_name:
+            if del_also_bdev and bdev_name:
                 ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
                 if ret_del.status != 0:
                     errmsg = f"Failure deleting namespace {nsid_msg}from {request.subsystem_nqn}: {ret_del.error_message}"
@@ -1642,7 +1650,11 @@ def namespace_delete_safe(self, request, context):

     def namespace_delete(self, request, context=None):
         """Delete a namespace."""
-        return self.execute_grpc_function(self.namespace_delete_safe, request, context)
+        return self.execute_grpc_function(self.namespace_delete_safe, request, context, True)
+
+    def namespace_delete_no_bdev(self, request, context=None):
+        """Delete a namespace but not its bdev."""
+        return self.execute_grpc_function(self.namespace_delete_safe, request, context, False)

     def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
         if not context:
@@ -1654,7 +1666,7 @@ def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
         else:
             return False

-    def add_host_safe(self, request, context):
+    def add_host_safe(self, request, context, extra):
         """Adds a host to a subsystem."""

         peer_msg = self.get_peer_message(context)
@@ -1782,7 +1794,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context):
             return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
         return pb2.req_status(status=0, error_message=os.strerror(0))

-    def remove_host_safe(self, request, context):
+    def remove_host_safe(self, request, context, extra):
         """Removes a host from a subsystem."""

         peer_msg = self.get_peer_message(context)
@@ -1862,7 +1874,7 @@ def remove_host_safe(self, request, context):
     def remove_host(self, request, context=None):
         return self.execute_grpc_function(self.remove_host_safe, request, context)

-    def list_hosts_safe(self, request, context):
+    def list_hosts_safe(self, request, context, extra):
         """List hosts."""

         peer_msg = self.get_peer_message(context)
@@ -1908,7 +1920,7 @@ def list_hosts_safe(self, request, context):
     def list_hosts(self, request, context=None):
         return self.execute_grpc_function(self.list_hosts_safe, request, context)

-    def list_connections_safe(self, request, context):
+    def list_connections_safe(self, request, context, extra):
         """List connections."""

         peer_msg = self.get_peer_message(context)
@@ -2052,7 +2064,7 @@ def get_subsystem_ha_status(self, nqn) -> bool:
         self.logger.debug(f"Subsystem {nqn} enable_ha: {enable_ha}")
         return enable_ha

-    def create_listener_safe(self, request, context):
+    def create_listener_safe(self, request, context, extra):
         """Creates a listener for a subsystem at a given IP/Port."""

         ret = True
@@ -2235,7 +2247,7 @@ def remove_listener_from_state(self, nqn, host_name, traddr, port, context):

         return req_status

-    def delete_listener_safe(self, request, context):
+    def delete_listener_safe(self, request, context, extra):
         """Deletes a listener from a subsystem at a given IP/Port."""

         ret = True
@@ -2268,7 +2280,7 @@ def delete_listener_safe(self, request, context):

             if not request.force:
                 list_conn_req = pb2.list_connections_req(subsystem=request.nqn)
-                list_conn_ret = self.list_connections_safe(list_conn_req, context)
+                list_conn_ret = self.list_connections_safe(list_conn_req, context, extra)
                 if list_conn_ret.status != 0:
                     errmsg=f"{delete_listener_error_prefix}. Can't verify there are no active connections for this address"
                     self.logger.error(errmsg)
@@ -2330,7 +2342,7 @@ def delete_listener_safe(self, request, context):
     def delete_listener(self, request, context=None):
         return self.execute_grpc_function(self.delete_listener_safe, request, context)

-    def list_listeners_safe(self, request, context):
+    def list_listeners_safe(self, request, context, extra):
         """List listeners."""

         peer_msg = self.get_peer_message(context)
@@ -2365,7 +2377,7 @@ def list_listeners_safe(self, request, context):
     def list_listeners(self, request, context=None):
         return self.execute_grpc_function(self.list_listeners_safe, request, context)

-    def list_subsystems_safe(self, request, context):
+    def list_subsystems_safe(self, request, context, extra):
         """List subsystems."""

         peer_msg = self.get_peer_message(context)
@@ -2459,7 +2471,7 @@ def get_subsystems(self, request, context):
     def list_subsystems(self, request, context=None):
         return self.execute_grpc_function(self.list_subsystems_safe, request, context)

-    def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
+    def get_spdk_nvmf_log_flags_and_level_safe(self, request, context, extra):
         """Gets spdk nvmf log flags, log level and log print level"""
         peer_msg = self.get_peer_message(context)
         self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}")
@@ -2498,7 +2510,7 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
     def get_spdk_nvmf_log_flags_and_level(self, request, context=None):
         return self.execute_grpc_function(self.get_spdk_nvmf_log_flags_and_level_safe, request, context)

-    def set_spdk_nvmf_logs_safe(self, request, context):
+    def set_spdk_nvmf_logs_safe(self, request, context, extra):
         """Enables spdk nvmf logs"""
         log_level = None
         print_level = None
@@ -2566,7 +2578,7 @@ def set_spdk_nvmf_logs_safe(self, request, context):
     def set_spdk_nvmf_logs(self, request, context=None):
         return self.execute_grpc_function(self.set_spdk_nvmf_logs_safe, request, context)

-    def disable_spdk_nvmf_logs_safe(self, request, context):
+    def disable_spdk_nvmf_logs_safe(self, request, context, extra):
         """Disables spdk nvmf logs"""
         peer_msg = self.get_peer_message(context)
         self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}")
@@ -2616,7 +2628,7 @@ def parse_version(self, version):
             return None
         return (v1, v2, v3)

-    def get_gateway_info_safe(self, request, context):
+    def get_gateway_info_safe(self, request, context, extra):
         """Get gateway's info"""

         peer_msg = self.get_peer_message(context)
diff --git a/control/server.py b/control/server.py
index 255ad5997..47b73df41 100644
--- a/control/server.py
+++ b/control/server.py
@@ -530,7 +530,7 @@ def _ping(self):
             self.logger.exception(f"spdk_get_version failed")
             return False

-    def gateway_rpc_caller(self, requests, is_add_req):
+    def gateway_rpc_caller(self, requests, is_add_req, is_changed):
         """Passes RPC requests to gateway service."""
         for key, val in requests.items():
             if key.startswith(GatewayState.SUBSYSTEM_PREFIX):
@@ -545,12 +545,18 @@ def gateway_rpc_caller(self, requests, is_add_req):
             elif key.startswith(GatewayState.NAMESPACE_PREFIX):
                 if is_add_req:
                     req = json_format.Parse(val, pb2.namespace_add_req(), ignore_unknown_fields=True)
-                    self.gateway_rpc.namespace_add(req)
+                    if is_changed:
+                        self.gateway_rpc.namespace_add_no_bdev(req)
+                    else:
+                        self.gateway_rpc.namespace_add(req)
                 else:
                     req = json_format.Parse(val, pb2.namespace_delete_req(), ignore_unknown_fields=True)
-                    self.gateway_rpc.namespace_delete(req)
+                    if is_changed:
+                        self.gateway_rpc.namespace_delete_no_bdev(req)
+                    else:
+                        self.gateway_rpc.namespace_delete(req)
             elif key.startswith(GatewayState.NAMESPACE_QOS_PREFIX):
                 if is_add_req:
                     req = json_format.Parse(val, pb2.namespace_set_qos_req(), ignore_unknown_fields=True)
diff --git a/control/state.py b/control/state.py
index a3001661b..7e8cd9315 100644
--- a/control/state.py
+++ b/control/state.py
@@ -244,11 +244,11 @@ def get_omap_lock_to_use(self, context):
     # This function accepts a function in which there is Omap locking. It will execute this function
     # and in case the Omap is not current, will reload it and try again
     #
-    def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, context):
+    def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, context, extra):
         for i in range(0, self.omap_file_update_reloads + 1):
             need_to_update = False
             try:
-                return grpc_func(omap_locking_func, request, context)
+                return grpc_func(omap_locking_func, request, context, extra)
             except OSError as err:
                 if err.errno == errno.EAGAIN:
                     need_to_update = True
@@ -714,13 +714,15 @@ def update(self) -> bool:
         grouped_removed = self._group_by_prefix(removed, prefix_list)

         # Handle OMAP removals and remove outdated changed components
-        grouped_removed.update(grouped_changed)
         if grouped_removed:
-            self._update_call_rpc(grouped_removed, False, prefix_list)
+            self._update_call_rpc(grouped_removed, False, False, prefix_list)
+        if grouped_changed:
+            self._update_call_rpc(grouped_changed, False, True, prefix_list)

         # Handle OMAP additions and add updated changed components
-        grouped_added.update(grouped_changed)
         if grouped_added:
-            self._update_call_rpc(grouped_added, True, prefix_list)
+            self._update_call_rpc(grouped_added, True, False, prefix_list)
+        if grouped_changed:
+            self._update_call_rpc(grouped_changed, True, True, prefix_list)

         # Update local state and version
         self.local.reset(omap_state_dict)
@@ -738,15 +740,15 @@ def _group_by_prefix(self, state_update, prefix_list):
                 grouped_state_update[prefix][key] = val
         return grouped_state_update

-    def _update_call_rpc(self, grouped_state_update, is_add_req, prefix_list):
+    def _update_call_rpc(self, grouped_state_update, is_add_req, is_changed, prefix_list):
         """Calls to initiate gateway RPCs in necessary component order."""
         if is_add_req:
             for prefix in prefix_list:
                 component_update = grouped_state_update.get(prefix, {})
                 if component_update:
-                    self.gateway_rpc_caller(component_update, True)
+                    self.gateway_rpc_caller(component_update, True, is_changed)
         else:
             for prefix in list(reversed(prefix_list)):
                 component_update = grouped_state_update.get(prefix, {})
                 if component_update:
-                    self.gateway_rpc_caller(component_update, False)
+                    self.gateway_rpc_caller(component_update, False, is_changed)
diff --git a/tests/test_state.py b/tests/test_state.py
index ab2bf521a..29bfa0244 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -107,7 +107,7 @@ def test_state_notify_update(config, ioctx, local_state, omap_state):

     update_counter = 0

-    def _state_notify_update(update, is_add_req):
+    def _state_notify_update(update, is_add_req, is_changed):
        nonlocal update_counter
        update_counter += 1
        elapsed = time.time() - start
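
Illustrative note, not part of the patch: a minimal Python sketch of the replay logic the diff introduces. When the OMAP update walker sees a namespace key that merely changed (for example, its load balancing group moved), it is now replayed through the new namespace_delete_no_bdev/namespace_add_no_bdev entry points so the existing RBD-backed bdev is kept; only genuinely removed or added keys still delete and re-create the bdev. The helper name replay_namespace_key and its request arguments are hypothetical and exist only for this sketch; the gateway calls are the ones added in control/grpc.py above.

def replay_namespace_key(gateway_rpc, delete_req, add_req, is_changed):
    """Replay one namespace key from an OMAP state update (sketch only)."""
    if is_changed:
        # The namespace only changed, so keep the existing bdev attached
        # to the RBD image: drop and re-add the namespace around it.
        gateway_rpc.namespace_delete_no_bdev(delete_req)
        gateway_rpc.namespace_add_no_bdev(add_req)
    else:
        # A genuine removal/addition: the bdev is deleted and re-created.
        gateway_rpc.namespace_delete(delete_req)
        gateway_rpc.namespace_add(add_req)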