Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Rados connection and notification watcher when we exit #729

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions control/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
from .config import GatewayConfig
from .utils import GatewayLogger

gw = None
gw_logger = None
gw_name = None

def sigterm_handler(signum, frame):
if gw and gw.omap_state:
gw.omap_state.cleanup_omap()
if gw_logger and gw_name:
gw_logger.compress_final_log_file(gw_name)

Expand All @@ -40,6 +43,7 @@ def sigterm_handler(signum, frame):
config.display_environment_info(gw_logger.logger)
config.dump_config_file(gw_logger.logger)
with GatewayServer(config) as gateway:
gw = gateway
gw_name = gateway.name
gateway.serve()
gateway.keep_alive()
2 changes: 0 additions & 2 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ class DiscoveryService:
config: Basic gateway parameters
logger: Logger instance to track discovery controller events
omap_name: OMAP object name
ioctx: I/O context which allows OMAP access
discovery_addr: Discovery controller address to which initiators send commands
discovery_port: Discovery controller's listening port
"""
Expand All @@ -321,7 +320,6 @@ def __init__(self, config):
if gateway_group else "nvmeof.state"
self.logger.info(f"log pages info from omap: {self.omap_name}")

self.ioctx = self.omap_state.open_rados_connection(config)
self.discovery_addr = self.config.get_with_default("discovery", "addr", "0.0.0.0")
self.discovery_port = self.config.get_with_default("discovery", "port", "8009")
if not self.discovery_addr or not self.discovery_port:
Expand Down
6 changes: 6 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, config: GatewayConfig):
self.rpc_lock = threading.Lock()
self.group_id = 0
self.monitor_client = '/usr/bin/ceph-nvmeof-monitor-client'
self.omap_state = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand Down Expand Up @@ -127,6 +128,10 @@ def __exit__(self, exc_type, exc_value, traceback):
if self.discovery_pid:
self._stop_discovery()

if self.omap_state:
self.omap_state.cleanup_omap()
self.omap_state = None

if logger:
logger.info("Exiting the gateway process.")
gw_logger.compress_final_log_file(gw_name)
Expand Down Expand Up @@ -161,6 +166,7 @@ def serve(self):
self.logger.info(f"Starting serve, monitor client version: {self._monitor_client_version()}")

omap_state = OmapGatewayState(self.config, f"gateway-{self.name}")
self.omap_state = omap_state
local_state = LocalGatewayState()
omap_state.check_for_old_format_omap_files()

Expand Down
47 changes: 44 additions & 3 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from collections import defaultdict
from abc import ABC, abstractmethod
from .utils import GatewayLogger
import atexit

class GatewayState(ABC):
"""Persists gateway NVMeoF target state.
Expand Down Expand Up @@ -269,6 +270,10 @@ def lock_omap(self):
got_lock = False
assert self.rpc_lock.locked(), "The RPC lock is not locked."

if not self.omap_state.ioctx:
gbregman marked this conversation as resolved.
Show resolved Hide resolved
self.logger.warning(f"Not locking OMAP as Rados connection is closed")
raise Exception("An attempt to lock OMAP file after Rados connection was closed")

for i in range(0, self.omap_file_lock_retries + 1):
try:
self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME,
Expand Down Expand Up @@ -310,6 +315,9 @@ def unlock_omap(self):
self.logger.warning(f"OMAP file unlock was disabled, will not unlock file")
return

if not self.omap_state.ioctx:
return

try:
self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE)
self.is_locked = False
Expand Down Expand Up @@ -347,6 +355,7 @@ def __init__(self, config, id_text=""):
self.config = config
self.version = 1
self.logger = GatewayLogger(self.config).logger
self.ioctx = None
self.watch = None
gateway_group = self.config.get("gateway", "group")
self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state"
Expand All @@ -369,11 +378,10 @@ def __init__(self, config, id_text=""):
except Exception:
self.logger.exception(f"Unable to create OMAP, exiting!")
raise
atexit.register(self.cleanup_omap)

