Skip to content

Commit

Permalink
Initial commit for spin steps
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Jan 15, 2025
1 parent a82d761 commit 75f3202
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 31 deletions.
50 changes: 44 additions & 6 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ def config_merge_cb(ctx, param, value):
"step": "metaflow.cli_components.step_cmd.step",
"run": "metaflow.cli_components.run_cmds.run",
"resume": "metaflow.cli_components.run_cmds.resume",
"spin": "metaflow.cli_components.run_cmds.spin",
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
},
)
def cli(ctx):
Expand Down Expand Up @@ -384,7 +386,6 @@ def start(
# second one processed will return the actual options. The order of processing
# depends on what (and in what order) the user specifies on the command line.
config_options = config_file or config_value

if (
hasattr(ctx, "saved_args")
and ctx.saved_args
Expand Down Expand Up @@ -462,14 +463,10 @@ def start(
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor = MONITOR_SIDECARS[monitor](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
Expand All @@ -485,6 +482,47 @@ def start(

ctx.obj.config_options = config_options

# Override values for spin
if hasattr(ctx, "saved_args") and ctx.saved_args and ctx.saved_args[0] == "spin":
# For spin, we will only use the local metadata provider, datastore, environment
# and null event logger and monitor
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
ctx.obj.echo
)
ctx.obj.datastore_impl.datastore_root = datastore_root

FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
ctx.obj.flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment,
ctx.obj.metadata,
ctx.obj.event_logger,
ctx.obj.monitor,
)
echo(
"Using local metadata provider, datastore, environment, and null event logger and monitor for spin."
)
print(f"Using metadata provider: {ctx.obj.metadata}")
echo(f"Using Datastore root: {datastore_root}")
echo(f"Using Flow Datastore: {ctx.obj.flow_datastore}")

# Start event logger and monitor
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

decorators._init(ctx.obj.flow)

# It is important to initialize flow decorators early as some of the
Expand Down Expand Up @@ -528,7 +566,7 @@ def start(
if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] not in ("run", "resume")
and ctx.saved_args[0] not in ("run", "resume", "spin")
):
# run/resume are special cases because they can add more decorators with --with,
# so they have to take care of themselves.
Expand Down
97 changes: 81 additions & 16 deletions metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from ..graph import FlowGraph
from ..metaflow_current import current
from ..package import MetaflowPackage
from ..runtime import NativeRuntime
from ..runtime import NativeRuntime, SpinRuntime
from ..system import _system_logger

from ..tagging_util import validate_tags
from ..util import get_latest_run_id, write_latest_run_id
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec


def before_run(obj, tags, decospecs):
Expand Down Expand Up @@ -70,6 +70,28 @@ def write_file(file_path, content):
f.write(str(content))


def common_runner_options(func):
@click.option(
"--run-id-file",
default=None,
show_default=True,
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)

return wrapper


def common_run_options(func):
@click.option(
"--tag",
Expand Down Expand Up @@ -110,20 +132,6 @@ def common_run_options(func):
"option multiple times to attach multiple decorators "
"in steps.",
)
@click.option(
"--run-id-file",
default=None,
show_default=True,
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -167,6 +175,7 @@ def wrapper(*args, **kwargs):
@click.argument("step-to-rerun", required=False)
@click.command(help="Resume execution of a previous run of this flow.")
@common_run_options
@common_runner_options
@click.pass_obj
def resume(
obj,
Expand Down Expand Up @@ -285,6 +294,7 @@ def resume(
@click.command(help="Run the workflow locally.")
@tracing.cli_entrypoint("cli/run")
@common_run_options
@common_runner_options
@click.option(
"--namespace",
"user_namespace",
Expand Down Expand Up @@ -360,3 +370,58 @@ def run(
f,
)
runtime.execute()


@click.command(help="Spins up a step locally")
@click.argument(
"step-name",
required=True,
type=str,
)
@click.option(
"--task-pathspec",
default=None,
show_default=True,
help="Task ID to use when spinning up the step. The spinned up step will use the artifacts"
"corresponding to this task ID. If not provided, an arbitrary task ID from the latest run will be used.",
)
@common_runner_options
@click.pass_obj
def spin(
obj,
step_name,
task_pathspec=None,
run_id_file=None,
runner_attribute_file=None,
**kwargs
):
before_run(obj, [], [])
if task_pathspec is None:
task_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)

obj.echo(
f"Spinning up step *{step_name}* locally with task pathspec *{task_pathspec}*"
)
obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
step_func = getattr(obj.flow, step_name)

spin_runtime = SpinRuntime(
obj.flow,
obj.graph,
obj.flow_datastore,
obj.metadata,
obj.environment,
obj.package,
obj.logger,
obj.entrypoint,
obj.event_logger,
obj.monitor,
step_func,
task_pathspec,
)

# write_latest_run_id(obj, runtime.run_id)
# write_file(run_id_file, runtime.run_id)

spin_runtime.execute()
pass
75 changes: 75 additions & 0 deletions metaflow/cli_components/step_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,78 @@ def step(
)

echo("Success", fg="green", bold=True, indent=True)


@click.command(help="Internal command to spin a single task.", hidden=True)
@click.argument("step-name")
@click.option(
"--run-id",
default=None,
required=True,
help="Run ID for the step that's about to be spun",
)
@click.option(
"--task-id",
default=None,
required=True,
help="Task ID for the step that's about to be spun",
)
@click.option(
"--input-paths",
help="A comma-separated list of pathspecs specifying inputs for this step.",
)
@click.option(
"--split-index",
type=int,
default=None,
show_default=True,
help="Index of this foreach split.",
)
@click.option(
"--retry-count",
default=0,
help="How many times we have attempted to run this task.",
)
@click.option(
"--max-user-code-retries",
default=0,
help="How many times we should attempt running the user code.",
)
@click.option(
"--namespace",
"namespace",
default=None,
help="Change namespace from the default (your username) to the specified tag.",
)
@click.pass_context
def spin_internal(
ctx,
step_name,
run_id=None,
task_id=None,
input_paths=None,
split_index=None,
retry_count=None,
max_user_code_retries=None,
namespace=None,
):
if ctx.obj.is_quiet:
echo = echo_dev_null
else:
echo = echo_always
print("I am here 1")
print("I am here 2")
# echo("Spinning a task, *%s*" % step_name, fg="magenta", bold=False)

task = MetaflowTask(
ctx.obj.flow,
ctx.obj.flow_datastore, # local datastore
ctx.obj.metadata, # local metadata provider
ctx.obj.environment, # local environment
ctx.obj.echo,
ctx.obj.event_logger, # null logger
ctx.obj.monitor, # null monitor
None, # no unbounded foreach context
)
# echo("Task is: ", task)
# pass
8 changes: 8 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@
"DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows"
)

###
# Spin configuration
###
SPIN_ALLOWED_DECORATORS = from_conf(
"SPIN_ALLOWED_DECORATORS", ["conda", "pypi", "environment"]
)


###
# User configuration
###
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/pypi/conda_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def task_pre_step(
def runtime_step_cli(
self, cli_args, retry_count, max_user_code_retries, ubf_context
):
print("Let's go - I am here")
if self.disabled:
return
# Ensure local installation of Metaflow is visible to user code
Expand Down
Loading

0 comments on commit 75f3202

Please sign in to comment.