Skip to content

Commit

Permalink
Have OpenPBS driver use qstat -w option
Browse files Browse the repository at this point in the history
* Have OpenPBS driver use qstat -w option
  • Loading branch information
jonathan-eq authored Mar 15, 2024
1 parent 1e6d3fa commit bfbbe09
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
20 changes: 2 additions & 18 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,6 @@ async def submit(
self._iens2jobid[iens] = job_id_
self._non_finished_job_ids.add(job_id_)

def _expand_truncated_jobids(
self, truncated_jobs: Dict[str, Dict[str, Dict[str, str]]]
) -> Dict[str, Dict[str, Dict[str, str]]]:
"""Job ids gotten through normal qstat are truncated at length 16"""
data = {}
for truncated_job_id, job_state in truncated_jobs["Jobs"].items():
full_job_ids = [
job_id for job_id in self._jobs if job_id.startswith(truncated_job_id)
]
assert len(full_job_ids) == 1
data[full_job_ids[0]] = job_state
return {"Jobs": data}

async def kill(self, iens: int) -> None:
if iens not in self._iens2jobid:
logger.error(f"PBS kill failed due to missing jobid for realization {iens}")
Expand Down Expand Up @@ -252,6 +239,7 @@ async def poll(self) -> None:
process = await asyncio.create_subprocess_exec(
"qstat",
"-x",
"-w", # wide format
*self._non_finished_job_ids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -267,11 +255,7 @@ async def poll(self) -> None:
f"qstat gave returncode {QSTAT_UNKNOWN_JOB_ID} "
f"with message {stderr.decode(errors='ignore')}"
)
stat = _Stat(
**self._expand_truncated_jobids(
parse_qstat(stdout.decode(errors="ignore"))
)
)
stat = _Stat(**parse_qstat(stdout.decode(errors="ignore")))
for job_id, job in stat.jobs.items():
if isinstance(job, FinishedJob):
self._non_finished_job_ids.remove(job_id)
Expand Down
5 changes: 3 additions & 2 deletions tests/integration_tests/scheduler/bin/qstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def parse_args() -> Namespace:
ap.add_argument("-x", action="store_true", required=True)
ap.add_argument("-F", default="")
ap.add_argument("jobs", nargs="*")

ap.add_argument("-w", action="store_true")
return ap.parse_args()


Expand Down Expand Up @@ -67,8 +67,9 @@ def main() -> None:
user = "mock"
time = "00:00:00"
queue = "mocked"

print(
f"{job:16.16} {name:16.16} {user:16.16} {time:8.8} {state:1.1} {queue:5.5}"
f"{job:30.30} {name:15.15} {user:15.15} {time:8.8} {state:1.1} {queue:5.5}"
)

if args.F == "json":
Expand Down
17 changes: 9 additions & 8 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,23 +201,24 @@ async def test_cluster_label():


QSTAT_HEADER = (
"Job id Name User Time Use S Queue\n"
"---------------- ---------------- ---------------- -------- - -----\n"
"Job id Name User Time Use S Queue\n"
"----------------------------- --------------- --------------- -------- - ---------------\n"
)
formatter = "%-30s %-15s %-15s %-8s %-1s %-5s"


@pytest.mark.parametrize(
"qstat_script, started_expected",
[
pytest.param(
f"echo '{QSTAT_HEADER}1 foo someuser 0 R normal'; exit 0",
f"echo '{QSTAT_HEADER}';printf '{formatter}' 1 foo someuser 0 R normal; exit 0",
True,
id="all-good",
),
pytest.param(
(
f"echo '{QSTAT_HEADER}'; "
"echo '1 foo someuser 0 R normal'"
f"printf '{formatter}' 1 foo someuser 0 R normal"
),
True,
id="all-good-properly-formatted",
Expand All @@ -233,19 +234,19 @@ async def test_cluster_label():
id="empty_cluster_specific_id",
),
pytest.param(
"echo '1 foo someuser 0 Z normal'",
f"printf '{formatter}' 1 foo someuser 0 Z normal",
False,
id="unknown_jobstate_token_from_pbs", # Never observed
),
pytest.param(
f"echo '{QSTAT_HEADER}1 foo someuser 0 R normal'; "
f"echo '{QSTAT_HEADER}'; printf '{formatter}' 1 foo someuser 0 R normal; "
"echo 'qstat: Unknown Job Id 2' >&2 ; exit 153",
# If we have some success and some failures, actual command returns 153
True,
id="error_for_irrelevant_job_id",
),
pytest.param(
f"echo '{QSTAT_HEADER}2 foo someuser 0 R normal'",
f"echo '{QSTAT_HEADER}'; printf '{formatter}' 2 foo someuser 0 R normal",
False,
id="wrong-job-id",
),
Expand Down Expand Up @@ -283,7 +284,7 @@ async def started(iens):

with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(
poll(driver, expected=set(), started=started), timeout=0.1
poll(driver, expected=set(), started=started), timeout=0.5
)

assert was_started == started_expected
Expand Down

0 comments on commit bfbbe09

Please sign in to comment.