Skip to content

Commit

Permalink
Add logging statements
Browse files (browse the repository at this point in the history)
  • Loading branch information
berland committed Feb 12, 2024
1 parent 34fa32c commit a1a2bc3
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/ert/scheduler/openpbs_driver.py
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,19 @@
from __future__ import annotations

import asyncio
import logging
import shlex
from asyncio.subprocess import PIPE
from typing import Literal, Mapping, MutableMapping, Optional, Tuple, Union
from typing import List, Literal, Mapping, MutableMapping, Optional, Tuple, Union

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from ert.scheduler.driver import Driver
from ert.scheduler.event import Event, FinishedEvent, StartedEvent

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# A period expressed in seconds. Presumably the delay between successive
# polls of the queue system; the code that reads this constant is not
# visible in this excerpt -- TODO confirm against the polling loop.
_POLL_PERIOD = 2.0 # seconds
# Closed set of single-letter job state codes. Within this excerpt only "Q"
# is used explicitly: it is the state recorded for a job right after it has
# been submitted. NOTE(review): the letters presumably mirror the state codes
# reported by PBS -- verify against the PBS documentation.
JobState = Literal["B", "E", "F", "H", "M", "Q", "R", "S", "T", "U", "W", "X"]

Expand Down Expand Up @@ -58,25 +61,31 @@ async def submit(

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []

process = await asyncio.create_subprocess_exec(
command_line: List[str] = [
"qsub",
"-koe", # Discard stdout/stderr of job
"-rn", # Don't restart on failure
f"-N{name}", # Set name of job
*arg_queue_name,
"-",
]
logger.info(f"Submitting to PBS with command {shlex.join(command_line)}")
process = await asyncio.create_subprocess_exec(
*command_line,
stdin=PIPE,
stdout=PIPE,
)
job_id, _ = await process.communicate(script.encode())
job_id_ = job_id.decode("utf-8").strip()
logger.info(f"Realization {iens} accepted by PBS, got id {job_id_}")
self._jobs[job_id_] = (iens, "Q")
self._iens2jobid[iens] = job_id_

async def kill(self, iens: int) -> None:
try:
job_id = self._iens2jobid[iens]

logger.info(f"Killing realization {iens} with PBS-id {job_id}")
proc = await asyncio.create_subprocess_exec("qdel", job_id)
await proc.wait()
except KeyError:
Expand Down Expand Up @@ -109,14 +118,21 @@ async def poll(self) -> None:
self._jobs[job_id] = (iens, new_state)
event: Optional[Event] = None
if isinstance(job, RunningJob):
logger.debug(f"Realization {iens} is running")
event = StartedEvent(iens=iens)
elif isinstance(job, FinishedJob):
aborted = job.returncode >= 256
event = FinishedEvent(
iens=iens,
returncode=job.returncode,
aborted=job.returncode >= 256,
iens=iens, returncode=job.returncode, aborted=aborted
)

if aborted:
logger.warning(
f"Realization {iens} (PBS-id: {self._iens2jobid[iens]}) failed"
)
else:
logger.info(
f"Realization {iens} (PBS-id: {self._iens2jobid[iens]}) succeeded"
)
del self._jobs[job_id]
del self._iens2jobid[iens]

Expand Down

0 comments on commit a1a2bc3

Please sign in to comment.