Skip to content

Commit

Permalink
Do not retry when we get 'Job has finished' from qdel
Browse files: browse the repository at this point in the history
  • Loading branch information
xjules committed Mar 6, 2024
1 parent b20d565 commit 5ab5f56
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
47 changes: 29 additions & 18 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
import asyncio
import logging
import shlex
from typing import List, Literal, Mapping, MutableMapping, Optional, Tuple, Union
from typing import (
Iterable,
List,
Literal,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

from pydantic import BaseModel, Field
from typing_extensions import Annotated
Expand All @@ -30,19 +39,11 @@
]
JOBSTATE_INITIAL: JobState = "Q"

QSUB_INVALID_CREDENTIAL: int = 171
QSUB_PREMATURE_END_OF_MESSAGE: int = 183
QSUB_CONNECTION_REFUSED: int = 162
QDEL_JOB_HAS_FINISHED: int = 35
QDEL_REQUEST_INVALID: int = 168

QSUB_EXIT_CODES = [
QSUB_INVALID_CREDENTIAL,
QSUB_PREMATURE_END_OF_MESSAGE,
QSUB_CONNECTION_REFUSED,
]

QDEL_EXIT_CODES = [QDEL_REQUEST_INVALID, QDEL_JOB_HAS_FINISHED]
QSUB_INVALID_CREDENTIAL = 171
QSUB_PREMATURE_END_OF_MESSAGE = 183
QSUB_CONNECTION_REFUSED = 162
QDEL_JOB_HAS_FINISHED = 35
QDEL_REQUEST_INVALID = 168


class FinishedJob(BaseModel):
Expand Down Expand Up @@ -116,7 +117,8 @@ def _resource_string(self) -> str:
async def _execute_with_retry(
self,
cmd_with_args: List[str],
exit_codes_triggering_retries: List[int],
retry_codes: Iterable[int] = (),
accept_codes: Iterable[int] = (),
) -> Tuple[bool, str]:
error_message: Optional[str] = None

Expand All @@ -130,8 +132,10 @@ async def _execute_with_retry(

if process.returncode == 0:
return True, stdout.decode(errors="ignore").strip()
elif process.returncode in exit_codes_triggering_retries:
elif process.returncode in retry_codes:

Check failure on line 135 in src/ert/scheduler/openpbs_driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

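Unsupported operand types for in ("int | None" and "Iterable[int]") [Note: process.returncode is typed "int | None", so it must be narrowed to int, e.g. with an "is not None" check, before testing membership in an Iterable[int].]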
error_message = stderr.decode(errors="ignore").strip()
elif process.returncode in accept_codes:

Check failure on line 137 in src/ert/scheduler/openpbs_driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

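Unsupported operand types for in ("int | None" and "Iterable[int]") [Note: process.returncode is typed "int | None", so it must be narrowed to int, e.g. with an "is not None" check, before testing membership in an Iterable[int].]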
return True, stderr.decode(errors="ignore").strip()
else:
error_message = (
f'Command "{shlex.join(cmd_with_args)}" failed '
Expand Down Expand Up @@ -180,7 +184,12 @@ async def submit(
logger.debug(f"Submitting to PBS with command {shlex.join(qsub_with_args)}")

process_success, process_message = await self._execute_with_retry(
qsub_with_args, exit_codes_triggering_retries=QSUB_EXIT_CODES
qsub_with_args,
retry_codes=(
QSUB_INVALID_CREDENTIAL,
QSUB_PREMATURE_END_OF_MESSAGE,
QSUB_CONNECTION_REFUSED,
),
)
if not process_success:
raise RuntimeError(process_message)
Expand All @@ -200,7 +209,9 @@ async def kill(self, iens: int) -> None:
logger.debug(f"Killing realization {iens} with PBS-id {job_id}")

process_success, process_message = await self._execute_with_retry(
["qdel", str(job_id)], exit_codes_triggering_retries=QDEL_EXIT_CODES
["qdel", str(job_id)],
retry_codes=(QDEL_REQUEST_INVALID,),
accept_codes=(QDEL_JOB_HAS_FINISHED,),
)
if not process_success:
raise RuntimeError(process_message)
Expand Down
10 changes: 7 additions & 3 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,13 @@ async def test_that_qdel_will_retry_and_succeed(
driver._retry_pbs_cmd_interval = 0.2
driver._iens2jobid[0] = 111
await driver.kill(0)
assert "TRIED" in Path(bin_path / "script_try").read_text(encoding="utf-8")
assert "qdel executed" in Path(bin_path / "qdel_output").read_text(encoding="utf-8")
assert error_msg in Path(bin_path / "qdel_error").read_text(encoding="utf-8")
assert "TRIED" in (bin_path / "script_try").read_text()
if exit_code == QDEL_JOB_HAS_FINISHED:
# the job has already been qdel-ed, so there is no need to retry
assert not os.path.exists(bin_path / "qdel_output")
else:
assert "qdel executed" in (bin_path / "qdel_output").read_text()
assert error_msg in (bin_path / "qdel_error").read_text()


@pytest.mark.usefixtures("capturing_qsub")
Expand Down

0 comments on commit 5ab5f56

Please sign in to comment.