Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add STOP_ON_FAIL option for wf jobs #6101

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/ert/config/ert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


class ErtScript:
stop_on_fail = False

def __init__(
self,
ert: EnKFMain,
Expand Down Expand Up @@ -106,7 +108,8 @@ def initializeAndRun(
self.output_stack_trace(error=error_msg)
return None
except Exception as e:
self.output_stack_trace(str(e))
full_trace = "".join(traceback.format_exception(*sys.exc_info()))
self.output_stack_trace(f"{str(e)}\n{full_trace}")
return None
finally:
self.cleanup()
Expand All @@ -120,6 +123,8 @@ def output_stack_trace(self, error: str = "") -> None:
f"The script '{self.__class__.__name__}' caused an "
f"error while running:\n{str(stack_trace).strip()}\n"
)

self._stderrdata = error
self.__failed = True

@staticmethod
Expand Down
1 change: 1 addition & 0 deletions src/ert/config/parsing/workflow_job_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class WorkflowJobKeys(StrEnum):
EXECUTABLE = "EXECUTABLE"
SCRIPT = "SCRIPT"
INTERNAL = "INTERNAL"
STOP_ON_FAIL = "STOP_ON_FAIL"


class ConfigArgAtIndex(StrEnum):
Expand Down
9 changes: 9 additions & 0 deletions src/ert/config/parsing/workflow_job_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ def internal_keyword() -> SchemaItem:
)


def stop_on_fail_keyword() -> SchemaItem:
return SchemaItem(
kw=WorkflowJobKeys.STOP_ON_FAIL,
required_set=False,
type_map=[SchemaItemType.BOOL],
)


class WorkflowJobSchemaDict(SchemaItemDict):
@no_type_check
def check_required(self, config_dict: ConfigDict, filename: str) -> None:
Expand Down Expand Up @@ -101,6 +109,7 @@ def init_workflow_job_schema() -> SchemaItemDict:
min_arg_keyword(),
max_arg_keyword(),
arg_type_keyword(),
stop_on_fail_keyword(),
]:
schema[item.kw] = item

Expand Down
17 changes: 10 additions & 7 deletions src/ert/config/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class WorkflowJob:
arg_types: List[SchemaItemType]
executable: Optional[str]
script: Optional[str]
stop_on_fail: Optional[bool] = None # If not None, overrides in-file specification

def __post_init__(self) -> None:
self.ert_script: Optional[type] = None
Expand All @@ -48,6 +49,7 @@ def __post_init__(self) -> None:
self.ert_script = ErtScript.loadScriptFromFile(
self.script,
) # type: ignore

# Bare Exception here as we have no control
# of exceptions in the loaded ErtScript
except Exception as err: # noqa
Expand Down Expand Up @@ -79,15 +81,16 @@ def from_file(cls, config_file: str, name: Optional[str] = None) -> "WorkflowJob
content_dict = workflow_job_parser(config_file)
arg_types_list = cls._make_arg_types_list(content_dict)
return cls(
name,
content_dict.get("INTERNAL"), # type: ignore
content_dict.get("MIN_ARG"), # type: ignore
content_dict.get("MAX_ARG"), # type: ignore
arg_types_list,
content_dict.get("EXECUTABLE"), # type: ignore
str(content_dict.get("SCRIPT")) # type: ignore
name=name,
internal=content_dict.get("INTERNAL"), # type: ignore
min_args=content_dict.get("MIN_ARG"), # type: ignore
max_args=content_dict.get("MAX_ARG"), # type: ignore
arg_types=arg_types_list,
executable=content_dict.get("EXECUTABLE"), # type: ignore
script=str(content_dict.get("SCRIPT")) # type: ignore
if "SCRIPT" in content_dict
else None,
stop_on_fail=content_dict.get("STOP_ON_FAIL"), # type: ignore
)

def is_plugin(self) -> bool:
Expand Down
33 changes: 33 additions & 0 deletions src/ert/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
from concurrent import futures
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Dict, List, Optional
Expand All @@ -19,6 +20,7 @@ def __init__(self, workflow_job: WorkflowJob):
self.job = workflow_job
self.__running = False
self.__script: Optional[ErtScript] = None
self.stop_on_fail = False