def __exit__(self, exc_type, exc_value, traceback):
if self.watch is not None:
self.watch.close()
self.ioctx.close()
self.cleanup_omap()

def check_for_old_format_omap_files(self):
omap_dict = self.get_state()
Expand Down Expand Up @@ -401,6 +409,10 @@ def set_local_version(self, version_update: int):

def get_omap_version(self) -> int:
"""Returns OMAP version."""
if not self.ioctx:
self.logger.warning(f"Trying to get OMAP version when Rados connection is closed")
return -1

with rados.ReadOpCtx() as read_op:
i, _ = self.ioctx.get_omap_vals_by_keys(read_op,
(self.OMAP_VERSION_KEY,))
Expand All @@ -419,6 +431,9 @@ def get_state(self) -> Dict[str, str]:
"""Returns dict of all OMAP keys and values."""
omap_list = [("", 0)] # Dummy, non empty, list value. Just so we would enter the while
omap_dict = {}
if not self.ioctx:
self.logger.warning(f"Trying to get OMAP state when Rados connection is closed")
return omap_dict
# The number of items returned is limited by Ceph, so we need to read in a loop until no more items are returned
while len(omap_list) > 0:
last_key_read = omap_list[-1][0]
Expand All @@ -431,6 +446,9 @@ def get_state(self) -> Dict[str, str]:

def _add_key(self, key: str, val: str):
"""Adds key and value to the OMAP."""
if not self.ioctx:
raise

try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
Expand All @@ -455,6 +473,9 @@ def _add_key(self, key: str, val: str):

def _remove_key(self, key: str):
"""Removes key from the OMAP."""
if not self.ioctx:
raise

try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
Expand All @@ -479,6 +500,9 @@ def _remove_key(self, key: str):

def delete_state(self):
"""Deletes OMAP object contents."""
if not self.ioctx:
raise

try:
with rados.WriteOpCtx() as write_op:
self.ioctx.clear_omap(write_op)
Expand All @@ -497,6 +521,9 @@ def register_watch(self, notify_event):
def _watcher_callback(notify_id, notifier_id, watch_id, data):
notify_event.set()

if not self.ioctx:
return

if self.watch is None:
try:
self.watch = self.ioctx.watch(self.omap_name, _watcher_callback)
Expand All @@ -505,6 +532,16 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data):
else:
self.logger.info(f"Watch already exists.")

def cleanup_omap(self):
    """Close the OMAP notification watch and the Rados I/O context.

    This method is registered with ``atexit`` and is also invoked from
    ``__exit__`` and from the SIGTERM handler, so it may run several times.
    Both handles are detached from the instance before anything is closed,
    which makes every later call a no-op even if a close fails part-way.

    Raises:
        Whatever ``watch.close()`` or ``ioctx.close()`` raises. A failure
        while closing the watch does not prevent the I/O context from
        being closed.
    """
    self.logger.info(f"Cleanup OMAP on exit ({self.id_text})")

    # Take ownership of the handles and clear the attributes first, so
    # other code checking `self.ioctx` / `self.watch` sees them as closed
    # and a repeated cleanup call never closes the same handle twice.
    watch, self.watch = self.watch, None
    ioctx, self.ioctx = self.ioctx, None

    try:
        if watch:
            self.logger.debug("Unregistering watch")
            watch.close()
    finally:
        # Always release the Rados connection, even if the watch failed
        # to unregister; otherwise the I/O context would leak.
        if ioctx:
            self.logger.debug("Closing Rados connection")
            ioctx.close()

class GatewayStateHandler:
"""Maintains consistency in NVMeoF target state store instances.
Expand Down Expand Up @@ -635,6 +672,10 @@ def update(self) -> bool:
self.logger.warning(f"An update is already running, ignore")
return False

if not self.omap.ioctx:
self.logger.warning(f"Can't update when Rados connection is closed")
return False

with self.update_is_active_lock:
prefix_list = [
GatewayState.SUBSYSTEM_PREFIX,
Expand Down
Loading