From a7042c72179eb8fe966bd8288bc575d4216329ab Mon Sep 17 00:00:00 2001 From: Romain Date: Mon, 6 Jan 2025 23:19:55 -0800 Subject: [PATCH] Allow additional commands to be injected directly on the Runner() (#2192) This allows extensions to add classes with associated methods like Runner("myflow.py", ...).cmd().subcommand() As an example: ``` class RunnerCLI(object): name = "environment" def __init__(self, runner: "metaflow.Runner"): """ Environment command for the flow. Requires the conda environment. """ self.runner = runner def resolve( self, steps_to_resolve: Optional[List[str]] = None, arch: Optional[Union[str, List[str]]] = None, alias: Optional[List[str]] = None, force: bool = False, dry_run: bool = False, timeout: int = 1200, ) -> Dict[str, Dict[str, ResolvedEnvironment]]: ``` Would add a "environment" command which takes no argument and add the `resolve` subcommand. --- metaflow/extension_support/plugins.py | 1 + metaflow/plugins/__init__.py | 13 +++++++++++++ metaflow/runner/metaflow_runner.py | 28 ++++++++++++++++++++++++--- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/metaflow/extension_support/plugins.py b/metaflow/extension_support/plugins.py index 8007917793b..dcd151501a1 100644 --- a/metaflow/extension_support/plugins.py +++ b/metaflow/extension_support/plugins.py @@ -197,6 +197,7 @@ def resolve_plugins(category, path_only=False): "cli": lambda x: ( list(x.commands)[0] if len(x.commands) == 1 else "too many commands" ), + "runner_cli": lambda x: x.name, } diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 0e6ff820cba..4409fab1a35 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -19,6 +19,11 @@ ("logs", ".logs_cli.cli"), ] +# Add additional commands to the runner here +# These will be accessed using Runner().() +RUNNER_CLIS_DESC = [] + + from .test_unbounded_foreach_decorator import InternalTestUnboundedForeachInput # Add new step decorators here @@ -168,6 +173,14 @@ def get_plugin_cli_path(): return resolve_plugins("cli", path_only=True) +def get_runner_cli(): + return resolve_plugins("runner_cli") + + +def get_runner_cli_path(): + return resolve_plugins("runner_cli", path_only=True) + + STEP_DECORATORS = resolve_plugins("step_decorator") FLOW_DECORATORS = resolve_plugins("flow_decorator") ENVIRONMENTS = resolve_plugins("environment") diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index 835a6ddbf9c..d1656599d85 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -7,6 +7,8 @@ from metaflow import Run +from metaflow.plugins import get_runner_cli + from .utils import ( temporary_fifo, handle_timeout, @@ -187,7 +189,27 @@ async def stream_log( yield position, line -class Runner(object): +class RunnerMeta(type): + def __new__(mcs, name, bases, dct): + cls = super().__new__(mcs, name, bases, dct) + + def _injected_method(subcommand_name, runner_subcommand): + def f(self, *args, **kwargs): + return runner_subcommand(self, *args, **kwargs) + + f.__doc__ = runner_subcommand.__doc__ or "" + f.__name__ = subcommand_name + + return f + + for runner_subcommand in get_runner_cli(): + method_name = runner_subcommand.name.replace("-", "_") + setattr(cls, method_name, _injected_method(method_name, runner_subcommand)) + + return cls + + +class Runner(metaclass=RunnerMeta): """ Metaflow's Runner API that presents a programmatic interface to run flows and perform other operations either synchronously or asynchronously. @@ -337,7 +359,7 @@ def run(self, **kwargs) -> ExecutingRun: return self.__get_executing_run(attribute_file_fd, command_obj) - def resume(self, **kwargs): + def resume(self, **kwargs) -> ExecutingRun: """ Blocking resume execution of the run. This method will wait until the resumed run has completed execution. @@ -400,7 +422,7 @@ async def async_run(self, **kwargs) -> ExecutingRun: return await self.__async_get_executing_run(attribute_file_fd, command_obj) - async def async_resume(self, **kwargs): + async def async_resume(self, **kwargs) -> ExecutingRun: """ Non-blocking resume execution of the run. This method will return as soon as the resume has launched.