Skip to content

Commit

Permalink
Have lsf driver bkill with SIGKILL after SIGTERM
Browse files Browse the repository at this point in the history
This commits makes the LSF driver spawn a non-blocking separate process running `bkill -s SIGKILL` to make sure the job is actually torn down.
  • Loading branch information
jonathan-eq committed Mar 22, 2024
1 parent a71a6df commit d6140b4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
25 changes: 21 additions & 4 deletions src/clib/lib/job_queue/lsf_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,13 +735,19 @@ void lsf_driver_kill_job(void *_driver, void *_job) {
char **argv = (char **)calloc(2, sizeof *argv);
CHECK_ALLOC(argv);
argv[0] = driver->remote_lsf_server;
argv[1] = saprintf("%s %s %s", driver->bkill_cmd, "-s SIGKILL",
argv[1] = saprintf("%s %s %s", driver->bkill_cmd, "-s SIGTERM",
job->lsf_jobnr_char);

spawn_blocking(driver->rsh_cmd, 2, (const char **)argv, NULL, NULL);

free(argv[1]);
free(argv);

char **argv2 = (char **)calloc(2, sizeof *argv2);
argv2[0] = driver->remote_lsf_server;
argv2[1] = saprintf("%s %s %s %s %s", "sleep 30;", driver->bkill_cmd,
"-s", "SIGKILL", job->lsf_jobnr_char);
spawn(driver->rsh_cmd, 2, (const char **)argv2, NULL, NULL);
free(argv2[0]);
free(argv2);

} else if (driver->submit_method == LSF_SUBMIT_LOCAL_SHELL) {
char **argv = (char **)calloc(3, sizeof *argv);
CHECK_ALLOC(argv);
Expand All @@ -753,6 +759,17 @@ void lsf_driver_kill_job(void *_driver, void *_job) {
free(argv[1]);
free(argv[2]);
free(argv);

char **argv2 = (char **)calloc(2, sizeof *argv2);
CHECK_ALLOC(argv2);
argv2[0] = saprintf("%s", "-c");
argv2[1] = saprintf("%s %s %s %s %s", "sleep 30;", driver->bkill_cmd,
"-s", "SIGKILL", job->lsf_jobnr_char);
spawn((const char *)saprintf("%s", "/bin/sh"), 2, (const char **)argv2,
NULL, NULL);
free(argv2[0]);
free(argv2[1]);
free(argv2);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import shlex
import shutil
import stat
import subprocess
import tempfile
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -115,6 +116,7 @@ def __init__(
self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._sleep_time_between_bkills = 30
self._sleep_time_between_cmd_retries = 3
self._bsub_retries = 10

Expand Down Expand Up @@ -191,12 +193,16 @@ async def kill(self, iens: int) -> None:
process = await asyncio.create_subprocess_exec(
self._bkill_cmd,
"-s",
"SIGKILL",
"SIGTERM",
job_id,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
_sigkill_process = subprocess.Popen(
f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL {job_id}",
shell=True,
)
if process.returncode:
logger.error(
f"LSF kill failed with returncode {process.returncode} "
Expand Down
20 changes: 17 additions & 3 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,34 @@ async def test_kill(
bkill_path.write_text(
f"#!/bin/sh\necho '{bkill_stdout}'\n"
f"echo '{bkill_stderr}' >&2\n"
f"echo $@ > 'bkill_args'\n"
f"echo $@ >> 'bkill_args'\n"
f"exit {bkill_returncode}",
encoding="utf-8",
)
bkill_path.chmod(bkill_path.stat().st_mode | stat.S_IEXEC)

driver = LsfDriver()
driver._iens2jobid = mocked_iens2jobid
driver._sleep_time_between_bkills = 0

await driver.kill(iens_to_kill)

async def wait_for_sigkill_in_file():
while True:
bkill_args = (
Path("bkill_args").read_text(encoding="utf-8").strip().split("\n")
)
if f"-s SIGKILL {mocked_iens2jobid[iens_to_kill]}" in bkill_args:
break
await asyncio.sleep(0.1)

if expected_logged_error:
assert expected_logged_error in caplog.text
else:
bkill_args = Path("bkill_args").read_text(encoding="utf-8").strip()
assert f"-s SIGKILL {mocked_iens2jobid[iens_to_kill]}" in bkill_args
bkill_args = Path("bkill_args").read_text(encoding="utf-8").strip().split("\n")
assert f"-s SIGTERM {mocked_iens2jobid[iens_to_kill]}" in bkill_args

await asyncio.wait_for(wait_for_sigkill_in_file(), timeout=5)


@given(st.text())
Expand Down

0 comments on commit d6140b4

Please sign in to comment.