From f5d508709363d8b8f79a177c9136276b6781ecaa Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 13:38:18 -0800 Subject: [PATCH 01/11] added prefect client to requirement list --- .pre-commit-config.yaml | 6 ++++++ requirements.txt | 1 + 2 files changed, 7 insertions(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cc2ea1e..6df81cc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,6 +12,12 @@ repos: - id: check-symlinks - id: check-yaml - id: debug-statements + - repo: https://github.com/gitguardian/ggshield + rev: v1.25.0 + hooks: + - id: ggshield + language_version: python3 + stages: [commit] # Using this mirror lets us use mypyc-compiled black, which is about 2x faster - repo: https://github.com/psf/black-pre-commit-mirror rev: 24.2.0 diff --git a/requirements.txt b/requirements.txt index 7a2fbcc..07675be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ numpy packaging==23.1 pandas==2.0.2 plotly==5.17.0 +prefect-client==2.14.21 python-dateutil==2.8.2 pytz==2023.3 six==1.16.0 From 849b9ce4d6e00fae13b54291db3b45db3661a67a Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 13:40:39 -0800 Subject: [PATCH 02/11] added prefect-related variables --- .env.example | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.env.example b/.env.example index dd72de4..35f00f4 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,10 @@ SEG_TILED_URI= # Replace with your API key SEG_TILED_API_KEY= +# Directory where the segmentation application will store trained models and segmentation +# results. If using podman, this is the directory that will be mounted as a volume. +RESULTS_DIR=${PWD}/data/results + # Development environment variables, to be removed in upcoming versions DASH_DEPLOYMENT_LOC='Local' EXPORT_FILE_PATH='data/exported_annotations.json' @@ -38,3 +42,8 @@ MODE='dev' # Basic authentication for segmentation application when deploying on a publicly accessible server USER_NAME= USER_PASSWORD= + +# Prefect environment variables +PREFECT_API_URL=http://prefect:4200/api +FLOW_NAME="Parent flow/launch_parent_flow" +TIMEZONE="US/Pacific" From 790b39d0749f27dbfd476e2948a93345edf5552f Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 13:40:59 -0800 Subject: [PATCH 03/11] added extra icon for job submission --- constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/constants.py b/constants.py index 544a1ed..f7b441f 100644 --- a/constants.py +++ b/constants.py @@ -29,6 +29,7 @@ "export-annotation": "entypo:export", "no-more-slices": "pajamas:warning-solid", "export": "entypo:export", + "submit": "formkit:submit", } ANNOT_NOTIFICATION_MSGS = { From 560a0dabfccd2d0273cd88032daf6467f1bbeab1 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 13:42:54 -0800 Subject: [PATCH 04/11] added prefect_utils --- utils/prefect.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 utils/prefect.py diff --git a/utils/prefect.py b/utils/prefect.py new file mode 100644 index 0000000..66495ed --- /dev/null +++ b/utils/prefect.py @@ -0,0 +1,82 @@ +import asyncio +from typing import Optional + +from prefect import get_client +from prefect.client.schemas.filters import ( + FlowRunFilter, + FlowRunFilterName, + FlowRunFilterTags, +) + + +async def _schedule( + deployment_name: str, + flow_run_name: str, + parameters: Optional[dict] = None, + tags: Optional[list] = [], +): + async with get_client() as client: + deployment = await client.read_deployment_by_name(deployment_name) + assert ( + deployment + ), f"No deployment found in config for deployment_name {deployment_name}" + flow_run = await client.create_flow_run_from_deployment( + deployment.id, + parameters=parameters, + name=flow_run_name, + tags=tags, + ) + return flow_run.id + + +def schedule_prefect_flow( + deployment_name: str, + parameters: Optional[dict] = None, + flow_run_name: Optional[str] = None, + tags: Optional[list] = [], +): + if not flow_run_name: + model_name = parameters["model_name"] + flow_run_name = f"{deployment_name}: {model_name}" + flow_run_id = asyncio.run( + _schedule(deployment_name, flow_run_name, parameters, tags) + ) + return flow_run_id + + +async def _get_name(flow_run_id): + async with get_client() as client: + flow_run = await client.read_flow_run(flow_run_id) + if flow_run.state.is_final(): + if flow_run.state.is_completed(): + return flow_run.name + return None + + +def get_flow_run_name(flow_run_id): + """Retrieves the name of the flow with the given id.""" + return asyncio.run(_get_name(flow_run_id)) + + +async def _flow_run_query(tags, flow_run_name=None): + flow_runs_by_name = [] + async with get_client() as client: + flow_runs = await client.read_flow_runs( + flow_run_filter=FlowRunFilter( + name=FlowRunFilterName(like_=flow_run_name), + tags=FlowRunFilterTags(all_=tags), + ) + ) + for flow_run in flow_runs: + if flow_run.state_name == "Failed": + flow_name = f"❌ {flow_run.name}" + elif flow_run.state_name == "Completed": + flow_name = f"✅ {flow_run.name}" + else: + flow_name = f"🕑 {flow_run.name}" + flow_runs_by_name.append({"label": flow_name, "value": str(flow_run.id)}) + return flow_runs_by_name + + +def query_flow_run(tags, flow_run_name=None): + return _flow_run_query(tags, flow_run_name) From 35a203e9854845c4243b867051b1190a957dc1ac Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 13:44:41 -0800 Subject: [PATCH 05/11] parsing prefect-related variables --- docker-compose.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index c0a7f2b..9002b3c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,10 @@ services: environment: DATA_TILED_URI: '${DATA_TILED_URI}' DATA_TILED_API_KEY: '${DATA_TILED_API_KEY}' + RESULTS_DIR: '${RESULTS_DIR}' + PREFECT_API_URL: '${PREFECT_API_URL}' + FLOW_NAME: '${FLOW_NAME}' + TIMEZONE: "${TIMEZONE}" volumes: - ./app.py:/app/app.py - ./constants.py:/app/constants.py From 55edede09e6cee54649490acecafc51c2e7b2532 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Tue, 5 Mar 2024 14:14:31 -0800 Subject: [PATCH 06/11] submit jobs to prefect, display jobs in dropdowns, and sort flow runs --- callbacks/segmentation.py | 291 +++++++++++++++++++++++++------------- components/control_bar.py | 47 ++++-- utils/prefect.py | 3 +- 3 files changed, 234 insertions(+), 107 deletions(-) diff --git a/callbacks/segmentation.py b/callbacks/segmentation.py index 69a206c..2ca8c35 100644 --- a/callbacks/segmentation.py +++ b/callbacks/segmentation.py @@ -1,145 +1,240 @@ +import asyncio import os -import time +import traceback import uuid +from datetime import datetime -import dash_mantine_components as dmc -import requests +import pytz from dash import ALL, Input, Output, State, callback, no_update -from dash.exceptions import PreventUpdate +from constants import ANNOT_ICONS from utils.data_utils import tiled_dataset +from utils.plot_utils import generate_notification +from utils.prefect import get_flow_run_name, query_flow_run, schedule_prefect_flow MODE = os.getenv("MODE", "") +RESULTS_DIR = os.getenv("RESULTS_DIR", "") +FLOW_NAME = os.getenv("FLOW_NAME", "") +PREFECT_TAGS = os.getenv("PREFECT_TAGS", ["high-res-segmentation"]) -DEMO_WORKFLOW = { - "user_uid": "high_res_user", - "job_list": [ +# TODO: Retrieve timezone from browser +TIMEZONE = os.getenv("TIMEZONE", "US/Pacific") + +# TODO: Get model parameters from UI +TRAIN_PARAMS_EXAMPLE = { + "flow_type": "podman", + "params_list": [ + { + "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", + "image_tag": "main", + "command": 'python -c \\"import time; time.sleep(30)\\"', + "params": {"test": "test"}, + "volumes": [f"{RESULTS_DIR}:/app/work/results"], + }, { - "mlex_app": "high-res-segmentation", - "description": "test_1", - "service_type": "backend", - "working_directory": "/data/mlex_repo/mlex_tiled/data", - "job_kwargs": { - "uri": "mlexchange1/random-forest-dc:1.1", - "type": "docker", - "cmd": 'python random_forest.py data/seg-results/spiral/image-train data/seg-results-test/spiral/feature data/seg-results/spiral/mask data/seg-results-test/spiral/model \'{"n_estimators": 30, "oob_score": true, "max_depth": 8}\'', # noqa: E501 - "kwargs": { - "job_type": "train", - "experiment_id": "123", - "dataset": "name_of_dataset", - "params": '{"n_estimators": 30, "oob_score": true, "max_depth": 8}', - }, - }, + "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", + "image_tag": "main", + "command": 'python -c \\"import time; time.sleep(10)\\"', + "params": {"test": "test"}, + "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, + ], +} + +INFERENCE_PARAMS_EXAMPLE = { + "flow_type": "podman", + "params_list": [ { - "mlex_app": "high-res-segmentation", - "description": "test_1", - "service_type": "backend", - "working_directory": "/data/mlex_repo/mlex_tiled/data", - "job_kwargs": { - "uri": "mlexchange1/random-forest-dc:1.1", - "type": "docker", - "cmd": "python segment.py data/data/20221222_085501_looking_from_above_spiralUP_CounterClockwise_endPointAtDoor_0-1000 data/seg-results-test/spiral/model/random-forest.model data/seg-results-test/spiral/output '{\"show_progress\": 1}'", # noqa: E501 - "kwargs": { - "job_type": "train", - "experiment_id": "124", - "dataset": "name_of_dataset", - "params": '{"show_progress": 1}', - }, - }, + "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", + "image_tag": "main", + "command": 'python -c \\"import time; time.sleep(30)\\"', + "params": {"test": "test"}, + "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, ], - "host_list": ["vaughan.als.lbl.gov"], - "dependencies": {"0": [], "1": [0]}, - "requirements": {"num_processors": 2, "num_gpus": 0, "num_nodes": 1}, } @callback( - Output("output-details", "children"), - Output("submitted-job-id", "data"), - Input("run-model", "n_clicks"), + Output("notifications-container", "children", allow_duplicate=True), + Input("run-train", "n_clicks"), State("annotation-store", "data"), State({"type": "annotation-class-store", "index": ALL}, "data"), State("project-name-src", "value"), + State("job-name", "value"), + prevent_initial_call=True, ) -def run_job(n_clicks, global_store, all_annotations, project_name): +def run_train(n_clicks, global_store, all_annotations, project_name, job_name): """ - This callback collects parameters from the UI and submits a job to the computing api. + This callback collects parameters from the UI and submits a training job to Prefect. If the app is run from "dev" mode, then only a placeholder job_uid will be created. - The job_uid is saved in a dcc.Store for reference by the check_job callback below. - # TODO: Appropriately paramaterize the DEMO_WORKFLOW json depending on user inputs + # TODO: Appropriately paramaterize the job json depending on user inputs and relevant file paths """ if n_clicks: if MODE == "dev": job_uid = str(uuid.uuid4()) - return ( - dmc.Text( - f"Workflow has been succesfully submitted with uid: {job_uid}", - size="sm", - ), - job_uid, - ) + job_message = f"Job has been succesfully submitted with uid: {job_uid}" + notification_color = "indigo" else: - tiled_dataset.save_annotations_data( global_store, all_annotations, project_name ) - job_submitted = requests.post( - "http://job-service:8080/api/v0/workflows", json=DEMO_WORKFLOW - ) - job_uid = job_submitted.json() - if job_submitted.status_code == 200: - return ( - dmc.Text( - f"Workflow has been succesfully submitted with uid: {job_uid}", - size="sm", - ), - job_uid, + try: + # Schedule job + current_time = datetime.now(pytz.timezone(TIMEZONE)).strftime( + "%Y/%m/%d %H:%M:%S" ) - else: - return ( - dmc.Text( - f"Workflow presented error code: {job_submitted.status_code}", - size="sm", - ), - job_uid, + job_uid = schedule_prefect_flow( + FLOW_NAME, + parameters=TRAIN_PARAMS_EXAMPLE, + flow_run_name=f"{job_name} {current_time}", + tags=PREFECT_TAGS + ["train"], ) - return no_update, no_update + job_message = f"Job has been succesfully submitted with uid: {job_uid}" + notification_color = "indigo" + except Exception as e: + # Print the traceback to the console + traceback.print_exc() + job_uid = None + job_message = f"Job presented error: {e}" + notification_color = "red" + + notification = generate_notification( + "Job Submission", notification_color, ANNOT_ICONS["submit"], job_message + ) + + return notification + return no_update @callback( - Output("output-details", "children", allow_duplicate=True), - Output("submitted-job-id", "data", allow_duplicate=True), - Input("submitted-job-id", "data"), - Input("model-check", "n_intervals"), + Output("notifications-container", "children", allow_duplicate=True), + Input("run-inference", "n_clicks"), + State("train-job-selector", "value"), prevent_initial_call=True, ) -def check_job(job_id, n_intervals): +def run_inference(n_clicks, train_job_id): + """ + This callback collects parameters from the UI and submits an inference job to Prefect. + If the app is run from "dev" mode, then only a placeholder job_uid will be created. + + # TODO: Appropriately paramaterize the job json depending on user inputs + and relevant file paths """ - This callback checks to see if a job has completed successfully and will only - update if there is a job_id present in the submitted-job-id dcc.Store. Will - wait 3sec in "dev" mode to simulate. + if n_clicks: + if MODE == "dev": + job_uid = str(uuid.uuid4()) + job_message = f"Job has been succesfully submitted with uid: {job_uid}" + notification_color = "indigo" + else: + if train_job_id is not None: + job_name = get_flow_run_name(train_job_id) + if job_name is not None: + try: + # Schedule job + current_time = datetime.now(pytz.timezone(TIMEZONE)).strftime( + "%Y/%m/%d %H:%M:%S" + ) + job_uid = schedule_prefect_flow( + FLOW_NAME, + parameters=INFERENCE_PARAMS_EXAMPLE, + flow_run_name=f"{job_name} {current_time}", + tags=PREFECT_TAGS + ["inference"], + ) + job_message = ( + f"Job has been succesfully submitted with uid: {job_uid}" + ) + notification_color = "indigo" + except Exception as e: + # Print the traceback to the console + traceback.print_exc() + job_uid = None + job_message = f"Job presented error: {e}" + else: + job_message = "Please select a valid train job" + notification_color = "red" + else: + job_message = "Please select a train job from the dropdown" + notification_color = "red" + + notification = generate_notification( + "Job Submission", notification_color, ANNOT_ICONS["submit"], job_message + ) + + return notification - # TODO: Connect with the computing API when not in "dev" mode + return no_update + + +@callback( + Output("train-job-selector", "data"), + Input("model-check", "n_intervals"), +) +def check_train_job(n_intervals): """ - output_layout = [ - dmc.Text( - f"Workflow {job_id} completed successfully. Click button below to view segmentation results.", - size="sm", - ), - ] + This callback populates the train job selector dropdown with job names and ids from Prefect. + This callback displays the current status of the job as part of the job name in the dropdown. + In "dev" mode, the dropdown is populated with the sample data below. + """ + if MODE == "dev": + data = [ + {"label": "❌ DLSIA ABC 03/11/2024 15:38PM", "value": "uid0001"}, + {"label": "🕑 DLSIA XYC 03/11/2024 14:21PM", "value": "uid0002"}, + {"label": "✅ DLSIA CBA 03/11/2024 10:02AM", "value": "uid0003"}, + ] + else: + data = asyncio.run(query_flow_run(PREFECT_TAGS + ["train"])) + return data + +@callback( + Output("inference-job-selector", "data"), + Output("inference-job-selector", "value"), + Input("model-check", "n_intervals"), + Input("train-job-selector", "value"), +) +def check_inference_job(n_intervals, train_job_id): + """ + This callback populates the inference job selector dropdown with job names and ids from Prefect. + The list of jobs is filtered by the selected train job in the train job selector dropdown. + The selected value is set to None if the list of jobs is empty. + This callback displays the current status of the job as part of the job name in the dropdown. + In "dev" mode, the dropdown is populated with the sample data below. + """ if MODE == "dev": - if job_id: - time.sleep(3) - return ( - output_layout, - None, - ) - raise PreventUpdate + data = [ + {"label": "❌ DLSIA ABC 03/11/2024 15:38PM", "value": "uid0001"}, + {"label": "🕑 DLSIA XYC 03/11/2024 14:21PM", "value": "uid0002"}, + {"label": "✅ DLSIA CBA 03/11/2024 10:02AM", "value": "uid0003"}, + ] + return data, None else: - # TODO - connect with API - raise PreventUpdate + if train_job_id is not None: + job_name = get_flow_run_name(train_job_id) + if job_name is not None: + if MODE == "dev": + data = [ + { + "label": "❌ DLSIA ABC 03/11/2024 15:38PM", + "value": "uid0001", + }, + { + "label": "🕑 DLSIA XYC 03/11/2024 14:21PM", + "value": "uid0002", + }, + { + "label": "✅ DLSIA CBA 03/11/2024 10:02AM", + "value": "uid0003", + }, + ] + else: + data = asyncio.run( + query_flow_run( + PREFECT_TAGS + ["inference"], flow_run_name=job_name + ) + ) + selected_value = None if len(data) == 0 else no_update + return data, selected_value + return [], None diff --git a/components/control_bar.py b/components/control_bar.py index b4ab2b0..968b8d5 100644 --- a/components/control_bar.py +++ b/components/control_bar.py @@ -603,15 +603,46 @@ def layout(): "run-model", id="model-configuration", children=[ - dmc.Center( - dmc.Button( - "Run model", - id="run-model", - variant="light", - style={"width": "160px", "margin": "5px"}, - ) + _control_item( + "Name", + "job-name-input", + dmc.TextInput( + placeholder="Name your job...", + id="job-name", + ), + ), + dmc.Space(h=10), + dmc.Button( + "Train", + id="run-train", + variant="light", + style={"width": "100%", "margin": "5px"}, + ), + dmc.Space(h=10), + _control_item( + "Train Jobs", + "selected-train-job", + dmc.Select( + placeholder="Select a job...", + id="train-job-selector", + ), + ), + dmc.Space(h=10), + dmc.Button( + "Inference", + id="run-inference", + variant="light", + style={"width": "100%", "margin": "5px"}, + ), + dmc.Space(h=10), + _control_item( + "Inference Jobs", + "selected-inference-job", + dmc.Select( + placeholder="Select a job...", + id="inference-job-selector", + ), ), - html.Div(id="output-details"), dmc.Space(h=25), dmc.Switch( id="show-result-overlay-toggle", diff --git a/utils/prefect.py b/utils/prefect.py index 66495ed..ecd49d3 100644 --- a/utils/prefect.py +++ b/utils/prefect.py @@ -65,7 +65,8 @@ async def _flow_run_query(tags, flow_run_name=None): flow_run_filter=FlowRunFilter( name=FlowRunFilterName(like_=flow_run_name), tags=FlowRunFilterTags(all_=tags), - ) + ), + sort="START_TIME_DESC", ) for flow_run in flow_runs: if flow_run.state_name == "Failed": From 6d688c72f9918917ad9ccae39539797147e30e60 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Wed, 6 Mar 2024 11:31:06 -0800 Subject: [PATCH 07/11] modified example flow to match latest schema --- callbacks/segmentation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/callbacks/segmentation.py b/callbacks/segmentation.py index 2ca8c35..185e9a5 100644 --- a/callbacks/segmentation.py +++ b/callbacks/segmentation.py @@ -28,14 +28,14 @@ "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(30)\\"', - "params": {"test": "test"}, + "model_params": {"test": "test"}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, { "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(10)\\"', - "params": {"test": "test"}, + "model_params": {"test": "test"}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, ], @@ -48,7 +48,7 @@ "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(30)\\"', - "params": {"test": "test"}, + "model_params": {"test": "test"}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, ], From 19742ca956f49993cf6f61089874f5298604fa6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wiebke=20K=C3=B6pp?= Date: Wed, 6 Mar 2024 17:54:22 -0800 Subject: [PATCH 08/11] :art: Reformat --- callbacks/segmentation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/callbacks/segmentation.py b/callbacks/segmentation.py index ecc062d..70fbaeb 100644 --- a/callbacks/segmentation.py +++ b/callbacks/segmentation.py @@ -66,7 +66,9 @@ State("job-name", "value"), prevent_initial_call=True, ) -def run_train(n_clicks, global_store, all_annotations, project_name, model_parameters, job_name): +def run_train( + n_clicks, global_store, all_annotations, project_name, model_parameters, job_name +): """ This callback collects parameters from the UI and submits a training job to Prefect. If the app is run from "dev" mode, then only a placeholder job_uid will be created. From e6fb1e5d4a8ff99a2a14d9c4b6bec9f38b8975b9 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Wed, 6 Mar 2024 18:41:07 -0800 Subject: [PATCH 09/11] moved asyncio dependency to prefect.py and updated model_params in sample jobs to match dlsia --- callbacks/segmentation.py | 15 ++++++--------- utils/prefect.py | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/callbacks/segmentation.py b/callbacks/segmentation.py index 70fbaeb..f5496e8 100644 --- a/callbacks/segmentation.py +++ b/callbacks/segmentation.py @@ -1,4 +1,3 @@ -import asyncio import os import traceback import uuid @@ -28,14 +27,14 @@ "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(30)\\"', - "model_params": {"test": "test"}, + "model_params": {"io_parameters": {"uid": "uid0001"}}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, { "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(10)\\"', - "model_params": {"test": "test"}, + "model_params": {"io_parameters": {"uid": "uid0001"}}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, ], @@ -48,7 +47,7 @@ "image_name": "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype", "image_tag": "main", "command": 'python -c \\"import time; time.sleep(30)\\"', - "model_params": {"test": "test"}, + "model_params": {"io_parameters": {"uid": "uid0001"}}, "volumes": [f"{RESULTS_DIR}:/app/work/results"], }, ], @@ -196,7 +195,7 @@ def check_train_job(n_intervals): {"label": "✅ DLSIA CBA 03/11/2024 10:02AM", "value": "uid0003"}, ] else: - data = asyncio.run(query_flow_run(PREFECT_TAGS + ["train"])) + data = query_flow_run(PREFECT_TAGS + ["train"]) return data @@ -241,11 +240,9 @@ def check_inference_job(n_intervals, train_job_id): }, ] else: - data = asyncio.run( - query_flow_run( - PREFECT_TAGS + ["inference"], flow_run_name=job_name + data = query_flow_run( + PREFECT_TAGS + ["inference"], flow_run_name=job_name ) - ) selected_value = None if len(data) == 0 else no_update return data, selected_value return [], None diff --git a/utils/prefect.py b/utils/prefect.py index ecd49d3..d837309 100644 --- a/utils/prefect.py +++ b/utils/prefect.py @@ -80,4 +80,4 @@ async def _flow_run_query(tags, flow_run_name=None): def query_flow_run(tags, flow_run_name=None): - return _flow_run_query(tags, flow_run_name) + return asyncio.run(_flow_run_query(tags, flow_run_name)) From 0f995bde3cd198675415ed129e569240c860c737 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Wed, 6 Mar 2024 18:42:27 -0800 Subject: [PATCH 10/11] typo --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index b4b5672..127c653 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: DATA_TILED_URI: '${DATA_TILED_URI}' DATA_TILED_API_KEY: '${DATA_TILED_API_KEY}' MASK_TILED_URI: '${MASK_TILED_URI}' - MASK_TILED_API_KEY: '${TILED_API_KEY}' + MASK_TILED_API_KEY: '${MASK_TILED_API_KEY}' SEG_TILED_URI: '${SEG_TILED_URI}' SEG_TILED_API_KEY: '${SEG_TILED_API_KEY}' USER_NAME: '${USER_NAME}' From e516b8bf6ec65b18eeff20cb95c5dc4982dbcb36 Mon Sep 17 00:00:00 2001 From: taxe10 Date: Wed, 6 Mar 2024 18:53:05 -0800 Subject: [PATCH 11/11] reformatting --- callbacks/segmentation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/callbacks/segmentation.py b/callbacks/segmentation.py index f5496e8..7323f73 100644 --- a/callbacks/segmentation.py +++ b/callbacks/segmentation.py @@ -242,7 +242,7 @@ def check_inference_job(n_intervals, train_job_id): else: data = query_flow_run( PREFECT_TAGS + ["inference"], flow_run_name=job_name - ) + ) selected_value = None if len(data) == 0 else no_update return data, selected_value return [], None