Skip to content

Introduces fanout, rework execution model, further additions #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Mar 24, 2025

Conversation

merschformann
Copy link
Member

@merschformann merschformann commented Jan 26, 2025

Description

Changes the step execution model to a simple thread pool based one and introduces fanout.

Changes

  • Uses a simpler thread-based execution engine for better control and data exchange between steps.
    • We are aware of Python's parallel CPU utilization limitations due to the GIL, but are looking forward to newer Python releases removing the limitation. The advantages appear to outweigh the disadvantages long-term.
  • Adds own thread-pool implementation for full control.
  • Introduces @foreach and @join for dynamic fanout.
  • Adds support for defining app run options from results of steps via AppRunConfig.
  • Adds support for getting the full nextmv.cloud.AppResult (including run metadata) instead of just the "output" (full_result=True).
  • Prepares graph state communication.
  • Introduces a pipeline config.
  • Improves readmes.
  • Reduces unnecessary time when waiting for runs.

Preview

This change allows modeling pipelines that dynamically fanout. E.g., the following pipeline spins up 3 app runs for by cloning the input but modifying the app parameter per run.

class Flow(FlowSpec):
    @foreach()  # Run the successor step for each item in the result list of this step
    @step
    def prepare(data: dict):
        """
        Creates 3 copies of the input and configures them for 3 different app parameters.
        """
        inputs = [copy.deepcopy(data) for _ in range(3)]
        run_configs = [AppRunConfig(input, [AppOption("param", i)]) for i, input in enumerate(inputs)]
        return run_configs

    @app(app_id="echo")
    @needs(predecessors=[prepare])
    @step
    def solve():
        """
        Runs the model.
        """
        pass

    @needs(predecessors=[solve])
    @join()  # Collect the results from the previous 'foreach' step and combine them into a list passed as the arg
    @step
    def merge(results: list[dict]):
        """Merges the results."""
        return results

Resolves ENG-5614, ENG-5642

@merschformann merschformann changed the title Reworks thread execution model Introduces fanout, rework execution model, further additions Mar 21, 2025
@merschformann merschformann marked this pull request as ready for review March 23, 2025 16:01
@merschformann merschformann merged commit d02d8a7 into develop Mar 24, 2025
8 checks passed
@merschformann merschformann deleted the merschformann/simplify-step-execution branch March 24, 2025 13:53
@cnpryer cnpryer mentioned this pull request Mar 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants