From cbfaea9e469093f06eaeeada04919ca0f71170f5 Mon Sep 17 00:00:00 2001 From: madhur-ob <155637867+madhur-ob@users.noreply.github.com> Date: Tue, 24 Sep 2024 22:51:39 +0530 Subject: [PATCH] runner fixes and enhancements (#2053) * fixes * fixes to handler * try removing clear and set os envs and metadata * only reset metadata * still need to reset os envs --- metaflow/cli.py | 43 +++++++++++---------- metaflow/runner/deployer.py | 49 +----------------------- metaflow/runner/metaflow_runner.py | 47 +++++++++++------------ metaflow/runner/subprocess_manager.py | 50 ++++++++++++++++--------- metaflow/runner/utils.py | 54 ++++++++++++++++++++++++++- 5 files changed, 134 insertions(+), 109 deletions(-) diff --git a/metaflow/cli.py b/metaflow/cli.py index 29c6701cc71..270a1bd6216 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -1,5 +1,5 @@ import inspect -import os +import json import sys import traceback from datetime import datetime @@ -7,6 +7,7 @@ import metaflow.tracing as tracing from metaflow._vendor import click +from metaflow.client.core import get_metadata from . import decorators, lint, metaflow_version, namespace, parameters, plugins from .cli_args import cli_args @@ -698,15 +699,17 @@ def resume( runtime.print_workflow_info() runtime.persist_constants() - write_file( - runner_attribute_file, - "%s@%s:%s" - % ( - obj.metadata.__class__.TYPE, - obj.metadata.__class__.INFO, - "/".join((obj.flow.name, runtime.run_id)), - ), - ) + + if runner_attribute_file: + with open(runner_attribute_file, "w") as f: + json.dump( + { + "run_id": runtime.run_id, + "flow_name": obj.flow.name, + "metadata": get_metadata(), + }, + f, + ) # We may skip clone-only resume if this is not a resume leader, # and clone is already complete. @@ -774,15 +777,17 @@ def run( obj.flow._set_constants(obj.graph, kwargs) runtime.print_workflow_info() runtime.persist_constants() - write_file( - runner_attribute_file, - "%s@%s:%s" - % ( - obj.metadata.__class__.TYPE, - obj.metadata.__class__.INFO, - "/".join((obj.flow.name, runtime.run_id)), - ), - ) + + if runner_attribute_file: + with open(runner_attribute_file, "w") as f: + json.dump( + { + "run_id": runtime.run_id, + "flow_name": obj.flow.name, + "metadata": get_metadata(), + }, + f, + ) runtime.execute() diff --git a/metaflow/runner/deployer.py b/metaflow/runner/deployer.py index 5d19d641b97..87b0bcb140c 100644 --- a/metaflow/runner/deployer.py +++ b/metaflow/runner/deployer.py @@ -6,56 +6,11 @@ import functools import tempfile -from subprocess import CalledProcessError from typing import Optional, Dict, ClassVar from metaflow.exception import MetaflowNotFound -from metaflow.runner.subprocess_manager import CommandManager, SubprocessManager -from metaflow.runner.utils import read_from_file_when_ready - - -def handle_timeout( - tfp_runner_attribute, command_obj: CommandManager, file_read_timeout: int -): - """ - Handle the timeout for a running subprocess command that reads a file - and raises an error with appropriate logs if a TimeoutError occurs. - - Parameters - ---------- - tfp_runner_attribute : NamedTemporaryFile - Temporary file that stores runner attribute data. - command_obj : CommandManager - Command manager object that encapsulates the running command details. - file_read_timeout : int - Timeout for reading the file. - - Returns - ------- - str - Content read from the temporary file. - - Raises - ------ - RuntimeError - If a TimeoutError occurs, it raises a RuntimeError with the command's - stdout and stderr logs. - """ - try: - content = read_from_file_when_ready( - tfp_runner_attribute.name, command_obj, timeout=file_read_timeout - ) - return content - except (CalledProcessError, TimeoutError) as e: - stdout_log = open(command_obj.log_files["stdout"]).read() - stderr_log = open(command_obj.log_files["stderr"]).read() - command = " ".join(command_obj.command) - error_message = "Error executing: '%s':\n" % command - if stdout_log.strip(): - error_message += "\nStdout:\n%s\n" % stdout_log - if stderr_log.strip(): - error_message += "\nStderr:\n%s\n" % stderr_log - raise RuntimeError(error_message) from e +from metaflow.runner.subprocess_manager import SubprocessManager +from metaflow.runner.utils import handle_timeout def get_lower_level_group( diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index 61247d19cfb..161d1706d62 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -1,14 +1,14 @@ import importlib import os import sys +import json import tempfile -from subprocess import CalledProcessError from typing import Dict, Iterator, Optional, Tuple from metaflow import Run, metadata -from .utils import clear_and_set_os_environ, read_from_file_when_ready +from .utils import handle_timeout, clear_and_set_os_environ from .subprocess_manager import CommandManager, SubprocessManager @@ -102,16 +102,19 @@ def status(self) -> str: for executing the run. The return value is one of the following strings: + - `timeout` indicates that the run timed out. - `running` indicates a currently executing run. - `failed` indicates a failed run. - - `successful` a successful run. + - `successful` indicates a successful run. Returns ------- str The current status of the run. """ - if self.command_obj.process.returncode is None: + if self.command_obj.timeout: + return "timeout" + elif self.command_obj.process.returncode is None: return "running" elif self.command_obj.process.returncode != 0: return "failed" @@ -271,28 +274,22 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj): # It is thus necessary to set them to correct values before we return # the Run object. - try: - # Set the environment variables to what they were before the run executed. - clear_and_set_os_environ(self.old_env) - # Set the correct metadata from the runner_attribute file corresponding to this run. - content = read_from_file_when_ready( - tfp_runner_attribute.name, command_obj, timeout=self.file_read_timeout - ) - metadata_for_flow, pathspec = content.rsplit(":", maxsplit=1) - metadata(metadata_for_flow) - run_object = Run(pathspec, _namespace_check=False) - return ExecutingRun(self, command_obj, run_object) - except (CalledProcessError, TimeoutError) as e: - stdout_log = open(command_obj.log_files["stdout"]).read() - stderr_log = open(command_obj.log_files["stderr"]).read() - command = " ".join(command_obj.command) - error_message = "Error executing: '%s':\n" % command - if stdout_log.strip(): - error_message += "\nStdout:\n%s\n" % stdout_log - if stderr_log.strip(): - error_message += "\nStderr:\n%s\n" % stderr_log - raise RuntimeError(error_message) from e + content = handle_timeout( + tfp_runner_attribute, command_obj, self.file_read_timeout + ) + content = json.loads(content) + pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id")) + + # Set the environment variables to what they were before the run executed. + clear_and_set_os_environ(self.old_env) + + # Set the correct metadata from the runner_attribute file corresponding to this run. + metadata_for_flow = content.get("metadata") + metadata(metadata_for_flow) + + run_object = Run(pathspec, _namespace_check=False) + return ExecutingRun(self, command_obj, run_object) def run(self, **kwargs) -> ExecutingRun: """ diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 12468ae450b..c8016244ea0 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -42,6 +42,19 @@ class SubprocessManager(object): def __init__(self): self.commands: Dict[int, CommandManager] = {} + try: + loop = asyncio.get_running_loop() + loop.add_signal_handler( + signal.SIGINT, + lambda: self._handle_sigint(signum=signal.SIGINT, frame=None), + ) + except RuntimeError: + signal.signal(signal.SIGINT, self._handle_sigint) + + def _handle_sigint(self, signum, frame): + for each_command in self.commands.values(): + each_command.kill(termination_timeout=2) + async def __aenter__(self) -> "SubprocessManager": return self @@ -83,6 +96,7 @@ def run_command( command_obj = CommandManager(command, env, cwd) pid = command_obj.run(show_output=show_output) self.commands[pid] = command_obj + command_obj.sync_wait() return pid async def async_run_command( @@ -169,11 +183,12 @@ def __init__( self.cwd = cwd if cwd is not None else os.getcwd() self.process = None + self.stdout_thread = None + self.stderr_thread = None self.run_called: bool = False + self.timeout: bool = False self.log_files: Dict[str, str] = {} - signal.signal(signal.SIGINT, self._handle_sigint) - async def __aenter__(self) -> "CommandManager": return self @@ -214,13 +229,22 @@ async def wait( else: await asyncio.wait_for(self.emit_logs(stream), timeout) except asyncio.TimeoutError: + self.timeout = True command_string = " ".join(self.command) - await self.kill() + self.kill(termination_timeout=2) print( "Timeout: The process (PID %d; command: '%s') did not complete " "within %s seconds." % (self.process.pid, command_string, timeout) ) + def sync_wait(self): + if not self.run_called: + raise RuntimeError("No command run yet to wait for...") + + self.process.wait() + self.stdout_thread.join() + self.stderr_thread.join() + def run(self, show_output: bool = False): """ Run the subprocess synchronously. This can only be called once. @@ -265,22 +289,17 @@ def stream_to_stdout_and_file(pipe, log_file): self.run_called = True - stdout_thread = threading.Thread( + self.stdout_thread = threading.Thread( target=stream_to_stdout_and_file, args=(self.process.stdout, stdout_logfile), ) - stderr_thread = threading.Thread( + self.stderr_thread = threading.Thread( target=stream_to_stdout_and_file, args=(self.process.stderr, stderr_logfile), ) - stdout_thread.start() - stderr_thread.start() - - self.process.wait() - - stdout_thread.join() - stderr_thread.join() + self.stdout_thread.start() + self.stderr_thread.start() return self.process.pid except Exception as e: @@ -441,13 +460,13 @@ def cleanup(self): if self.run_called: shutil.rmtree(self.temp_dir, ignore_errors=True) - async def kill(self, termination_timeout: float = 5): + def kill(self, termination_timeout: float = 2): """ Kill the subprocess and its descendants. Parameters ---------- - termination_timeout : float, default 5 + termination_timeout : float, default 2 The time to wait after sending a SIGTERM to the process and its descendants before sending a SIGKILL. """ @@ -457,9 +476,6 @@ async def kill(self, termination_timeout: float = 5): else: print("No process to kill.") - def _handle_sigint(self, signum, frame): - asyncio.create_task(self.kill()) - async def main(): flow_file = "../try.py" diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index 2a24e7ef8a6..4a644bd9ca4 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -1,6 +1,7 @@ import os import ast import time +import asyncio from subprocess import CalledProcessError from typing import Dict, TYPE_CHECKING @@ -40,6 +41,13 @@ def clear_and_set_os_environ(env: Dict): os.environ.update(env) +def check_process_status(command_obj: "CommandManager"): + if isinstance(command_obj.process, asyncio.subprocess.Process): + return command_obj.process.returncode is not None + else: + return command_obj.process.poll() is not None + + def read_from_file_when_ready( file_path: str, command_obj: "CommandManager", timeout: float = 5 ): @@ -47,7 +55,7 @@ def read_from_file_when_ready( with open(file_path, "r", encoding="utf-8") as file_pointer: content = file_pointer.read() while not content: - if command_obj.process.poll() is not None: + if check_process_status(command_obj): # Check to make sure the file hasn't been read yet to avoid a race # where the file is written between the end of this while loop and the # poll call above. @@ -64,3 +72,47 @@ def read_from_file_when_ready( time.sleep(0.1) content = file_pointer.read() return content + + +def handle_timeout( + tfp_runner_attribute, command_obj: "CommandManager", file_read_timeout: int +): + """ + Handle the timeout for a running subprocess command that reads a file + and raises an error with appropriate logs if a TimeoutError occurs. + + Parameters + ---------- + tfp_runner_attribute : NamedTemporaryFile + Temporary file that stores runner attribute data. + command_obj : CommandManager + Command manager object that encapsulates the running command details. + file_read_timeout : int + Timeout for reading the file. + + Returns + ------- + str + Content read from the temporary file. + + Raises + ------ + RuntimeError + If a TimeoutError occurs, it raises a RuntimeError with the command's + stdout and stderr logs. + """ + try: + content = read_from_file_when_ready( + tfp_runner_attribute.name, command_obj, timeout=file_read_timeout + ) + return content + except (CalledProcessError, TimeoutError) as e: + stdout_log = open(command_obj.log_files["stdout"]).read() + stderr_log = open(command_obj.log_files["stderr"]).read() + command = " ".join(command_obj.command) + error_message = "Error executing: '%s':\n" % command + if stdout_log.strip(): + error_message += "\nStdout:\n%s\n" % stdout_log + if stderr_log.strip(): + error_message += "\nStderr:\n%s\n" % stderr_log + raise RuntimeError(error_message) from e