Skip to content

Commit

Permalink
Change job to forward model step
Browse files Browse the repository at this point in the history
Still many like these to go. Not changing the string job in the outputted
XML file.
  • Loading branch information
berland committed Oct 25, 2024
1 parent d24eb13 commit f1edebb
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/_ert/forward_model_runner/reporting/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def __init__(self):
self.node = socket.gethostname()

def report(self, msg: Message):
job_status = {}
fm_step_status = {}

if msg.job:
logger.debug("Adding message job to status dictionary.")
job_status = self.status_dict["jobs"][msg.job.index]
fm_step_status = self.status_dict["jobs"][msg.job.index]

if isinstance(msg, Init):
logger.debug("Init Message Instance")
Expand All @@ -56,28 +56,30 @@ def report(self, msg: Message):

elif isinstance(msg, Start):
if msg.success():
logger.debug(f"Job {msg.job.name()} was successfully started")
logger.debug(
f"Forward model step {msg.job.name()} was successfully started"
)
self._start_status_file(msg)
self._add_log_line(msg.job)
job_status.update(
fm_step_status.update(
status=_JOB_STATUS_RUNNING,
start_time=data_util.datetime_serialize(msg.timestamp),
)
else:
logger.error(f"Job {msg.job.name()} FAILED to start")
logger.error(f"Forward model step {msg.job.name()} FAILED to start")
error_msg = msg.error_message
job_status.update(
fm_step_status.update(
status=_JOB_STATUS_FAILURE,
error=error_msg,
end_time=data_util.datetime_serialize(msg.timestamp),
)
self._complete_status_file(msg)

elif isinstance(msg, Exited):
job_status["end_time"] = data_util.datetime_serialize(msg.timestamp)
fm_step_status["end_time"] = data_util.datetime_serialize(msg.timestamp)
if msg.success():
logger.debug(f"Job {msg.job.name()} exited successfully")
job_status["status"] = _JOB_STATUS_SUCCESS
logger.debug(f"Forward model step {msg.job.name()} exited successfully")
fm_step_status["status"] = _JOB_STATUS_SUCCESS
self._complete_status_file(msg)
else:
error_msg = msg.error_message
Expand All @@ -88,7 +90,7 @@ def report(self, msg: Message):
error_message=msg.error_message,
)
)
job_status.update(error=error_msg, status=_JOB_STATUS_FAILURE)
fm_step_status.update(error=error_msg, status=_JOB_STATUS_FAILURE)

# A STATUS_file is not written if there is no exit_code, i.e.
# when the job is killed due to timeout.
Expand All @@ -97,7 +99,7 @@ def report(self, msg: Message):
self._dump_error_file(msg.job, error_msg)

elif isinstance(msg, Running):
job_status.update(
fm_step_status.update(
max_memory_usage=msg.memory_status.max_rss,
current_memory_usage=msg.memory_status.rss,
cpu_seconds=msg.memory_status.cpu_seconds,
Expand Down Expand Up @@ -166,25 +168,27 @@ def _add_log_line(job):
f.write(f"{time_str} Calling: {job.job_data['executable']} {args}\n")

@staticmethod
def _dump_error_file(job, error_msg):
def _dump_error_file(fm_step, error_msg):
with append(ERROR_file) as file:
file.write("<error>\n")
file.write(
f" <time>{time.strftime(TIME_FORMAT, time.localtime())}</time>\n"
)
file.write(f" <job>{job.name()}</job>\n")
file.write(f" <job>{fm_step.name()}</job>\n")
file.write(f" <reason>{error_msg}</reason>\n")
stderr_file = None
if job.std_err:
if os.path.exists(job.std_err):
with open(job.std_err, "r", encoding="utf-8") as error_file_handler:
if fm_step.std_err:
if os.path.exists(fm_step.std_err):
with open(
fm_step.std_err, "r", encoding="utf-8"
) as error_file_handler:
stderr = error_file_handler.read()
if stderr:
stderr_file = os.path.join(os.getcwd(), job.std_err)
stderr_file = os.path.join(os.getcwd(), fm_step.std_err)
else:
stderr = f"Not written by:{job.name()}\n"
stderr = f"Empty stderr from {fm_step.name()}\n"
else:
stderr = f"stderr: Could not find file:{job.std_err}\n"
stderr = f"stderr: Could not find file: {fm_step.std_err}\n"
else:
stderr = "stderr: Not redirected\n"

Expand Down

0 comments on commit f1edebb

Please sign in to comment.