Skip to content

Commit

Permalink
Merge pull request #12 from geobeyond/fix_flow_run_details
Browse files Browse the repository at this point in the history
Improvements: fix several issues + add cli command to cancel flow run
  • Loading branch information
francbartoli authored Jul 12, 2022
2 parents 4360688 + d4c4058 commit 6f42ad6
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- main
- master
- develop

jobs:
release:
Expand Down
43 changes: 34 additions & 9 deletions fastflows/cli/flow_runs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typer
import sys
from fastflows.schemas.flow_run import StateBase, FlowRunStateEnum
from rich import print as rprint
from fastflows.core.flow_run import (
update_flow_run_state,
get_flow_run_details,
Expand All @@ -23,27 +25,50 @@ def list_flow_runs(
f"Get list flow runs for flow {'with name' if not flow_id else 'with id'}: {flow_name}"
)
result = get_flow_runs_list(flow_name, flow_id)
typer.echo(result)
rprint(result)


@flow_runs.command(name="state", help="Get details for flow_run_id")
@flow_runs.command(name="details", help="Get details for flow_run_id")
@catch_exceptions
def flow_run_state(flow_run_id: str):
typer.echo(f"Get flow run state for flow_run_id {flow_run_id}")
result = get_flow_run_details(flow_run_id)
typer.echo(result)
def flow_run_state(
flow_run_id: str,
graph: bool = typer.Option(False, help="Get graph for flow_run_id"),
):
if graph:
message = "Get graph for flow_run_id"
else:
message = "Get details for flow_run_id"
typer.echo(f"{message} {flow_run_id}")
result = get_flow_run_details(flow_run_id, graph)
rprint(result)


@flow_runs.command(name="cancel", help="Cancel flow run by flow_run_id")
@catch_exceptions
def cancel_flow_run(flow_run_id: str):
typer.echo(f"Cancel flow run with flow_run_id {flow_run_id}")
result = update_flow_run_state(
flow_run_id, state=StateBase(type=FlowRunStateEnum.CANCELLED.value)
)
rprint(result)


@flow_runs.command(name="update", help="Update flow run state by flow_run_id")
@catch_exceptions
def update_flow_run(
flow_run_id: str,
state: str = typer.Option(
FlowRunStateEnum.CANCELLED.value,
callback=lambda v: v.upper(),
None,
callback=lambda v: v.upper() if v else None,
help="State to set for new flow run - by default 'Scheduled' (mean will be runned imidiatly)",
),
):

if state is None:
typer.secho(
"You should provide state to set to Flow Run", fg=typer.colors.RED, err=True
)
sys.exit(1)
typer.echo(f"Set state {state} for flow_run_id {flow_run_id}")
result = update_flow_run_state(flow_run_id, state=StateBase(type=state))
typer.echo(result)
rprint(result)
6 changes: 2 additions & 4 deletions fastflows/cli/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastflows.schemas.flow_run import FlowRunInput
from fastflows.schemas.deployment import DeploymentInputParams
from fastflows.core.utils.parse_data import parse_schedule_line, parse_tags_line
from fastflows.core.flow import run_flow, list_flows_from_file_home, deploy_flows
from fastflows.core.flow import run_flow, list_flows, deploy_flows
from fastflows.cli.utils import (
catch_exceptions,
process_params_from_str,
Expand Down Expand Up @@ -54,9 +54,7 @@ def run(
def list(flow_path: Optional[str] = configuration.FLOWS_HOME):
"""List all flows from FLOWS_HOME"""
typer.echo("\nAll flows from FLOWS_HOME: \n")
for flow in list_flows_from_file_home(flow_path):
typer.echo(flow)
typer.echo("\n")
typer.echo(f"\nAvailable flows: {list_flows(flow_path)}\n")


@flows_app.command()
Expand Down
2 changes: 1 addition & 1 deletion fastflows/core/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def register_and_deploy(

# get flows from Prefect
self.compare_cache_with_prefect()

flows_updated = False

flows_in_folder = self._get_flows_from_path(flow_input)
Expand Down
12 changes: 4 additions & 8 deletions fastflows/core/flow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from fastflows.schemas.flow import Flow, FlowDeployInput
from fastflows.schemas.flow_run import FlowRunResponse, FlowRunInput
from fastflows.errors import FlowNotFound
Expand Down Expand Up @@ -63,14 +63,10 @@ def get_flow_runs_list(flow_name: str, by_id: bool) -> List[FlowRunResponse]:
return provider.list_flow_runs(flow_id)


def list_flows():
# flows that already was registered in Prefect
return catalog


def list_flows_from_file_home(flow_path: str) -> List[Flow]:
def list_flows(flow_path: Optional[str]) -> List[Flow]:
# they cannot be registered in Prefect, just list from FLOWS_HOME
return list(Catalog(flow_path).process_flows())
Catalog(flow_path).register_and_deploy()
return list(catalog.keys())


def deploy_flows(
Expand Down
12 changes: 8 additions & 4 deletions fastflows/core/flow_run.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from fastflows.schemas.flow_run import StateBase

from fastflows.schemas.flow_run import StateBase, FlowRunResponseGraph, FlowRunResponse
from typing import Union
from fastflows.providers import provider


def get_flow_run_details(flow_run_id: str):
def get_flow_run_details(
flow_run_id: str, graph: bool
) -> Union[FlowRunResponse, FlowRunResponseGraph]:
"""
:param flow_run_id: Flow Run Id in Prefect to get info about
"""
return provider.get_flow_run_details(flow_run_id)
if not graph:
return provider.get_flow_run_details(flow_run_id)
return provider.get_flow_run_graph(flow_run_id)


def update_flow_run_state(flow_run_id: str, state: StateBase):
Expand Down
10 changes: 10 additions & 0 deletions fastflows/providers/prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
InitFlowRun,
StateBase,
UpdateStateResponse,
FlowRunResponseGraph,
)
from typing import List, Optional, Dict

Expand Down Expand Up @@ -113,6 +114,15 @@ def get_flow_run_details(self, flow_run_id: str) -> List[FlowRunResponse]:
response = self.client.get(f"{self.uri}/flow_runs/{flow_run_id}")
return response

@api_response_handler(
message="Error while getting FlowRun graph.",
append_api_error=True,
response_model=List[FlowRunResponseGraph],
)
def get_flow_run_graph(self, flow_run_id: str) -> List[FlowRunResponseGraph]:
response = self.client.get(f"{self.uri}/flow_runs/{flow_run_id}/graph")
return response

@api_response_handler(
message=f"Prefect does not answer on Healthcheck. Looks like Prefect server is unavailable on address {cfg.PREFECT_URI}",
)
Expand Down
19 changes: 17 additions & 2 deletions fastflows/routers/flow_runs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from fastapi import APIRouter
from fastflows.schemas.flow_run import FlowRunState, StateBase, UpdateStateResponse
from fastflows.schemas.flow_run import (
FlowRunResponse,
StateBase,
UpdateStateResponse,
FlowRunResponseGraph,
)
from fastflows.core.flow_run import (
update_flow_run_state,
get_flow_run_details,
)
from typing import List
from fastflows.routers import handle_rest_errors

router = APIRouter(prefix="/flow-runs", tags=["flows"])


@router.get("/{flow_run_id}", response_model=FlowRunState)
@router.get("/{flow_run_id}", response_model=FlowRunResponse)
@handle_rest_errors
async def get_flow_run_details_route(flow_run_id: str):
"""
Expand All @@ -18,6 +24,15 @@ async def get_flow_run_details_route(flow_run_id: str):
return get_flow_run_details(flow_run_id)


@router.get("/{flow_run_id}/graph", response_model=List[FlowRunResponseGraph])
@handle_rest_errors
async def get_flow_run_graph_route(flow_run_id: str):
"""
:param flow_run_id: Flow Run Id in Prefect to get graph for it
"""
return get_flow_run_details(flow_run_id, graph=True)


@router.patch("/{flow_run_id}/state", response_model=UpdateStateResponse)
@handle_rest_errors
async def update_flow_run_state_route(
Expand Down
26 changes: 23 additions & 3 deletions fastflows/schemas/flow_run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from enum import Enum
from pydantic import BaseModel, Field
from typing import Optional, List
from pydantic import BaseModel, Field, Json
from typing import Optional, List, Union
from fastflows.schemas.deployment import FlowRunner
from fastflows.schemas.misc import Status, Details

Expand Down Expand Up @@ -35,12 +35,22 @@ class FlowRunInput(BaseModel):
parameters: Optional[dict] = Field(default_factory=dict)


class FlowRunBlobData(BaseModel):
data: str
block_document_id: Optional[str]


class DataObject(BaseModel):
encoding: str
blob: Json


class State(StateBase):
id: str
name: str
timestamp: datetime.datetime
message: Optional[str]
data: Optional[str]
data: Optional[Union[DataObject, str]]
state_details: Optional[StateDetails]


Expand Down Expand Up @@ -94,3 +104,13 @@ class UpdateStateResponse(BaseModel):
state: Optional[State]
status: Status
details: Details


class FlowRunResponseGraph(FlowRunState):
upstream_dependencies: List[Union[str, dict]]
state_details: Optional[StateDetails]
expected_start_time: datetime.datetime
start_time: datetime.datetime
end_time: datetime.datetime
total_run_time: int
estimated_run_time: int
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fastflows"
version = "0.1.0.dev.1657483278"
version = "0.1.0.dev.1657623062"
description = "FastFlows is a FastAPI server & command line tool to comunicate with Prefect 2.0 as a Workflow manager to deploy, run, track flows and more."
authors = ["Francesco Bartoli <[email protected]>", "Iuliia Volkova <[email protected]>"]
license = "MIT"
Expand Down
18 changes: 18 additions & 0 deletions tests/api_tests/flows/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,21 @@ def test_update_flow_run_state_422(client: TestClient) -> None:
response_body = response.json()
assert response.status_code == 422
assert "value is not a valid uui" in response_body["detail"]


def test_flow_run_by_name_with_params(client: TestClient) -> None:
response = client.post(
"/flows/name/Params Flow", json={"parameters": {"name": "value1"}}
)
response_body = response.json()
assert response.status_code == 200
assert response_body["id"]
assert response_body["parameters"] == {"name": "value1"}


def test_get_flow_run_graph(flow_run: FlowRunResponse, client: TestClient) -> None:
response = client.get(f"/flow-runs/{flow_run.id}/graph")
response_body = response.json()
assert response.status_code == 200
# need to wait to complete flow for graph to be generated
assert response_body == []
11 changes: 5 additions & 6 deletions tests/api_tests/flows/test_flows_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_flow_create_by_flow_path(client: TestClient) -> None:
response = client.post("/flows", json=payload)
response_body = response.json()[0]
assert response.status_code == 200
assert response_body["name"] == "Pipeline with Parameter"
assert response_body["name"] == "Params Flow"
assert response_body["id"]
assert response_body["deployment_id"]

Expand All @@ -88,8 +88,7 @@ def test_flow_deploy_all(client: TestClient) -> None:
response = client.post("/flows", json={"force": True})
response_body = response.json()
assert response.status_code == 200
assert len(response_body) == 2
assert [flow["name"] for flow in response_body] == [
"Pipeline with Parameter",
"Simple Flow2",
]
assert len(response_body) == 4
assert sorted([flow["name"] for flow in response_body]) == sorted(
["Params Flow", "Subflow", "With Subflow", "Simple Flow2"]
)
13 changes: 12 additions & 1 deletion tests/integration/cli/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,16 @@ def run_flow(runner: CliRunner):
["flows", "run", "Simple Flow2"],
)
assert result.exit_code == 0
assert "Run flow: Simple Flow2\nCreated flow run with id:" in result.stdout
assert "Created flow run with id:" in result.stdout
return result.stdout.split("id='")[1].split("'")[0]


@pytest.fixture
def run_flow_with_subflow(runner: CliRunner):
result = runner.invoke(
app,
["flows", "run", "With Subflow", "--params", "name=name"],
)
assert result.exit_code == 0
assert "Created flow run with id:" in result.stdout
return result.stdout.split("id='")[1].split("'")[0]
31 changes: 30 additions & 1 deletion tests/integration/cli/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,40 @@
from fastflows.schemas.flow import Flow


def test_change_state_flow_run(runner: CliRunner, run_flow: Flow):
def test_change_state_flow_run_error(runner: CliRunner, run_flow: Flow):
result = runner.invoke(
app,
["flow-runs", "update", run_flow],
)
assert result.exit_code == 1
assert "You should provide state to set to Flow Run" in result.stdout


def test_change_state_flow_run_ok(runner: CliRunner, run_flow: Flow):
result = runner.invoke(
app,
["flow-runs", "update", run_flow, "--state", "CANCELLED"],
)
assert result.exit_code == 0
assert "Set state CANCELLED for flow_run_id" in result.stdout
assert "status=<Status.ACCEPT: 'ACCEPT'>" in result.stdout


def test_cancel_flow_run(runner: CliRunner, run_flow: Flow):
result = runner.invoke(
app,
["flow-runs", "cancel", run_flow],
)
assert result.exit_code == 0
assert "Cancel flow run with flow_run_id " in result.stdout


def test_flow_run_with_sub_flow_and_graph_option(
runner: CliRunner, run_flow_with_subflow: Flow
):
result = runner.invoke(
app,
["flow-runs", "details", run_flow_with_subflow, "--graph"],
)
assert result.exit_code == 0
assert "Get graph for flow_run_id" in result.stdout
Loading

0 comments on commit 6f42ad6

Please sign in to comment.