Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema: add first parent descendants #6610

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6506.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Work around caching behaviour observed on NFS filesystems which could cause workflows to appear to be stopped or even to not exist, when they are running.
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ message PbFamily {
repeated string child_families = 9;
optional string first_parent = 10;
optional PbRuntime runtime = 11;
repeated string descendants = 12;
}

message PbFamilyProxy {
Expand Down
52 changes: 26 additions & 26 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions cylc/flow/data_messages_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class PbTaskProxy(_message.Message):
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...

class PbFamily(_message.Message):
__slots__ = ("stamp", "id", "name", "meta", "depth", "proxies", "parents", "child_tasks", "child_families", "first_parent", "runtime")
__slots__ = ("stamp", "id", "name", "meta", "depth", "proxies", "parents", "child_tasks", "child_families", "first_parent", "runtime", "descendants")
STAMP_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -389,6 +389,7 @@ class PbFamily(_message.Message):
CHILD_FAMILIES_FIELD_NUMBER: _ClassVar[int]
FIRST_PARENT_FIELD_NUMBER: _ClassVar[int]
RUNTIME_FIELD_NUMBER: _ClassVar[int]
DESCENDANTS_FIELD_NUMBER: _ClassVar[int]
stamp: str
id: str
name: str
Expand All @@ -400,7 +401,8 @@ class PbFamily(_message.Message):
child_families: _containers.RepeatedScalarFieldContainer[str]
first_parent: str
runtime: PbRuntime
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., name: _Optional[str] = ..., meta: _Optional[_Union[PbMeta, _Mapping]] = ..., depth: _Optional[int] = ..., proxies: _Optional[_Iterable[str]] = ..., parents: _Optional[_Iterable[str]] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ...) -> None: ...
descendants: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., name: _Optional[str] = ..., meta: _Optional[_Union[PbMeta, _Mapping]] = ..., depth: _Optional[int] = ..., proxies: _Optional[_Iterable[str]] = ..., parents: _Optional[_Iterable[str]] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., descendants: _Optional[_Iterable[str]] = ...) -> None: ...

class PbFamilyProxy(_message.Message):
__slots__ = ("stamp", "id", "cycle_point", "name", "family", "state", "depth", "first_parent", "child_tasks", "child_families", "is_held", "ancestors", "states", "state_totals", "is_held_total", "is_queued", "is_queued_total", "is_runahead", "is_runahead_total", "runtime", "graph_depth")
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ def generate_definition_elements(self):
id=f_id,
name=name,
depth=len(ancestors[name]) - 1,
descendants=list(descendants.get(name, [])),
)
famcfg = config.cfg['runtime'][name]
user_defined_meta = {}
Expand Down
17 changes: 14 additions & 3 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,21 @@ async def is_active(flow, is_active):
False to filter for stopped and unregistered flows.

"""
contact = flow['path'] / SERVICE / CONTACT
_is_active = contact.exists()
service = flow['path'] / SERVICE
# NOTE: We must list the service directory contents rather than checking
# for the existence of the contact file directly, because listing the
# directory forces NFS filesystems to recompute their local cache.
# See https://github.com/cylc/cylc-flow/issues/6506
try:
contents = await scandir(service)
except FileNotFoundError:
_is_active = False
else:
_is_active = any(
path.name == WorkflowFiles.Service.CONTACT for path in contents
)
if _is_active:
flow['contact'] = contact
flow['contact'] = service / CONTACT
return _is_active == is_active


Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,10 @@ class Meta:
delta_store=DELTA_STORE_DEFAULT,
delta_type=DELTA_TYPE_DEFAULT,
resolver=get_nodes_by_ids)
descendants = graphene.List(
String,
description='First-parent descendants.',
)
first_parent = Field(
lambda: Family,
description='Family first parent.',
Expand Down
38 changes: 37 additions & 1 deletion cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

"""

from contextlib import suppress
from enum import Enum
import errno
from collections import deque
import os
from pathlib import Path
import re
Expand Down Expand Up @@ -620,8 +622,29 @@ def get_workflow_srv_dir(id_):
return os.path.join(run_d, WorkflowFiles.Service.DIRNAME)


def refresh_nfs_cache(path: Path):
    """Refresh NFS cache for dirs between ~/cylc-run and <path> inclusive.

    On NFS filesystems, the non-existence of files/directories may become
    cached. To work around this, we can list the contents of these
    directories, which refreshes the NFS cache.

    See: https://github.com/cylc/cylc-flow/issues/6506

    Arguments:
        path: The directory to refresh. Must be located under ~/cylc-run.

    Raises:
        FileNotFoundError: If any of the directories between ~/cylc-run and
            this directory (inclusive) are not present.
        ValueError: If <path> is not contained within ~/cylc-run
            (raised by ``Path.relative_to``).

    """
    cylc_run_dir = get_cylc_run_dir()
    # Walk from ~/cylc-run down to the parent of <path>, listing each
    # directory on the way. Listing a directory forces the NFS client to
    # revalidate its cached view of that directory's contents, so after the
    # last iteration the existence (or not) of <path> itself is fresh.
    # NOTE: ``Path.parents`` yields parents nearest-first, hence reversed().
    for subdir in reversed(path.relative_to(cylc_run_dir).parents):
        # Exhaust the iterator; the listing itself is the desired side
        # effect (avoids relying on a module-level ``deque`` import).
        for _ in (cylc_run_dir / subdir).iterdir():
            pass


def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
"""Load contact file. Return data as key=value dict."""
if not run_dir:
path = Path(get_contact_file_path(id_))
else:
Expand All @@ -630,6 +653,14 @@ def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT
)

if not path.exists():
# work around NFS caching issues
try:
refresh_nfs_cache(path)
except FileNotFoundError as exc:
raise ServiceFileError("Couldn't load contact file") from exc

try:
with open(path) as f:
file_content = f.read()
Expand Down Expand Up @@ -919,6 +950,11 @@ def infer_latest_run(
except ValueError:
raise ValueError(f"{path} is not in the cylc-run directory") from None

if not path.exists():
# work around NFS caching issues
with suppress(FileNotFoundError):
refresh_nfs_cache(path)

if not path.exists():
raise InputError(
f'Workflow ID not found: {id_}\n(Directory not found: {path})'
Expand Down
Loading