Workflow orchestration - what goes where? #233
Checks:

**Persistent data/history**: Does it allow for persistent storage of workflow data? (E.g., what is the data retention policy? Prefect Cloud retains flow run history for 7 days; self-hosted Prefect is backed by a database that the user specifies and operates.) With Prefect, I might be able to send history/logs to MongoDB or similar. See logging (Prefect docs) and how to stream Prefect logs to a file.

**Human-in-the-loop**: Does it allow for manual input/interaction from users? See https://docs.prefect.io/latest/concepts/flows/#waiting-for-input-when-pausing-or-suspending-a-flow-run and https://docs.prefect.io/guides/creating-human-in-the-loop-workflows/. See also this Jan 30, 2024 blog post: https://www.prefect.io/blog/unveiling-interactive-workflows. This is really key for almost fully autonomous workflows, I think.

```python
from prefect import flow, get_run_logger, pause_flow_run
from prefect.input import RunInput


class UserNameInput(RunInput):
    name: str


@flow
async def greet_user():
    logger = get_run_logger()
    # Pause the flow run until a user supplies a UserNameInput (e.g., via the UI)
    user_input = await pause_flow_run(wait_for_input=UserNameInput)
    logger.info(f"Hello, {user_input.name}!")
```

**Pause/resume and cancel/restart**: Does it allow for smooth restarts, where it can cache results prior to the failed step so they don't have to be recalculated? (See https://docs.prefect.io/latest/concepts/tasks/#caching)

```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def hello_task(name_input):
    # Doing some work; cached results are reused for up to one day
    print("Saying hello")
    return "hello " + name_input


@flow
def hello_flow(name_input):
    hello_task(name_input)
```

**Avoiding accidental spending**:

```python
import time

from prefect import task, get_run_logger


@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)  # exceeds the 1-second timeout, so the task fails here
    logger.info("I will not execute")
```

**Dependencies**: Does it have the ability to specify conditions/dependencies (i.e., enforce that one task runs after another, without needing to pass info back and forth between them)? https://docs.prefect.io/latest/concepts/tasks/#wait-for

```python
from prefect import flow, task


@task
def task_1():
    pass


@task
def task_2():
    pass


@flow
def my_flow():
    x = task_1()
    # task_2 will wait for task_1 to complete
    y = task_2(wait_for=[x])
```

**Visualizing workflows**: How to handle visualizing dynamic workflows? See visualizing flow structure and mapping (i.e., like a for loop where there's no interaction between iterations, which I think Prefect can handle natively).

**Asynchrony**: Aside: native support for `await` and `async` (see a general explanation in the context of wetlab experiments). See also https://discourse.prefect.io/t/best-practices-for-using-async-in-prefect-2-0/1361.

**Free-tier hosting and long runs**: Does it come with free-tier hosting? Does it allow for long-running executions (e.g., a Bayesian optimization script that can run for weeks)?

**Integrations with cloud providers**: AWS, Google Cloud, etc. As an aside, it's nice that Prefect allows you to submit work to certain cloud providers without having to have a separate worker for it. I'm not exactly sure how this plays with the expiration limits on flow runs. If it shows up as a flow run, then it's still limited to the 7 or 14 days.
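The asynchrony point can be illustrated without Prefect at all. Here is a minimal `asyncio` sketch (with a hypothetical `run_experiment` stand-in, not a real Ax or Prefect function), assuming each experiment spends most of its time waiting on hardware, so evaluations can overlap:

```python
import asyncio


async def run_experiment(name: str, duration: float) -> str:
    # Stand-in for a wetlab experiment: the await yields control,
    # so other experiments can proceed while this one "runs".
    await asyncio.sleep(duration)
    return f"{name} done"


async def main() -> list[str]:
    # Launch three experiments concurrently rather than sequentially;
    # gather returns results in the order the coroutines were passed in.
    return await asyncio.gather(
        run_experiment("A", 0.03),
        run_experiment("B", 0.02),
        run_experiment("C", 0.01),
    )


results = asyncio.run(main())
print(results)
```

Total wall time here is roughly the longest single experiment rather than the sum, which is the payoff async gives long-running lab campaigns.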
Let's take the case of using a Bayesian optimizer with a workflow orchestration package. Assume Ax as the BO package and Prefect as the workflow orchestration tool, with the following minimal working examples:
- Ax
- Prefect
and knowing that Prefect has a UI and offers a free-tier Prefect-managed server, where does Ax fit within Prefect, and vice versa? At a higher level, this depends on how a user is going to interact with the system (oversight, maintenance, cooperation, etc.). It is also important to consider more complex cases, such as when asynchronous evaluations come into play, or when the Prefect-managed server doesn't have the computational power required to build the BO models.
It gets more complex when we start thinking about how hardware/software communication and data management play into this (e.g., MQTT and MongoDB). For example, the Ax script could be modified to pull all pertinent data from a database before suggesting the next experiment. The hardware/software communication is relatively straightforward: after an experiment has been suggested, the function that runs the experiment is the one that communicates with the hardware.
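That division of labor can be sketched in plain Python. Everything below is a stand-in: `pull_history`, `suggest_next`, and `run_experiment` are hypothetical names, the "database" is a list, and random search stands in for Ax's actual `get_next_trial`; in the real stack these would be the database pull, the Ax call, and the hardware-facing (e.g., MQTT) function respectively:

```python
import random

# Stand-in for "pull all pertinent data from a database" (e.g., MongoDB)
HISTORY: list[dict] = []


def pull_history() -> list[dict]:
    return list(HISTORY)


def suggest_next(history: list[dict]) -> dict:
    # Stand-in for ax_client.get_next_trial(); random search, not real BO
    return {"temperature": random.uniform(20.0, 100.0)}


def run_experiment(params: dict) -> float:
    # Stand-in for the hardware-facing function (e.g., publish the recipe
    # via MQTT and wait for the measurement to come back)
    return -((params["temperature"] - 60.0) ** 2)


def optimization_step() -> None:
    params = suggest_next(pull_history())
    result = run_experiment(params)
    HISTORY.append({"params": params, "result": result})


for _ in range(5):
    optimization_step()
print(len(HISTORY))
```

In the orchestrated version, `optimization_step` is roughly what a Prefect flow would wrap, with `run_experiment` as the task that talks to hardware.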
Ax has its own implementation for interacting with SQL databases, and I struggled with whether to use MongoDB or a SQL solution that would be directly compatible with Ax. Even now, I still think my leaning towards MongoDB is reasonable because of the free tier and the flexible schema.
Some questions:

- Can I use the `task` decorator on existing Ax functions (i.e., use decorators functionally via `task(some_ax_function)`)? What happens when this is a class method (`ax_client.get_next_trial`, `ax_client.complete_trial`, `ax_client.get_best_parameters`)? (NOTE: the recommendation seems to be to use a new class with inheritance.)

Ax also has a Scheduler API, for which there is some overlap with Prefect (scheduling to external systems, resuming from a database backend). Typically I've avoided the Scheduler API because it's quite a bit more complicated than the Service API.
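On the class-method question: decorators are just callables, so applying one functionally to a *bound* method works in plain Python; whether Prefect's `task` is then happy with the result (naming, serialization) is a separate question. A stand-in sketch, where `log_calls` is a hypothetical decorator playing the role of `task`, and `Client`/`get_next_trial` are toy versions of the Ax client:

```python
import functools


def log_calls(fn):
    # Minimal stand-in for a task-style decorator: wraps any callable
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        wrapper.calls += 1
        return fn(*args, **kwargs)

    wrapper.calls = 0
    return wrapper


class Client:
    def get_next_trial(self):
        return {"x": 1.0}


client = Client()
# Functional application to a bound method: client.get_next_trial is a
# plain callable with `self` already bound, so decorating it is fine.
get_next_trial = log_calls(client.get_next_trial)
print(get_next_trial())
```

The inheritance route mentioned in the NOTE avoids rebinding per instance, since the decorated methods live on the subclass itself.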
Guiding principles
(mildly and somewhat jokingly based on the language style in the Zen of Python)
Aside: consider how/whether Ray would fit into the stack, and also how Python queues with MQTT might help simplify/reduce some of the complexity of running AxClient (i.e., using the queue's `get()` method and keeping the trial data handy to then pass to `complete_trial`).