diff --git a/docs/reference/workflows/configuring_jobs.rst b/docs/reference/workflows/configuring_jobs.rst index 330c45ad6ab..45fa27d7e47 100644 --- a/docs/reference/workflows/configuring_jobs.rst +++ b/docs/reference/workflows/configuring_jobs.rst @@ -42,9 +42,50 @@ executable: INTERNAL FALSE -- Optional - Set to FALSE by default. EXECUTABLE path/to/program -- Path to a program/script which will be invoked by the job. + NB: note that relative paths are resolved from the location of the job configuration file, not the configuration file provided to ert. +Stop Ert execution upon job failure +----------------------------------- +By default, failing jobs (both internal and external) will not stop the entire ert simulation. +In some cases it is best to cancel the entire simulation if a job fails. +This behavior can be achieved by adding the below line to the job file: + +:: + + STOP_ON_FAIL TRUE + +For example, if a job is defined as follows: + +:: + + INTERNAL FALSE + EXECUTABLE script.sh + STOP_ON_FAIL TRUE -- Tell the job to stop ert on failure + +STOP_ON_FAIL can also be specified within the internal (python) or external (executable) job script. 
+For example, this internal job script will stop on failure + +:: + + from ert import ErtScript + class AScript(ErtScript): + stop_on_fail = True + + def run(self): + assert False, "failure" + + +As will external .sh executables if they contain the line STOP_ON_FAIL=TRUE: + +:: + + #!/bin/bash + STOP_ON_FAIL=True + ekho helo wordl + + Configuring the arguments ------------------------- diff --git a/src/ert/config/ert_script.py b/src/ert/config/ert_script.py index 2ccf6cabfef..f0746189522 100644 --- a/src/ert/config/ert_script.py +++ b/src/ert/config/ert_script.py @@ -17,6 +17,8 @@ class ErtScript: + stop_on_fail = False + def __init__( self, ert: EnKFMain, @@ -106,7 +108,8 @@ def initializeAndRun( self.output_stack_trace(error=error_msg) return None except Exception as e: - self.output_stack_trace(str(e)) + full_trace = "".join(traceback.format_exception(*sys.exc_info())) + self.output_stack_trace(f"{str(e)}\n{full_trace}") return None finally: self.cleanup() @@ -120,6 +123,8 @@ def output_stack_trace(self, error: str = "") -> None: f"The script '{self.__class__.__name__}' caused an " f"error while running:\n{str(stack_trace).strip()}\n" ) + + self._stderrdata = error self.__failed = True @staticmethod diff --git a/src/ert/config/parsing/workflow_job_keywords.py b/src/ert/config/parsing/workflow_job_keywords.py index dda40d064d1..b1cc05dd4c0 100644 --- a/src/ert/config/parsing/workflow_job_keywords.py +++ b/src/ert/config/parsing/workflow_job_keywords.py @@ -18,6 +18,7 @@ class WorkflowJobKeys(StrEnum): EXECUTABLE = "EXECUTABLE" SCRIPT = "SCRIPT" INTERNAL = "INTERNAL" + STOP_ON_FAIL = "STOP_ON_FAIL" class ConfigArgAtIndex(StrEnum): diff --git a/src/ert/config/parsing/workflow_job_schema.py b/src/ert/config/parsing/workflow_job_schema.py index 57b54ed80c6..0303187da7b 100644 --- a/src/ert/config/parsing/workflow_job_schema.py +++ b/src/ert/config/parsing/workflow_job_schema.py @@ -67,6 +67,14 @@ def internal_keyword() -> SchemaItem: ) +def 
stop_on_fail_keyword() -> SchemaItem: + return SchemaItem( + kw=WorkflowJobKeys.STOP_ON_FAIL, + required_set=False, + type_map=[SchemaItemType.BOOL], + ) + + class WorkflowJobSchemaDict(SchemaItemDict): @no_type_check def check_required(self, config_dict: ConfigDict, filename: str) -> None: @@ -101,6 +109,7 @@ def init_workflow_job_schema() -> SchemaItemDict: min_arg_keyword(), max_arg_keyword(), arg_type_keyword(), + stop_on_fail_keyword(), ]: schema[item.kw] = item diff --git a/src/ert/config/workflow_job.py b/src/ert/config/workflow_job.py index 085add109ec..a95b955f42a 100644 --- a/src/ert/config/workflow_job.py +++ b/src/ert/config/workflow_job.py @@ -40,6 +40,7 @@ class WorkflowJob: arg_types: List[SchemaItemType] executable: Optional[str] script: Optional[str] + stop_on_fail: Optional[bool] = None # If not None, overrides in-file specification def __post_init__(self) -> None: self.ert_script: Optional[type] = None @@ -48,6 +49,7 @@ def __post_init__(self) -> None: self.ert_script = ErtScript.loadScriptFromFile( self.script, ) # type: ignore + # Bare Exception here as we have no control # of exceptions in the loaded ErtScript except Exception as err: # noqa @@ -79,15 +81,16 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob content_dict = workflow_job_parser(config_file) arg_types_list = cls._make_arg_types_list(content_dict) return cls( - name, - content_dict.get("INTERNAL"), # type: ignore - content_dict.get("MIN_ARG"), # type: ignore - content_dict.get("MAX_ARG"), # type: ignore - arg_types_list, - content_dict.get("EXECUTABLE"), # type: ignore - str(content_dict.get("SCRIPT")) # type: ignore + name=name, + internal=content_dict.get("INTERNAL"), # type: ignore + min_args=content_dict.get("MIN_ARG"), # type: ignore + max_args=content_dict.get("MAX_ARG"), # type: ignore + arg_types=arg_types_list, + executable=content_dict.get("EXECUTABLE"), # type: ignore + script=str(content_dict.get("SCRIPT")) # type: ignore if "SCRIPT" in 
content_dict else None, + stop_on_fail=content_dict.get("STOP_ON_FAIL"), # type: ignore ) def is_plugin(self) -> bool: diff --git a/src/ert/job_queue/workflow_runner.py b/src/ert/job_queue/workflow_runner.py index d0540e7bd03..74fdbb200ce 100644 --- a/src/ert/job_queue/workflow_runner.py +++ b/src/ert/job_queue/workflow_runner.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import os from concurrent import futures from concurrent.futures import Future from typing import TYPE_CHECKING, Any, Dict, List, Optional @@ -19,6 +20,7 @@ def __init__(self, workflow_job: WorkflowJob): self.job = workflow_job self.__running = False self.__script: Optional[ErtScript] = None + self.stop_on_fail = False def run( self, @@ -44,12 +46,35 @@ def run( if self.job.ert_script is not None: self.__script = self.job.ert_script(ert, storage, ensemble) + if self.job.stop_on_fail is not None: + self.stop_on_fail = self.job.stop_on_fail + elif self.__script is not None: + self.stop_on_fail = self.__script.stop_on_fail or False + elif not self.job.internal: self.__script = ExternalErtScript( ert, # type: ignore storage, # type: ignore self.job.executable, # type: ignore ) + + if self.job.stop_on_fail is not None: + self.stop_on_fail = self.job.stop_on_fail + elif self.job.executable is not None and os.path.isfile( + self.job.executable + ): + try: + with open(self.job.executable, encoding="utf-8") as executable: + lines = executable.readlines() + if any( + line.lower().replace(" ", "").replace("\n", "") + == "stop_on_fail=true" + for line in lines + ): + self.stop_on_fail = True + except Exception: # pylint: disable=broad-exception-caught + self.stop_on_fail = False + else: raise UserWarning("Unknown script type!") result = self.__script.initializeAndRun( # type: ignore @@ -57,6 +82,7 @@ def run( arguments, ) self.__running = False + return result @property @@ -136,6 +162,7 @@ def __exit__( def run(self) -> None: if self.isRunning(): raise AssertionError("An instance of 
workflow is already running!") + self._workflow_job = self._workflow_executor.submit(self.run_blocking) def run_blocking(self) -> None: @@ -168,6 +195,12 @@ def run_blocking(self) -> None: } if jobrunner.hasFailed(): + if jobrunner.stop_on_fail: + raise RuntimeError( + f"Workflow job {info['job_name']}" + f" failed with error: {info['stderr']}" + ) + logger.error(f"Workflow job {jobrunner.name} failed", extra=info) else: logger.info( diff --git a/tests/unit_tests/cli/test_integration_cli.py b/tests/unit_tests/cli/test_integration_cli.py index 54807d3499a..4c8948d3312 100644 --- a/tests/unit_tests/cli/test_integration_cli.py +++ b/tests/unit_tests/cli/test_integration_cli.py @@ -1,3 +1,6 @@ +# pylint: disable=too-many-lines + +import asyncio import fileinput import json import logging @@ -805,3 +808,184 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config): assert lines[2].strip() == "TheThirdValue" # now MYVAR now set, so should be expanded inside the value of FOURTH assert lines[3].strip() == "fourth:foo" + + +@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case") +@pytest.mark.parametrize( + ("job_src", "script_name", "script_src", "expect_stopped"), + [ + ( + dedent( + """ + STOP_ON_FAIL True + INTERNAL False + EXECUTABLE failing_script.sh + """ + ), + "failing_script.sh", + dedent( + """ + #!/bin/bash + ekho helo wordl + """ + ), + True, + ), + ( + dedent( + """ + STOP_ON_FAIL False + INTERNAL False + EXECUTABLE failing_script.sh + """ + ), + "failing_script.sh", + dedent( + """ + #!/bin/bash + ekho helo wordl + """ + ), + False, + ), + ( + dedent( + """ + INTERNAL False + EXECUTABLE failing_script.sh + """ + ), + "failing_script.sh", + dedent( + """ + #!/bin/bash + STOP_ON_FAIL=False + ekho helo wordl + """ + ), + False, + ), + ( + dedent( + """ + STOP_ON_FAIL True + INTERNAL False + EXECUTABLE failing_script.sh + """ + ), + "failing_script.sh", + dedent( + """ + #!/bin/bash + ekho helo wordl + STOP_ON_FAIL=False + """ + ), + True, + ), + 
( + dedent( + """ + STOP_ON_FAIL False + INTERNAL False + EXECUTABLE failing_script.sh + """ + ), + "failing_script.sh", + dedent( + """ + #!/bin/bash + ekho helo wordl + STOP_ON_FAIL=TRUE + """ + ), + False, + ), + ( + dedent( + """ + INTERNAL False + EXECUTABLE failing_script_w_stop.sh + """ + ), + "failing_script_w_stop.sh", + dedent( + """ + #!/bin/bash + ekho helo wordl + STOP_ON_FAIL=True + """ + ), + True, + ), + ( + dedent( + """ + INTERNAL True + SCRIPT failing_ert_script.py + """ + ), + "failing_ert_script.py", + """ +from ert import ErtScript +class AScript(ErtScript): + stop_on_fail = True + + def run(self): + assert False, "failure" +""", + True, + ), + ( + dedent( + """ + INTERNAL True + SCRIPT failing_ert_script.py + STOP_ON_FAIL False + """ + ), + "failing_ert_script.py", + """ +from ert import ErtScript +class AScript(ErtScript): + stop_on_fail = True + + def run(self): + assert False, "failure" +""", + False, + ), + ], +) +def test_that_stop_on_fail_workflow_jobs_stop_ert( + job_src, script_name, script_src, expect_stopped +): + with open("failing_job", "w", encoding="utf-8") as f: + f.write(job_src) + + with open(script_name, "w", encoding="utf-8") as s: + s.write(script_src) + + os.chmod(script_name, os.stat(script_name).st_mode | 0o111) + + with open("dump_failing_workflow", "w", encoding="utf-8") as f: + f.write("failjob") + + with open("poly.ert", mode="a", encoding="utf-8") as fh: + fh.write( + dedent( + """ + LOAD_WORKFLOW_JOB failing_job failjob + LOAD_WORKFLOW dump_failing_workflow wffail + HOOK_WORKFLOW wffail POST_SIMULATION + """ + ) + ) + + parsed = ert_parser(None, args=[TEST_RUN_MODE, "poly.ert"]) + + if expect_stopped: + with pytest.raises(Exception, match="Workflow job .* failed with error"): + run_cli(parsed) + else: + run_cli(parsed) diff --git a/tests/unit_tests/job_queue/test_workflow_job.py b/tests/unit_tests/job_queue/test_workflow_job.py index 8c3ea4242f9..5e96176abee 100644 --- 
a/tests/unit_tests/job_queue/test_workflow_job.py +++ b/tests/unit_tests/job_queue/test_workflow_job.py @@ -92,3 +92,51 @@ def test_run_internal_script(): result = WorkflowJobRunner(job).run(None, None, None, ["1", "2"]) assert result == -1 + + +@pytest.mark.usefixtures("use_tmpdir") +def test_stop_on_fail_is_parsed_internal(): + with open("fail_job", "w+", encoding="utf-8") as f: + f.write("INTERNAL True\n") + f.write("SCRIPT fail_script.py\n") + f.write("MIN_ARG 1\n") + f.write("MAX_ARG 1\n") + f.write("ARG_TYPE 0 STRING\n") + f.write("STOP_ON_FAIL True\n") + + with open("fail_script.py", "w+", encoding="utf-8") as f: + f.write( + """ +from ert import ErtScript + +class SevereErtFailureScript(ErtScript): + def __init__(self, ert, storage, ensemble=None): + assert False, "Severe ert failure" + + def run(self, *args): + pass + """ + ) + + job_internal = WorkflowJob.from_file( + name="FAIL", + config_file="fail_job", + ) + + assert job_internal.stop_on_fail + + +@pytest.mark.usefixtures("use_tmpdir") +def test_stop_on_fail_is_parsed_external(): + with open("fail_job", "w+", encoding="utf-8") as f: + f.write("INTERNAL False\n") + f.write("EXECUTABLE echo\n") + f.write("MIN_ARG 1\n") + f.write("STOP_ON_FAIL True\n") + + job_internal = WorkflowJob.from_file( + name="FAIL", + config_file="fail_job", + ) + + assert job_internal.stop_on_fail diff --git a/tests/unit_tests/job_queue/test_workflow_runner.py b/tests/unit_tests/job_queue/test_workflow_runner.py index fff6d5e1adc..ae228d9d8be 100644 --- a/tests/unit_tests/job_queue/test_workflow_runner.py +++ b/tests/unit_tests/job_queue/test_workflow_runner.py @@ -137,3 +137,40 @@ def test_workflow_success(): assert os.path.exists("wait_finished_1") assert workflow_runner.workflowResult() + + +@pytest.mark.usefixtures("use_tmpdir") +def test_workflow_stops_with_stopping_job(): + WorkflowCommon.createExternalDumpJob() + with open("dump_failing_job", "a", encoding="utf-8") as f: + f.write("STOP_ON_FAIL True") + + with 
open("dump_failing_workflow", "w", encoding="utf-8") as f: + f.write("DUMP") + + job_failing_dump = WorkflowJob.from_file("dump_failing_job") + assert job_failing_dump.stop_on_fail + + workflow = Workflow.from_file( + src_file="dump_failing_workflow", + context=SubstitutionList(), + job_dict={"DUMP": job_failing_dump}, + ) + + runner = WorkflowRunner(workflow) + with pytest.raises(RuntimeError, match="Workflow job dump_failing_job failed"): + runner.run_blocking() + + with open("dump_failing_job", "a", encoding="utf-8") as f: + f.write("\nSTOP_ON_FAIL False") + + job_successful_dump = WorkflowJob.from_file("dump_failing_job") + assert not job_successful_dump.stop_on_fail + workflow = Workflow.from_file( + src_file="dump_failing_workflow", + context=SubstitutionList(), + job_dict={"DUMP": job_successful_dump}, + ) + + # Expect no error raised + WorkflowRunner(workflow).run_blocking()