runner fixes and enhancements (#2053)
* fixes

* fixes to handler

* try removing clear and set os envs and metadata

* only reset metadata

* still need to reset os envs
madhur-ob authored Sep 24, 2024
1 parent 822449b commit cbfaea9
Showing 5 changed files with 134 additions and 109 deletions.
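
The "reset os envs" / "only reset metadata" bullets above refer to restoring the caller's state after the spawned run finishes: the runner snapshots os.environ before launching the subprocess and puts it back (along with the metadata provider) once the attribute file has been read. A minimal sketch of what a helper like clear_and_set_os_environ, imported from .utils in the runner diff below, plausibly does; its exact implementation is an assumption:

import os

def clear_and_set_os_environ(env):
    # Assumed behavior for the helper imported from metaflow.runner.utils:
    # wipe whatever the run command injected and restore the pre-run snapshot.
    os.environ.clear()
    os.environ.update(env)
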
43 changes: 24 additions & 19 deletions metaflow/cli.py
@@ -1,12 +1,13 @@
import inspect
import os
import json
import sys
import traceback
from datetime import datetime
from functools import wraps

import metaflow.tracing as tracing
from metaflow._vendor import click
from metaflow.client.core import get_metadata

from . import decorators, lint, metaflow_version, namespace, parameters, plugins
from .cli_args import cli_args
@@ -698,15 +699,17 @@ def resume(
runtime.print_workflow_info()

runtime.persist_constants()
write_file(
runner_attribute_file,
"%s@%s:%s"
% (
obj.metadata.__class__.TYPE,
obj.metadata.__class__.INFO,
"/".join((obj.flow.name, runtime.run_id)),
),
)

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
},
f,
)

# We may skip clone-only resume if this is not a resume leader,
# and clone is already complete.
@@ -774,15 +777,17 @@ def run(
obj.flow._set_constants(obj.graph, kwargs)
runtime.print_workflow_info()
runtime.persist_constants()
write_file(
runner_attribute_file,
"%s@%s:%s"
% (
obj.metadata.__class__.TYPE,
obj.metadata.__class__.INFO,
"/".join((obj.flow.name, runtime.run_id)),
),
)

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
},
f,
)
runtime.execute()


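run and resume now serialize the runner attributes as JSON instead of the old TYPE@INFO:flow/run_id string. A hedged sketch of reading the file back on the consumer side; the path is illustrative, since the real caller passes a NamedTemporaryFile:

import json

# Illustrative path; the runner normally supplies a temporary file here.
runner_attribute_file = "/tmp/runner_attribute.json"

with open(runner_attribute_file) as f:
    attrs = json.load(f)

# Keys written by cli.py above.
pathspec = "%s/%s" % (attrs["flow_name"], attrs["run_id"])
metadata_str = attrs["metadata"]  # whatever get_metadata() returned in the subprocess
print(pathspec, metadata_str)
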
49 changes: 2 additions & 47 deletions metaflow/runner/deployer.py
@@ -6,56 +6,11 @@
import functools
import tempfile

from subprocess import CalledProcessError
from typing import Optional, Dict, ClassVar

from metaflow.exception import MetaflowNotFound
from metaflow.runner.subprocess_manager import CommandManager, SubprocessManager
from metaflow.runner.utils import read_from_file_when_ready


def handle_timeout(
tfp_runner_attribute, command_obj: CommandManager, file_read_timeout: int
):
"""
Handle the timeout for a running subprocess command that reads a file
and raises an error with appropriate logs if a TimeoutError occurs.
Parameters
----------
tfp_runner_attribute : NamedTemporaryFile
Temporary file that stores runner attribute data.
command_obj : CommandManager
Command manager object that encapsulates the running command details.
file_read_timeout : int
Timeout for reading the file.
Returns
-------
str
Content read from the temporary file.
Raises
------
RuntimeError
If a TimeoutError occurs, it raises a RuntimeError with the command's
stdout and stderr logs.
"""
try:
content = read_from_file_when_ready(
tfp_runner_attribute.name, command_obj, timeout=file_read_timeout
)
return content
except (CalledProcessError, TimeoutError) as e:
stdout_log = open(command_obj.log_files["stdout"]).read()
stderr_log = open(command_obj.log_files["stderr"]).read()
command = " ".join(command_obj.command)
error_message = "Error executing: '%s':\n" % command
if stdout_log.strip():
error_message += "\nStdout:\n%s\n" % stdout_log
if stderr_log.strip():
error_message += "\nStderr:\n%s\n" % stderr_log
raise RuntimeError(error_message) from e
from metaflow.runner.subprocess_manager import SubprocessManager
from metaflow.runner.utils import handle_timeout


def get_lower_level_group(
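The helper removed here is now imported from metaflow.runner.utils so that deployer.py and metaflow_runner.py share one implementation. Presumably the moved function mirrors the deleted code above; a condensed sketch, with read_from_file_when_ready already living in the same utils module:

from subprocess import CalledProcessError

def handle_timeout(tfp_runner_attribute, command_obj, file_read_timeout):
    # Wait for the subprocess to write its attribute file; if it fails or
    # times out, surface the collected stdout/stderr in the raised error.
    try:
        return read_from_file_when_ready(
            tfp_runner_attribute.name, command_obj, timeout=file_read_timeout
        )
    except (CalledProcessError, TimeoutError) as e:
        stdout_log = open(command_obj.log_files["stdout"]).read()
        stderr_log = open(command_obj.log_files["stderr"]).read()
        error_message = "Error executing: '%s':\n" % " ".join(command_obj.command)
        if stdout_log.strip():
            error_message += "\nStdout:\n%s\n" % stdout_log
        if stderr_log.strip():
            error_message += "\nStderr:\n%s\n" % stderr_log
        raise RuntimeError(error_message) from e
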
47 changes: 22 additions & 25 deletions metaflow/runner/metaflow_runner.py
@@ -1,14 +1,14 @@
import importlib
import os
import sys
import json
import tempfile

from subprocess import CalledProcessError
from typing import Dict, Iterator, Optional, Tuple

from metaflow import Run, metadata

from .utils import clear_and_set_os_environ, read_from_file_when_ready
from .utils import handle_timeout, clear_and_set_os_environ
from .subprocess_manager import CommandManager, SubprocessManager


@@ -102,16 +102,19 @@ def status(self) -> str:
for executing the run.
The return value is one of the following strings:
- `timeout` indicates that the run timed out.
- `running` indicates a currently executing run.
- `failed` indicates a failed run.
- `successful` a successful run.
- `successful` indicates a successful run.
Returns
-------
str
The current status of the run.
"""
if self.command_obj.process.returncode is None:
if self.command_obj.timeout:
return "timeout"
elif self.command_obj.process.returncode is None:
return "running"
elif self.command_obj.process.returncode != 0:
return "failed"
@@ -271,28 +274,22 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj):

# It is thus necessary to set them to correct values before we return
# the Run object.
try:
# Set the environment variables to what they were before the run executed.
clear_and_set_os_environ(self.old_env)

# Set the correct metadata from the runner_attribute file corresponding to this run.
content = read_from_file_when_ready(
tfp_runner_attribute.name, command_obj, timeout=self.file_read_timeout
)
metadata_for_flow, pathspec = content.rsplit(":", maxsplit=1)
metadata(metadata_for_flow)
run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)
except (CalledProcessError, TimeoutError) as e:
stdout_log = open(command_obj.log_files["stdout"]).read()
stderr_log = open(command_obj.log_files["stderr"]).read()
command = " ".join(command_obj.command)
error_message = "Error executing: '%s':\n" % command
if stdout_log.strip():
error_message += "\nStdout:\n%s\n" % stdout_log
if stderr_log.strip():
error_message += "\nStderr:\n%s\n" % stderr_log
raise RuntimeError(error_message) from e
content = handle_timeout(
tfp_runner_attribute, command_obj, self.file_read_timeout
)
content = json.loads(content)
pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id"))

# Set the environment variables to what they were before the run executed.
clear_and_set_os_environ(self.old_env)

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
metadata(metadata_for_flow)

run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)

def run(self, **kwargs) -> ExecutingRun:
"""
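From the caller's side the API surface is unchanged; the difference is that the returned ExecutingRun can now report "timeout" and is constructed from the JSON attribute file after the environment and metadata have been restored. A hedged usage sketch, with the flow file and parameter as placeholders:

from metaflow import Runner

# "hello_flow.py" and alpha are illustrative; use your own flow and parameters.
running = Runner("hello_flow.py").run(alpha=5)  # blocks now that run_command waits

print(running.status)        # "timeout", "running", "failed", or "successful"
print(running.run.pathspec)  # e.g. "HelloFlow/123", rebuilt from flow_name/run_id
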
50 changes: 33 additions & 17 deletions metaflow/runner/subprocess_manager.py
@@ -42,6 +42,19 @@ class SubprocessManager(object):
def __init__(self):
self.commands: Dict[int, CommandManager] = {}

try:
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT,
lambda: self._handle_sigint(signum=signal.SIGINT, frame=None),
)
except RuntimeError:
signal.signal(signal.SIGINT, self._handle_sigint)

def _handle_sigint(self, signum, frame):
for each_command in self.commands.values():
each_command.kill(termination_timeout=2)

async def __aenter__(self) -> "SubprocessManager":
return self

@@ -83,6 +96,7 @@ def run_command(
command_obj = CommandManager(command, env, cwd)
pid = command_obj.run(show_output=show_output)
self.commands[pid] = command_obj
command_obj.sync_wait()
return pid

async def async_run_command(
@@ -169,11 +183,12 @@ def __init__(
self.cwd = cwd if cwd is not None else os.getcwd()

self.process = None
self.stdout_thread = None
self.stderr_thread = None
self.run_called: bool = False
self.timeout: bool = False
self.log_files: Dict[str, str] = {}

signal.signal(signal.SIGINT, self._handle_sigint)

async def __aenter__(self) -> "CommandManager":
return self

@@ -214,13 +229,22 @@ async def wait(
else:
await asyncio.wait_for(self.emit_logs(stream), timeout)
except asyncio.TimeoutError:
self.timeout = True
command_string = " ".join(self.command)
await self.kill()
self.kill(termination_timeout=2)
print(
"Timeout: The process (PID %d; command: '%s') did not complete "
"within %s seconds." % (self.process.pid, command_string, timeout)
)

def sync_wait(self):
if not self.run_called:
raise RuntimeError("No command run yet to wait for...")

self.process.wait()
self.stdout_thread.join()
self.stderr_thread.join()

def run(self, show_output: bool = False):
"""
Run the subprocess synchronously. This can only be called once.
@@ -265,22 +289,17 @@ def stream_to_stdout_and_file(pipe, log_file):

self.run_called = True

stdout_thread = threading.Thread(
self.stdout_thread = threading.Thread(
target=stream_to_stdout_and_file,
args=(self.process.stdout, stdout_logfile),
)
stderr_thread = threading.Thread(
self.stderr_thread = threading.Thread(
target=stream_to_stdout_and_file,
args=(self.process.stderr, stderr_logfile),
)

stdout_thread.start()
stderr_thread.start()

self.process.wait()

stdout_thread.join()
stderr_thread.join()
self.stdout_thread.start()
self.stderr_thread.start()

return self.process.pid
except Exception as e:
@@ -441,13 +460,13 @@ def cleanup(self):
if self.run_called:
shutil.rmtree(self.temp_dir, ignore_errors=True)

async def kill(self, termination_timeout: float = 5):
def kill(self, termination_timeout: float = 2):
"""
Kill the subprocess and its descendants.
Parameters
----------
termination_timeout : float, default 5
termination_timeout : float, default 2
The time to wait after sending a SIGTERM to the process and its descendants
before sending a SIGKILL.
"""
@@ -457,9 +476,6 @@ async def kill(self, termination_timeout: float = 5):
else:
print("No process to kill.")

def _handle_sigint(self, signum, frame):
asyncio.create_task(self.kill())


async def main():
flow_file = "../try.py"
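
Putting the subprocess_manager changes together: run_command now blocks via sync_wait, the log-streaming threads live on the CommandManager, kill is synchronous, and Ctrl-C is handled once at the SubprocessManager level. A small sketch of driving the manager directly; the command is illustrative and it assumes env and cwd default to the current environment and working directory:

from metaflow.runner.subprocess_manager import SubprocessManager

manager = SubprocessManager()

# Blocks until the child exits and its stdout/stderr threads are joined,
# because run_command now calls sync_wait() before returning the pid.
pid = manager.run_command(["python", "-c", "print('hello from the child')"])

command_obj = manager.commands[pid]
print(command_obj.process.returncode)   # available immediately after the call
print(command_obj.log_files["stdout"])  # path to the captured stdout log
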
