Skip to content

Commit

Permalink
Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen committed Oct 6, 2023
1 parent e655def commit e28903c
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 21 deletions.
2 changes: 2 additions & 0 deletions src/ert/config/ert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


class ErtScript:
stop_on_fail = False

def __init__(
self,
ert: EnKFMain,
Expand Down
7 changes: 3 additions & 4 deletions src/ert/config/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class WorkflowJob:
arg_types: List[SchemaItemType]
executable: Optional[str]
script: Optional[str]
stop_on_fail: Optional[bool] = False
stop_on_fail: Optional[bool] = None # If not None, overrides in-file specification

def __post_init__(self) -> None:
self.ert_script: Optional[type] = None
Expand All @@ -50,6 +50,7 @@ def __post_init__(self) -> None:
self.ert_script = ErtScript.loadScriptFromFile(
self.script,
) # type: ignore

# Bare Exception here as we have no control
# of exceptions in the loaded ErtScript
except Exception as err: # noqa
Expand Down Expand Up @@ -90,9 +91,7 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob
script=str(content_dict.get("SCRIPT")) # type: ignore
if "SCRIPT" in content_dict
else None,
stop_on_fail=content_dict.get("STOP_ON_FAIL") # type: ignore
if "STOP_ON_FAIL" in content_dict
else False,
stop_on_fail=content_dict.get("STOP_ON_FAIL"), # type: ignore
)

def is_plugin(self) -> bool:
Expand Down
27 changes: 26 additions & 1 deletion src/ert/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
from concurrent import futures
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Dict, List, Optional
Expand All @@ -20,6 +21,7 @@ def __init__(self, workflow_job: WorkflowJob):
self.job = workflow_job
self.__running = False
self.__script: Optional[ErtScript] = None
self.stop_on_fail = False

def run(
self,
Expand All @@ -45,12 +47,35 @@ def run(

if self.job.ert_script is not None:
self.__script = self.job.ert_script(ert, storage, ensemble)
if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.__script is not None:
self.stop_on_fail = self.__script.stop_on_fail or False

elif not self.job.internal:
self.__script = ExternalErtScript(
ert, # type: ignore
storage, # type: ignore
self.job.executable, # type: ignore
)

if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.job.executable is not None and os.path.isfile(
self.job.executable
):
try:
with open(self.job.executable, encoding="utf-8") as executable:
lines = executable.readlines()
if any(
line.lower().replace(" ", "").replace("\n", "")
== "stop_on_fail=true"
for line in lines
):
self.stop_on_fail = True
except Exception: # pylint: disable=broad-exception-caught
self.stop_on_fail = False

else:
raise UserWarning("Unknown script type!")
result = self.__script.initializeAndRun( # type: ignore
Expand Down Expand Up @@ -171,7 +196,7 @@ def run_blocking(self) -> None:
}

if jobrunner.hasFailed():
if jobrunner.job.stop_on_fail:
if jobrunner.stop_on_fail:
raise RuntimeError(
f"Workflow job {info['job_name']}"
f" failed with error: {info['stderr']}"
Expand Down
172 changes: 157 additions & 15 deletions tests/unit_tests/cli/test_integration_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# pylint: disable=too-many-lines

import asyncio
import fileinput
import json
import logging
Expand Down Expand Up @@ -864,27 +867,162 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):


@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case")
def test_that_stop_on_fail_workflow_jobs_stop_ert():
with open("failing_job", "w", encoding="utf-8") as f:
f.write(
@pytest.mark.parametrize(
("job_src", "script_name", "script_src", "expect_stopped"),
[
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE shitty_script.sh
EXECUTABLE failing_script.sh
"""
)
)
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
STOP_ON_FAIL=False
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=False
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=TRUE
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script_w_stop.sh
"""
),
"failing_script_w_stop.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=True
"""
),
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
"""
),
"failing_ert_script.py",
"""
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True
with open("shitty_script.sh", "w", encoding="utf-8") as s:
s.write(
def run(self):
assert False, "failure"
""",
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
STOP_ON_FAIL False
"""
),
"failing_ert_script.py",
"""
#!/bin/bash
ekho helo wordl
"""
)
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True
os.chmod("shitty_script.sh", os.stat("shitty_script.sh").st_mode | 0o111)
def run(self):
assert False, "failure"
""",
False,
),
],
)
def test_that_stop_on_fail_workflow_jobs_stop_ert(
job_src, script_name, script_src, expect_stopped
):
with open("failing_job", "w", encoding="utf-8") as f:
f.write(job_src)

with open(script_name, "w", encoding="utf-8") as s:
s.write(script_src)

os.chmod(script_name, os.stat(script_name).st_mode | 0o111)

with open("dump_failing_workflow", "w", encoding="utf-8") as f:
f.write("failjob")
Expand All @@ -900,6 +1038,10 @@ def test_that_stop_on_fail_workflow_jobs_stop_ert():
)
)

parsed = ert_parser(None, args=[ENSEMBLE_EXPERIMENT_MODE, "poly.ert"])
with pytest.raises(ErtCliError, match="Workflow job failjob failed with error"):
parsed = ert_parser(None, args=[TEST_RUN_MODE, "poly.ert"])

if expect_stopped:
with pytest.raises(Exception, match="Workflow job .* failed with error"):
run_cli(parsed)
else:
run_cli(parsed)
4 changes: 3 additions & 1 deletion tests/unit_tests/job_queue/test_workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ def test_stop_on_fail_is_parsed_internal():
class SevereErtFailureScript(ErtScript):
def __init__(self, ert, storage, ensemble=None):
super().__init__(ert, storage, ensemble=ensemble)
assert False, "Severe ert failure"
def run(self, *args):
pass
"""
)

Expand Down

0 comments on commit e28903c

Please sign in to comment.