diff --git a/control/__main__.py b/control/__main__.py index 2c275decf..8c5076803 100644 --- a/control/__main__.py +++ b/control/__main__.py @@ -13,10 +13,13 @@ from .config import GatewayConfig from .utils import GatewayLogger +gw = None gw_logger = None gw_name = None def sigterm_handler(signum, frame): + if gw and gw.omap_state: + gw.omap_state.cleanup_omap() if gw_logger and gw_name: gw_logger.compress_final_log_file(gw_name) @@ -40,6 +43,7 @@ def sigterm_handler(signum, frame): config.display_environment_info(gw_logger.logger) config.dump_config_file(gw_logger.logger) with GatewayServer(config) as gateway: + gw = gateway gw_name = gateway.name gateway.serve() gateway.keep_alive() diff --git a/control/discovery.py b/control/discovery.py index e98c4d1a1..cecb703b6 100644 --- a/control/discovery.py +++ b/control/discovery.py @@ -302,7 +302,6 @@ class DiscoveryService: config: Basic gateway parameters logger: Logger instance to track discovery controller events omap_name: OMAP object name - ioctx: I/O context which allows OMAP access discovery_addr: Discovery controller addr which allows initiator send command discovery_port: Discovery controller's listening port """ @@ -321,7 +320,6 @@ def __init__(self, config): if gateway_group else "nvmeof.state" self.logger.info(f"log pages info from omap: {self.omap_name}") - self.ioctx = self.omap_state.open_rados_connection(config) self.discovery_addr = self.config.get_with_default("discovery", "addr", "0.0.0.0") self.discovery_port = self.config.get_with_default("discovery", "port", "8009") if not self.discovery_addr or not self.discovery_port: diff --git a/control/server.py b/control/server.py index 0fbbd6f14..255ad5997 100644 --- a/control/server.py +++ b/control/server.py @@ -83,6 +83,7 @@ def __init__(self, config: GatewayConfig): self.rpc_lock = threading.Lock() self.group_id = 0 self.monitor_client = '/usr/bin/ceph-nvmeof-monitor-client' + self.omap_state = None self.name = self.config.get("gateway", "name") if not self.name: @@ -127,6 +128,10 @@ def __exit__(self, exc_type, exc_value, traceback): if self.discovery_pid: self._stop_discovery() + if self.omap_state: + self.omap_state.cleanup_omap() + self.omap_state = None + if logger: logger.info("Exiting the gateway process.") gw_logger.compress_final_log_file(gw_name) @@ -161,6 +166,7 @@ def serve(self): self.logger.info(f"Starting serve, monitor client version: {self._monitor_client_version()}") omap_state = OmapGatewayState(self.config, f"gateway-{self.name}") + self.omap_state = omap_state local_state = LocalGatewayState() omap_state.check_for_old_format_omap_files() diff --git a/control/state.py b/control/state.py index de79feb4c..5928322b1 100644 --- a/control/state.py +++ b/control/state.py @@ -16,6 +16,7 @@ from collections import defaultdict from abc import ABC, abstractmethod from .utils import GatewayLogger +import atexit class GatewayState(ABC): """Persists gateway NVMeoF target state. @@ -269,6 +270,10 @@ def lock_omap(self): got_lock = False assert self.rpc_lock.locked(), "The RPC lock is not locked." + if not self.omap_state.ioctx: + self.logger.warning(f"Not locking OMAP as Rados connection is closed") + return + for i in range(0, self.omap_file_lock_retries + 1): try: self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, @@ -310,6 +315,9 @@ def unlock_omap(self): self.logger.warning(f"OMAP file unlock was disabled, will not unlock file") return + if not self.omap_state.ioctx: + return + try: self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE) self.is_locked = False @@ -347,6 +355,7 @@ def __init__(self, config, id_text=""): self.config = config self.version = 1 self.logger = GatewayLogger(self.config).logger + self.ioctx = None self.watch = None gateway_group = self.config.get("gateway", "group") self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" @@ -369,11 +378,10 @@ def __init__(self, config, id_text=""): except Exception: self.logger.exception(f"Unable to create OMAP, exiting!") raise + atexit.register(self.cleanup_omap) def __exit__(self, exc_type, exc_value, traceback): - if self.watch is not None: - self.watch.close() - self.ioctx.close() + self.cleanup_omap() def check_for_old_format_omap_files(self): omap_dict = self.get_state() @@ -401,6 +409,10 @@ def set_local_version(self, version_update: int): def get_omap_version(self) -> int: """Returns OMAP version.""" + if not self.ioctx: + self.logger.warning(f"Trying to get OMAP version when Rados connection is closed") + return -1 + with rados.ReadOpCtx() as read_op: i, _ = self.ioctx.get_omap_vals_by_keys(read_op, (self.OMAP_VERSION_KEY,)) @@ -419,6 +431,9 @@ def get_state(self) -> Dict[str, str]: """Returns dict of all OMAP keys and values.""" omap_list = [("", 0)] # Dummy, non empty, list value. Just so we would enter the while omap_dict = {} + if not self.ioctx: + self.logger.warning(f"Trying to get OMAP state when Rados connection is closed") + return omap_dict # The number of items returned is limited by Ceph, so we need to read in a loop until no more items are returned while len(omap_list) > 0: last_key_read = omap_list[-1][0] @@ -431,6 +446,9 @@ def get_state(self) -> Dict[str, str]: def _add_key(self, key: str, val: str): """Adds key and value to the OMAP.""" + if not self.ioctx: + raise + try: version_update = self.version + 1 with rados.WriteOpCtx() as write_op: @@ -455,6 +473,9 @@ def _add_key(self, key: str, val: str): def _remove_key(self, key: str): """Removes key from the OMAP.""" + if not self.ioctx: + raise + try: version_update = self.version + 1 with rados.WriteOpCtx() as write_op: @@ -479,6 +500,9 @@ def _remove_key(self, key: str): def delete_state(self): """Deletes OMAP object contents.""" + if not self.ioctx: + raise + try: with rados.WriteOpCtx() as write_op: self.ioctx.clear_omap(write_op) @@ -497,6 +521,9 @@ def register_watch(self, notify_event): def _watcher_callback(notify_id, notifier_id, watch_id, data): notify_event.set() + if not self.ioctx: + return + if self.watch is None: try: self.watch = self.ioctx.watch(self.omap_name, _watcher_callback) @@ -505,6 +532,16 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data): else: self.logger.info(f"Watch already exists.") + def cleanup_omap(self): + self.logger.info(f"Cleanup OMAP on exit ({self.id_text})") + if self.watch: + self.logger.debug("Unregistering watch") + self.watch.close() + self.watch = None + if self.ioctx: + self.logger.debug("Closing Rados connection") + self.ioctx.close() + self.ioctx = None class GatewayStateHandler: """Maintains consistency in NVMeoF target state store instances. @@ -635,6 +672,10 @@ def update(self) -> bool: self.logger.warning(f"An update is already running, ignore") return False + if not self.omap.ioctx: + self.logger.warning(f"Can't update when Rados connection is closed") + return False + with self.update_is_active_lock: prefix_list = [ GatewayState.SUBSYSTEM_PREFIX,