Skip to content

Commit

Permalink
Make it possible to specify STOP_ON_FAIL for wf jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen committed Oct 30, 2023
1 parent b9f3a41 commit af93a47
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/ert/config/ert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


class ErtScript:
stop_on_fail = False

def __init__(
self,
ert: EnKFMain,
Expand Down
1 change: 1 addition & 0 deletions src/ert/config/parsing/workflow_job_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class WorkflowJobKeys(StrEnum):
EXECUTABLE = "EXECUTABLE"
SCRIPT = "SCRIPT"
INTERNAL = "INTERNAL"
STOP_ON_FAIL = "STOP_ON_FAIL"


class ConfigArgAtIndex(StrEnum):
Expand Down
9 changes: 9 additions & 0 deletions src/ert/config/parsing/workflow_job_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ def internal_keyword() -> SchemaItem:
)


def stop_on_fail_keyword() -> SchemaItem:
return SchemaItem(
kw=WorkflowJobKeys.STOP_ON_FAIL,
required_set=False,
type_map=[SchemaItemType.BOOL],
)


class WorkflowJobSchemaDict(SchemaItemDict):
@no_type_check
def check_required(self, config_dict: ConfigDict, filename: str) -> None:
Expand Down Expand Up @@ -101,6 +109,7 @@ def init_workflow_job_schema() -> SchemaItemDict:
min_arg_keyword(),
max_arg_keyword(),
arg_type_keyword(),
stop_on_fail_keyword(),
]:
schema[item.kw] = item

Expand Down
17 changes: 10 additions & 7 deletions src/ert/config/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class WorkflowJob:
arg_types: List[SchemaItemType]
executable: Optional[str]
script: Optional[str]
stop_on_fail: Optional[bool] = None # If not None, overrides in-file specification

def __post_init__(self) -> None:
self.ert_script: Optional[type] = None
Expand All @@ -48,6 +49,7 @@ def __post_init__(self) -> None:
self.ert_script = ErtScript.loadScriptFromFile(
self.script,
) # type: ignore

# Bare Exception here as we have no control
# of exceptions in the loaded ErtScript
except Exception as err: # noqa
Expand Down Expand Up @@ -79,15 +81,16 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob
content_dict = workflow_job_parser(config_file)
arg_types_list = cls._make_arg_types_list(content_dict)
return cls(
name,
content_dict.get("INTERNAL"), # type: ignore
content_dict.get("MIN_ARG"), # type: ignore
content_dict.get("MAX_ARG"), # type: ignore
arg_types_list,
content_dict.get("EXECUTABLE"), # type: ignore
str(content_dict.get("SCRIPT")) # type: ignore
name=name,
internal=content_dict.get("INTERNAL"), # type: ignore
min_args=content_dict.get("MIN_ARG"), # type: ignore
max_args=content_dict.get("MAX_ARG"), # type: ignore
arg_types=arg_types_list,
executable=content_dict.get("EXECUTABLE"), # type: ignore
script=str(content_dict.get("SCRIPT")) # type: ignore
if "SCRIPT" in content_dict
else None,
stop_on_fail=content_dict.get("STOP_ON_FAIL"), # type: ignore
)

def is_plugin(self) -> bool:
Expand Down
33 changes: 33 additions & 0 deletions src/ert/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
from concurrent import futures
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Dict, List, Optional
Expand All @@ -19,6 +20,7 @@ def __init__(self, workflow_job: WorkflowJob):
self.job = workflow_job
self.__running = False
self.__script: Optional[ErtScript] = None
self.stop_on_fail = False

def run(
self,
Expand All @@ -44,19 +46,43 @@ def run(

if self.job.ert_script is not None:
self.__script = self.job.ert_script(ert, storage, ensemble)
if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.__script is not None:
self.stop_on_fail = self.__script.stop_on_fail or False

elif not self.job.internal:
self.__script = ExternalErtScript(
ert, # type: ignore
storage, # type: ignore
self.job.executable, # type: ignore
)

if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.job.executable is not None and os.path.isfile(
self.job.executable
):
try:
with open(self.job.executable, encoding="utf-8") as executable:
lines = executable.readlines()
if any(
line.lower().replace(" ", "").replace("\n", "")
== "stop_on_fail=true"
for line in lines
):
self.stop_on_fail = True
except Exception: # pylint: disable=broad-exception-caught
self.stop_on_fail = False

else:
raise UserWarning("Unknown script type!")
result = self.__script.initializeAndRun( # type: ignore
self.job.argument_types(),
arguments,
)
self.__running = False

return result

@property
Expand Down Expand Up @@ -136,6 +162,7 @@ def __exit__(
def run(self) -> None:
if self.isRunning():
raise AssertionError("An instance of workflow is already running!")

self._workflow_job = self._workflow_executor.submit(self.run_blocking)

def run_blocking(self) -> None:
Expand Down Expand Up @@ -168,6 +195,12 @@ def run_blocking(self) -> None:
}

if jobrunner.hasFailed():
if jobrunner.stop_on_fail:
raise RuntimeError(
f"Workflow job {info['job_name']}"
f" failed with error: {info['stderr']}"
)

logger.error(f"Workflow job {jobrunner.name} failed", extra=info)
else:
logger.info(
Expand Down
184 changes: 184 additions & 0 deletions tests/unit_tests/cli/test_integration_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# pylint: disable=too-many-lines

import asyncio
import fileinput
import json
import logging
Expand Down Expand Up @@ -805,3 +808,184 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):
assert lines[2].strip() == "TheThirdValue"
# now MYVAR now set, so should be expanded inside the value of FOURTH
assert lines[3].strip() == "fourth:foo"


@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case")
@pytest.mark.parametrize(
("job_src", "script_name", "script_src", "expect_stopped"),
[
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
STOP_ON_FAIL=False
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=False
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=TRUE
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script_w_stop.sh
"""
),
"failing_script_w_stop.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=True
"""
),
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
"""
),
"failing_ert_script.py",
"""
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True
def run(self):
assert False, "failure"
""",
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
STOP_ON_FAIL False
"""
),
"failing_ert_script.py",
"""
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True
def run(self):
assert False, "failure"
""",
False,
),
],
)
def test_that_stop_on_fail_workflow_jobs_stop_ert(
job_src, script_name, script_src, expect_stopped
):
with open("failing_job", "w", encoding="utf-8") as f:
f.write(job_src)

with open(script_name, "w", encoding="utf-8") as s:
s.write(script_src)

os.chmod(script_name, os.stat(script_name).st_mode | 0o111)

with open("dump_failing_workflow", "w", encoding="utf-8") as f:
f.write("failjob")

with open("poly.ert", mode="a", encoding="utf-8") as fh:
fh.write(
dedent(
"""
LOAD_WORKFLOW_JOB failing_job failjob
LOAD_WORKFLOW dump_failing_workflow wffail
HOOK_WORKFLOW wffail POST_SIMULATION
"""
)
)

parsed = ert_parser(None, args=[TEST_RUN_MODE, "poly.ert"])

if expect_stopped:
with pytest.raises(Exception, match="Workflow job .* failed with error"):
run_cli(parsed)
else:
run_cli(parsed)
Loading

0 comments on commit af93a47

Please sign in to comment.