Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to SSH queue scan command #191

Merged
merged 11 commits into from
Jun 18, 2024
9 changes: 4 additions & 5 deletions sisyphus/aws_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ def system_call(self, command, send_to_stdin=None):
system_command = command

logging.debug("shell_cmd: %s" % " ".join(system_command))
p = subprocess.Popen(system_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
out, err = p.communicate(input=send_to_stdin, timeout=30)
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
"""
Expand All @@ -105,9 +104,9 @@ def fix_output(o):
assert False
return o[:-1]

out = fix_output(out)
err = fix_output(err)
retval = p.wait(timeout=30)
out = fix_output(p.stdout)
err = fix_output(p.stderr)
retval = p.returncode

return out, err, retval

Expand Down
9 changes: 4 additions & 5 deletions sisyphus/load_sharing_facility_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ def system_call(self, command, send_to_stdin=None):
system_command = command

logging.debug("shell_cmd: %s" % " ".join(system_command))
p = subprocess.Popen(system_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
out, err = p.communicate(input=send_to_stdin, timeout=30)
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
# split output and drop last empty line
Expand All @@ -69,9 +68,9 @@ def fix_output(o):
assert False
return o[:-1]

out = fix_output(out)
err = fix_output(err)
retval = p.wait(timeout=30)
out = fix_output(p.stdout)
err = fix_output(p.stderr)
retval = p.returncode

# Check for ssh error
err_ = []
Expand Down
13 changes: 7 additions & 6 deletions sisyphus/simple_linux_utility_for_resource_management_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ def system_call(self, command, send_to_stdin=None):
"""
if self.gateway:
escaped_command = [shlex.quote(s) for s in command] # parameters need to be shell safe when sending via ssh
system_command = ["ssh", "-x", self.gateway] + [" ".join(["cd", os.getcwd(), "&&"] + escaped_command)]
system_command = ["ssh", "-x", self.gateway, "-o", "BatchMode=yes"] + [
" ".join(["cd", os.getcwd(), "&&"] + escaped_command)
]
else:
# no gateway given, skip ssh local
system_command = command

logging.debug("shell_cmd: %s" % " ".join(system_command))
p = subprocess.Popen(system_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
out, err = p.communicate(input=send_to_stdin, timeout=30)
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, don't you need to catch TimeoutExpired?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right, I'll ignore any TimeoutExpired.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what does it mean? What happens now? The whole manager crashes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I hit any TimeoutExpired exception. My error occurred well within the timeout, because of a public key mismatch, and it was actually printed within the first second after the command was run:

[2024-06-13 07:24:26,842] INFO: Submit to queue: work/i6_core/returnn/extract_prior/ReturnnComputePriorJobV2.yYMFtGVTZgnn run [1]
[2024-06-13 07:24:26,953] ERROR: Error to submit job, return value: 255

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I realize now that the code before your change also did not catch TimeoutExpired (raised by p.wait(timeout=30)). I wonder why we did not catch it, and what happened when this exception was thrown there. Do you know? (As I understand it, you ran into this exact problem.)

Maybe we should not change this and also not catch TimeoutExpired? @critias?


def fix_output(o):
"""
Expand All @@ -109,9 +110,9 @@ def fix_output(o):
assert False
return o[:-1]

out = fix_output(out)
err = fix_output(err)
retval = p.wait(timeout=30)
out = fix_output(p.stdout)
err = fix_output(p.stderr)
retval = p.returncode

# Check for ssh error
err_ = []
Expand Down
13 changes: 7 additions & 6 deletions sisyphus/son_of_grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,17 @@ def system_call(self, command, send_to_stdin=None):
:rtype: list[bytes], list[bytes], int
"""
if self.gateway:
system_command = ["ssh", "-x", self.gateway] + [" ".join(["cd", os.getcwd(), "&&"] + command)]
system_command = ["ssh", "-x", self.gateway, "-o", "BatchMode=yes"] + [
" ".join(["cd", os.getcwd(), "&&"] + command)
]
else:
# no gateway given, skip ssh local
system_command = command

logging.debug("shell_cmd: %s" % " ".join(system_command))
p = subprocess.Popen(system_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
out, err = p.communicate(input=send_to_stdin, timeout=30)
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
"""
Expand All @@ -110,9 +111,9 @@ def fix_output(o):
assert False
return o[:-1]

out = fix_output(out)
err = fix_output(err)
retval = p.wait(timeout=30)
out = fix_output(p.stdout)
err = fix_output(p.stderr)
retval = p.returncode

# Check for ssh error
err_ = []
Expand Down
Loading