def run(
self,
Expand All @@ -44,19 +46,43 @@ def run(

if self.job.ert_script is not None:
self.__script = self.job.ert_script(ert, storage, ensemble)
if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.__script is not None:
self.stop_on_fail = self.__script.stop_on_fail or False

elif not self.job.internal:
self.__script = ExternalErtScript(
ert, # type: ignore
storage, # type: ignore
self.job.executable, # type: ignore
)

if self.job.stop_on_fail is not None:
self.stop_on_fail = self.job.stop_on_fail
elif self.job.executable is not None and os.path.isfile(
self.job.executable
):
try:
with open(self.job.executable, encoding="utf-8") as executable:
lines = executable.readlines()
if any(
line.lower().replace(" ", "").replace("\n", "")
== "stop_on_fail=true"
for line in lines
):
self.stop_on_fail = True
except Exception: # pylint: disable=broad-exception-caught
self.stop_on_fail = False

else:
raise UserWarning("Unknown script type!")
result = self.__script.initializeAndRun( # type: ignore
self.job.argument_types(),
arguments,
)
self.__running = False

return result

@property
Expand Down Expand Up @@ -136,6 +162,7 @@ def __exit__(
def run(self) -> None:
if self.isRunning():
raise AssertionError("An instance of workflow is already running!")

self._workflow_job = self._workflow_executor.submit(self.run_blocking)

def run_blocking(self) -> None:
Expand Down Expand Up @@ -168,6 +195,12 @@ def run_blocking(self) -> None:
}

if jobrunner.hasFailed():
if jobrunner.stop_on_fail:
raise RuntimeError(
f"Workflow job {info['job_name']}"
f" failed with error: {info['stderr']}"
)

logger.error(f"Workflow job {jobrunner.name} failed", extra=info)
else:
logger.info(
Expand Down
184 changes: 184 additions & 0 deletions tests/unit_tests/cli/test_integration_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# pylint: disable=too-many-lines

import asyncio
import fileinput
import json
import logging
Expand Down Expand Up @@ -805,3 +808,184 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):
assert lines[2].strip() == "TheThirdValue"
# now MYVAR now set, so should be expanded inside the value of FOURTH
assert lines[3].strip() == "fourth:foo"


@pytest.mark.usefixtures("use_tmpdir", "copy_poly_case")
@pytest.mark.parametrize(
("job_src", "script_name", "script_src", "expect_stopped"),
[
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
STOP_ON_FAIL=False
ekho helo wordl
"""
),
False,
),
(
dedent(
"""
STOP_ON_FAIL True
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=False
"""
),
True,
),
(
dedent(
"""
STOP_ON_FAIL False
INTERNAL False
EXECUTABLE failing_script.sh
"""
),
"failing_script.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=TRUE
"""
),
False,
),
(
dedent(
"""
INTERNAL False
EXECUTABLE failing_script_w_stop.sh
"""
),
"failing_script_w_stop.sh",
dedent(
"""
#!/bin/bash
ekho helo wordl
STOP_ON_FAIL=True
"""
),
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
"""
),
"failing_ert_script.py",
"""
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True

def run(self):
assert False, "failure"
""",
True,
),
(
dedent(
"""
INTERNAL True
SCRIPT failing_ert_script.py
STOP_ON_FAIL False
"""
),
"failing_ert_script.py",
"""
from ert import ErtScript
class AScript(ErtScript):
stop_on_fail = True

def run(self):
assert False, "failure"
""",
False,
),
],
)
def test_that_stop_on_fail_workflow_jobs_stop_ert(
job_src, script_name, script_src, expect_stopped
):
with open("failing_job", "w", encoding="utf-8") as f:
f.write(job_src)

with open(script_name, "w", encoding="utf-8") as s:
s.write(script_src)

os.chmod(script_name, os.stat(script_name).st_mode | 0o111)

with open("dump_failing_workflow", "w", encoding="utf-8") as f:
f.write("failjob")

with open("poly.ert", mode="a", encoding="utf-8") as fh:
fh.write(
dedent(
"""
LOAD_WORKFLOW_JOB failing_job failjob
LOAD_WORKFLOW dump_failing_workflow wffail
HOOK_WORKFLOW wffail POST_SIMULATION
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could change this to PRE_SIMULATION to avoid spinning up the ensemble evaluator, or perhaps you want it to be POST to check that we exit the tracking loop? If so, that might be worth a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look a bit more into this, running tests it also hangs on PRE_SIMULATION because it waits 60 seconds for the websocket client to connect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should maybe be another issue, I tried some stuff w/ threading, like setting the monitor thread to be a daemon, but there is still something else that keeps it hanging.

When the thread named ert_cli_simulation_thread is spawned, it spawns some other threads too:
Screenshot 2023-09-20 at 09 37 23

Should maybe be a separate issue/thing to look into/thing to postpone until experiment server milestone which will refactor this away anyway?

"""
)
)

parsed = ert_parser(None, args=[TEST_RUN_MODE, "poly.ert"])

if expect_stopped:
with pytest.raises(Exception, match="Workflow job .* failed with error"):
run_cli(parsed)
else:
run_cli(parsed)
Loading
Loading