From 8c7b7be72cdaf54466b22fde96b9e42dfbe2e04f Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 18 Feb 2025 10:35:25 +0000 Subject: [PATCH] work around NFS caching issues (#6603) * scan: refresh NFS cache for contact file * load_contact_file: refresh NFS cache for directories above contact file * infer_latest_run: refresh NFS cache for the run directory --- changes.d/6506.fix.md | 1 + cylc/flow/network/scan.py | 17 ++++++++++++++--- cylc/flow/workflow_files.py | 38 ++++++++++++++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 changes.d/6506.fix.md diff --git a/changes.d/6506.fix.md b/changes.d/6506.fix.md new file mode 100644 index 00000000000..5017c89c994 --- /dev/null +++ b/changes.d/6506.fix.md @@ -0,0 +1 @@ +Work around caching behaviour observed on NFS filesystems which could cause workflows to appear to be stopped or even to not exist, when they are running. diff --git a/cylc/flow/network/scan.py b/cylc/flow/network/scan.py index 6de28ada517..d02572e498b 100644 --- a/cylc/flow/network/scan.py +++ b/cylc/flow/network/scan.py @@ -316,10 +316,21 @@ async def is_active(flow, is_active): False to filter for stopped and unregistered flows. """ - contact = flow['path'] / SERVICE / CONTACT - _is_active = contact.exists() + service = flow['path'] / SERVICE + # NOTE: We must list the service directory contents rather than checking + # for the existence of the contact file directly, because listing the + # directory forces NFS filesystems to recompute their local cache. 
+ # See https://github.com/cylc/cylc-flow/issues/6506 + try: + contents = await scandir(service) + except FileNotFoundError: + _is_active = False + else: + _is_active = any( + path.name == WorkflowFiles.Service.CONTACT for path in contents + ) if _is_active: - flow['contact'] = contact + flow['contact'] = service / CONTACT return _is_active == is_active diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index 347f9e1dbff..add79adbbd6 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -22,8 +22,10 @@ """ +from contextlib import suppress from enum import Enum import errno +from collections import deque import os from pathlib import Path import re @@ -620,8 +622,29 @@ def get_workflow_srv_dir(id_): return os.path.join(run_d, WorkflowFiles.Service.DIRNAME) +def refresh_nfs_cache(path: Path): + """Refresh NFS cache for dirs between ~/cylc-run and `path` inclusive. + + On NFS filesystems, the non-existence of files/directories may become + cached. To work around this, we can list the contents of these directories + which refreshes the NFS cache. + + See: https://github.com/cylc/cylc-flow/issues/6506 + + Arguments: + path: The directory to refresh. + + Raises: + FileNotFoundError: If any of the directories between ~/cylc-run and + this directory (inclusive) are not present. + + """ + cylc_run_dir = get_cylc_run_dir() + for subdir in reversed(path.relative_to(cylc_run_dir).parents): + deque((cylc_run_dir / subdir).iterdir(), maxlen=0) + + def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]: - """Load contact file. 
Return data as key=value dict.""" if not run_dir: path = Path(get_contact_file_path(id_)) else: @@ -630,6 +653,14 @@ def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]: WorkflowFiles.Service.DIRNAME, WorkflowFiles.Service.CONTACT ) + + if not path.exists(): + # work around NFS caching issues + try: + refresh_nfs_cache(path) + except FileNotFoundError as exc: + raise ServiceFileError("Couldn't load contact file") from exc + try: with open(path) as f: file_content = f.read() @@ -919,6 +950,11 @@ def infer_latest_run( except ValueError: raise ValueError(f"{path} is not in the cylc-run directory") from None + if not path.exists(): + # work around NFS caching issues + with suppress(FileNotFoundError): + refresh_nfs_cache(path) + if not path.exists(): raise InputError( f'Workflow ID not found: {id_}\n(Directory not found: {path})'