diff --git a/workflows/plugins/workflow_runners/local/workflow_runner_local.py b/workflows/plugins/workflow_runners/local/workflow_runner_local.py index be36015e..2d86a08f 100644 --- a/workflows/plugins/workflow_runners/local/workflow_runner_local.py +++ b/workflows/plugins/workflow_runners/local/workflow_runner_local.py @@ -1,8 +1,10 @@ +import asyncio import json +import os import subprocess import sys import tempfile -import os +import threading from typing import List from uuid import uuid4 import re @@ -33,7 +35,7 @@ def description(self) -> str: """Returns a description of the workflow runner""" return "Runs WDL workflows locally using miniWDL" - def detect_task_output(self, line: str) -> None: + def _detect_task_output(self, line: str) -> None: if "INFO output :: job:" in line: task = _search_group(r"job: (.*),", line, 1) outputs = json.loads(_search_group(r"values: (\{.*\})", line, 1)) @@ -41,14 +43,13 @@ def detect_task_output(self, line: str) -> None: for key, output in outputs.items(): print(f"{key}: {output}") - async def run_workflow( + async def _run_workflow_work( self, event_bus: EventBus, - workflow_run_id: str, workflow_path: str, inputs: dict, - ) -> str: - runner_id = str(uuid4()) + runner_id: str, + ) -> None: await event_bus.send(WorkflowStartedMessage(runner_id=runner_id)) # Running docker-in-docker requires the paths to files and outputs to be the same between with tempfile.TemporaryDirectory(dir="/tmp") as tmpdir: @@ -63,7 +64,7 @@ async def run_workflow( while True: assert p.stderr line = p.stderr.readline().decode() - self.detect_task_output(line) + self._detect_task_output(line) print(line, file=sys.stderr) if not line: break @@ -75,4 +76,31 @@ async def run_workflow( except subprocess.CalledProcessError as e: print(e.output) await event_bus.send(WorkflowFailedMessage(runner_id=runner_id)) + + def _run_workflow_sync( + self, + event_bus: EventBus, + workflow_path: str, + inputs: dict, + runner_id: str, + ) -> None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._run_workflow_work(event_bus, workflow_path, inputs, runner_id)) + loop.close() + + async def run_workflow( + self, + event_bus: EventBus, + workflow_run_id: str, + workflow_path: str, + inputs: dict, + ) -> str: + runner_id = str(uuid4()) + # run workflow in a thread + thread = threading.Thread( + target=self._run_workflow_sync, + args=(event_bus, workflow_path, inputs, runner_id), + ) + thread.start() return runner_id