Skip to content

Commit

Permalink
work around NFS caching issues (#6603)
Browse files Browse the repository at this point in the history
* scan: refresh NFS cache for contact file
* load_contact_file: refresh NFS cache for directories above contact file
* infer_latest_run: refresh NFS cache for the run directory
  • Loading branch information
oliver-sanders authored Feb 18, 2025
1 parent aad39a9 commit 8c7b7be
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
1 change: 1 addition & 0 deletions changes.d/6506.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Work around caching behaviour observed on NFS filesystems which could cause workflows to appear to be stopped, or even to not exist, when they are in fact running.
17 changes: 14 additions & 3 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,21 @@ async def is_active(flow, is_active):
False to filter for stopped and unregistered flows.
"""
contact = flow['path'] / SERVICE / CONTACT
_is_active = contact.exists()
service = flow['path'] / SERVICE
# NOTE: We must list the service directory contents rather than checking
# for the existence of the contact file directly, because listing the
# directory forces NFS filesystems to recompute their local cache.
# See https://github.com/cylc/cylc-flow/issues/6506
try:
contents = await scandir(service)
except FileNotFoundError:
_is_active = False
else:
_is_active = any(
path.name == WorkflowFiles.Service.CONTACT for path in contents
)
if _is_active:
flow['contact'] = contact
flow['contact'] = service / CONTACT
return _is_active == is_active


Expand Down
38 changes: 37 additions & 1 deletion cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
"""

from contextlib import suppress
from enum import Enum
import errno
from collections import deque
import os
from pathlib import Path
import re
Expand Down Expand Up @@ -620,8 +622,29 @@ def get_workflow_srv_dir(id_):
return os.path.join(run_d, WorkflowFiles.Service.DIRNAME)


def refresh_nfs_cache(path: Path):
    """Refresh the NFS cache for the directories above <path>.

    On NFS filesystems, the non-existence of files/directories may become
    cached. To work around this, we list the contents of each directory
    from ~/cylc-run down to the parent of <path>, which refreshes the NFS
    cache.

    See: https://github.com/cylc/cylc-flow/issues/6506

    Arguments:
        path: The file or directory whose parent directories are listed.

    Raises:
        FileNotFoundError: If any of the directories between ~/cylc-run and
            the parent of <path> (inclusive) are not present.
        ValueError: If <path> is not relative to the cylc-run directory.

    """
    run_root = get_cylc_run_dir()
    relative_ancestors = path.relative_to(run_root).parents
    # walk top-down: the cylc-run directory first, the parent of <path> last
    for ancestor in reversed(relative_ancestors):
        listing = (run_root / ancestor).iterdir()
        # exhaust the listing without storing any of the entries
        deque(listing, maxlen=0)


def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
"""Load contact file. Return data as key=value dict."""
if not run_dir:
path = Path(get_contact_file_path(id_))
else:
Expand All @@ -630,6 +653,14 @@ def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT
)

if not path.exists():
# work around NFS caching issues
try:
refresh_nfs_cache(path)
except FileNotFoundError as exc:
raise ServiceFileError("Couldn't load contact file") from exc

try:
with open(path) as f:
file_content = f.read()
Expand Down Expand Up @@ -919,6 +950,11 @@ def infer_latest_run(
except ValueError:
raise ValueError(f"{path} is not in the cylc-run directory") from None

if not path.exists():
# work around NFS caching issues
with suppress(FileNotFoundError):
refresh_nfs_cache(path)

if not path.exists():
raise InputError(
f'Workflow ID not found: {id_}\n(Directory not found: {path})'
Expand Down

0 comments on commit 8c7b7be

Please sign in to comment.