Skip to content

Commit

Permalink
Fix TimeoutError not being propagated
Browse files Browse the repository at this point in the history
This was the root cause of several jobs with the same features being submitted, even though they had already been submitted
  • Loading branch information
Icemole committed Jul 3, 2024
1 parent 1be2f3c commit 9467ce0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 30 deletions.
18 changes: 7 additions & 11 deletions sisyphus/load_sharing_facility_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def system_call(self, command, send_to_stdin=None):
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
logging.warning(self._system_call_timeout_warn_msg(system_command))
return [], ["TimeoutExpired".encode()], -1

def fix_output(o):
Expand Down Expand Up @@ -187,12 +187,10 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, rangestring):
)

while True:
try:
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
out, err, retval = self.system_call(bsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
out, err, retval = self.system_call(bsub_call, command)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down Expand Up @@ -250,10 +248,8 @@ def queue_state(self):
# get bjobs output
system_command = ["bjobs", "-w"]
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
out, err, retval = self.system_call(system_command)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down
15 changes: 5 additions & 10 deletions sisyphus/simple_linux_utility_for_resource_management_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def system_call(self, command, send_to_stdin=None):
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
logging.warning(self._system_call_timeout_warn_msg(system_command))
return [], ["TimeoutExpired".encode()], -1

def fix_output(o):
Expand Down Expand Up @@ -234,13 +234,10 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,
sbatch_call += self.options(rqmt)

sbatch_call += ["-a", "%i-%i:%i" % (start_id, end_id, step_size)]
command = '"' + " ".join(call) + '"'
sbatch_call += ["--wrap=%s" % " ".join(call)]
while True:
try:
out, err, retval = self.system_call(sbatch_call)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
out, err, retval = self.system_call(sbatch_call)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down Expand Up @@ -309,10 +306,8 @@ def queue_state(self):
"arrayjobid,arraytaskid,state,name:1000",
]
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
out, err, retval = self.system_call(system_command)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down
14 changes: 5 additions & 9 deletions sisyphus/son_of_grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def system_call(self, command, send_to_stdin=None):
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
logging.warning(self._system_call_timeout_warn_msg(system_command))
return [], ["TimeoutExpired".encode()], -1

def fix_output(o):
Expand Down Expand Up @@ -252,10 +252,8 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,
qsub_call += ["-t", "%i-%i:%i" % (start_id, end_id, step_size)]
command = " ".join(call) + "\n"
while True:
try:
out, err, retval = self.system_call(qsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
out, err, retval = self.system_call(qsub_call, command)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down Expand Up @@ -319,10 +317,8 @@ def queue_state(self):
# get qstat output
system_command = ["qstat", "-xml", "-u", getpass.getuser()]
while True:
try:
out, err, retval = self.system_call(system_command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(system_command))
out, err, retval = self.system_call(system_command)
if retval != 0:
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down

0 comments on commit 9467ce0

Please sign in to comment.