Skip to content

Commit

Permalink
Make it work with subprocess
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Jan 15, 2025
1 parent 75f3202 commit ff67cc6
Showing 1 changed file with 58 additions and 66 deletions.
124 changes: 58 additions & 66 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
from functools import partial
from concurrent import futures


from metaflow.datastore.exceptions import DataException
from itertools import chain
from contextlib import contextmanager

from . import get_namespace
from .metadata_provider import MetaDatum
from .metaflow_config import MAX_ATTEMPTS, UI_URL
from .metaflow_config import SPIN_ALLOWED_DECORATORS
from .exception import (
MetaflowException,
MetaflowInternalError,
Expand Down Expand Up @@ -102,8 +105,6 @@ def __init__(
self._entrypoint = entrypoint
self._event_logger = event_logger
self._monitor = monitor
self._args = args or []
self._kwargs = kwargs or {}

self._step_func = step_func
self._task_pathspec = task_pathspec
Expand All @@ -113,19 +114,25 @@ def __init__(
self._whitelist_decorators = None
self._config_file_name = None
self._max_log_size = max_log_size
self._run_queue = []
self._poll = procpoll.make_poll()
self._workers = {} # fd -> subprocess mapping
self._finished = {}

# Create a new run_id for the spin task
self._run_id = self._metadata.new_run_id()
print(
f"New run_id for spin task: {self._run_id} and step func: {self._step_func.name}"
)

print(
f"Decorators for {self._step_func.name}: {list(self._step_func.decorators)}"
)

for deco in self._step_func.decorators:
print(
f"Running runtime_init for {deco.__class__.__name__} at {self._step_func.name}"
)
print("-" * 100)
deco.runtime_init(flow, graph, package, self._run_id)
if hasattr(deco, "_metaflow_home"):
print(f"Metaflow home is {deco._metaflow_home}")

print(f"Input paths: {self.input_paths}")

Expand Down Expand Up @@ -208,83 +215,68 @@ def execute(self):
else:
self._config_file_name = None

task = self._new_task(self._step_func.name, {})
self.task = self._new_task(self._step_func.name, {})
_ds = self._flow_datastore.get_task_datastore(
self._run_id, self._step_func.name, task.task_id, attempt=0, mode="w"
self._run_id, self._step_func.name, self.task.task_id, attempt=0, mode="w"
)

for deco in self.whitelist_decorators:
deco.runtime_task_created(
_ds,
task.task_id,
self.task.task_id,
self.split_index,
self.input_paths,
is_cloned=False,
ubf_context=None,
)

# Start a new worker to spin a step
worker = Worker(task, self._max_log_size, self._config_file_name, spin=True)
for fd in worker.fds():
self._workers[fd] = worker
self._poll.add(fd)
self.launch_spin()

finished_tasks = list(self._poll_workers())
try:
pass
except KeyboardInterrupt as ex:
self._logger("Workflow interrupted.", system_msg=True, bad=True)
self._killall()
exception = ex
raise

except Exception as ex:
self._logger("Workflow failed.", system_msg=True, bad=True)
self._killall()
exception = ex
raise
finally:
# on finish clean tasks
for step in self._flow:
for deco in step.decorators:
deco.runtime_finished(exception)

def _launch_spin(self):
args = CLIArgs(self.task, self.spin)
env = dict(os.environ)
# Start a new worker to spin a step
# on finish clean tasks
exception = None
for deco in self.whitelist_decorators:
deco.runtime_finished(exception)

for deco in self.task.decos:
deco.runtime_step_cli(
args,
self.task.retries,
self.task.user_code_retries,
self.task.ubf_context,
)
def launch_spin(self):
args = CLIArgs(self.task, spin=True)
env = dict(os.environ)

# Add user configurations using a file to avoid using up too much space on the
# command line
if self._config_file_name:
args.top_level_options["local-config-file"] = self._config_file_name

print(f"Args Entrypoint updated is {args.entrypoint}")
env.update(args.get_env())
env["PYTHONUNBUFFERED"] = "x"
cmdline = args.get_args()
print(f"Command line is: {cmdline}")

process = subprocess.Popen(
cmdline,
env=env,
bufsize=1,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
for deco in self.task.decos:
deco.runtime_step_cli(
args,
self.task.retries,
self.task.user_code_retries,
self.task.ubf_context,
)

# Read and print subprocess output
stdout, stderr = process.communicate()
print(f"stdout: {stdout.decode()}")
print(f"stderr: {stderr.decode()}")
# Add user configurations using a file to avoid using up too much space on the
# command line
if self._config_file_name:
args.top_level_options["local-config-file"] = self._config_file_name

print(f"Args Entrypoint updated is {args.entrypoint}")
env.update(args.get_env())
env["PYTHONUNBUFFERED"] = "x"
cmdline = args.get_args()
print(f"Command line is: {cmdline}")

process = subprocess.Popen(
cmdline,
env=env,
bufsize=1,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
)

# Read and print subprocess output
stdout, stderr = process.communicate()
print("STDOUT:\n")
print(f"{stdout.decode()}")
print("-" * 100)
print("STDERR:\n")
print(f"stderr: {stderr.decode()}")


class NativeRuntime(object):
Expand Down

0 comments on commit ff67cc6

Please sign in to comment.