Skip to content

Commit

Permalink
Allow additional commands to be injected directly on the Runner() (#2192
Browse files Browse the repository at this point in the history
)

This allows extensions to add classes with associated methods like
Runner("myflow.py", ...).cmd().subcommand()

As an example:
```
class RunnerCLI(object):
    name = "environment"

    def __init__(self, runner: "metaflow.Runner"):
        """
        Environment command for the flow. Requires the conda environment.
        """
        self.runner = runner

    def resolve(
        self,
        steps_to_resolve: Optional[List[str]] = None,
        arch: Optional[Union[str, List[str]]] = None,
        alias: Optional[List[str]] = None,
        force: bool = False,
        dry_run: bool = False,
        timeout: int = 1200,
    ) -> Dict[str, Dict[str, ResolvedEnvironment]]:
```

Would add a "environment" command which takes no argument and add the
`resolve` subcommand.
  • Loading branch information
romain-intel authored Jan 7, 2025
1 parent 950428e commit a7042c7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
1 change: 1 addition & 0 deletions metaflow/extension_support/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def resolve_plugins(category, path_only=False):
"cli": lambda x: (
list(x.commands)[0] if len(x.commands) == 1 else "too many commands"
),
"runner_cli": lambda x: x.name,
}


Expand Down
13 changes: 13 additions & 0 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
("logs", ".logs_cli.cli"),
]

# Add additional commands to the runner here
# These will be accessed using Runner().<command>()
RUNNER_CLIS_DESC = []


from .test_unbounded_foreach_decorator import InternalTestUnboundedForeachInput

# Add new step decorators here
Expand Down Expand Up @@ -168,6 +173,14 @@ def get_plugin_cli_path():
return resolve_plugins("cli", path_only=True)


def get_runner_cli():
return resolve_plugins("runner_cli")


def get_runner_cli_path():
return resolve_plugins("runner_cli", path_only=True)


STEP_DECORATORS = resolve_plugins("step_decorator")
FLOW_DECORATORS = resolve_plugins("flow_decorator")
ENVIRONMENTS = resolve_plugins("environment")
Expand Down
28 changes: 25 additions & 3 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from metaflow import Run

from metaflow.plugins import get_runner_cli

from .utils import (
temporary_fifo,
handle_timeout,
Expand Down Expand Up @@ -187,7 +189,27 @@ async def stream_log(
yield position, line


class Runner(object):
class RunnerMeta(type):
def __new__(mcs, name, bases, dct):
cls = super().__new__(mcs, name, bases, dct)

def _injected_method(subcommand_name, runner_subcommand):
def f(self, *args, **kwargs):
return runner_subcommand(self, *args, **kwargs)

f.__doc__ = runner_subcommand.__doc__ or ""
f.__name__ = subcommand_name

return f

for runner_subcommand in get_runner_cli():
method_name = runner_subcommand.name.replace("-", "_")
setattr(cls, method_name, _injected_method(method_name, runner_subcommand))

return cls


class Runner(metaclass=RunnerMeta):
"""
Metaflow's Runner API that presents a programmatic interface
to run flows and perform other operations either synchronously or asynchronously.
Expand Down Expand Up @@ -337,7 +359,7 @@ def run(self, **kwargs) -> ExecutingRun:

return self.__get_executing_run(attribute_file_fd, command_obj)

def resume(self, **kwargs):
def resume(self, **kwargs) -> ExecutingRun:
"""
Blocking resume execution of the run.
This method will wait until the resumed run has completed execution.
Expand Down Expand Up @@ -400,7 +422,7 @@ async def async_run(self, **kwargs) -> ExecutingRun:

return await self.__async_get_executing_run(attribute_file_fd, command_obj)

async def async_resume(self, **kwargs):
async def async_resume(self, **kwargs) -> ExecutingRun:
"""
Non-blocking resume execution of the run.
This method will return as soon as the resume has launched.
Expand Down

0 comments on commit a7042c7

Please sign in to comment.