local runner nonblocking #96

Merged · 2 commits · Oct 25, 2023
42 changes: 35 additions & 7 deletions workflows/plugins/workflow_runners/local/workflow_runner_local.py
@@ -1,8 +1,10 @@
+import asyncio
 import json
+import os
 import subprocess
 import sys
 import tempfile
-import os
+import threading
 from typing import List
 from uuid import uuid4
 import re
@@ -33,22 +35,21 @@ def description(self) -> str:
         """Returns a description of the workflow runner"""
         return "Runs WDL workflows locally using miniWDL"
 
-    def detect_task_output(self, line: str) -> None:
+    def _detect_task_output(self, line: str) -> None:
         if "INFO output :: job:" in line:
             task = _search_group(r"job: (.*),", line, 1)
             outputs = json.loads(_search_group(r"values: (\{.*\})", line, 1))
             print(f"task complete: {task}")
             for key, output in outputs.items():
                 print(f"{key}: {output}")
 
-    async def run_workflow(
+    async def _run_workflow_work(
         self,
         event_bus: EventBus,
-        workflow_run_id: str,
         workflow_path: str,
         inputs: dict,
-    ) -> str:
-        runner_id = str(uuid4())
+        runner_id: str,
+    ) -> None:
         await event_bus.send(WorkflowStartedMessage(runner_id=runner_id))
         # Running docker-in-docker requires the paths to files and outputs to be the same between
         with tempfile.TemporaryDirectory(dir="/tmp") as tmpdir:
@@ -63,7 +64,7 @@ async def run_workflow(
             while True:
                 assert p.stderr
                 line = p.stderr.readline().decode()
-                self.detect_task_output(line)
+                self._detect_task_output(line)
                 print(line, file=sys.stderr)
                 if not line:
                     break
@@ -75,4 +76,31 @@ async def run_workflow(
             except subprocess.CalledProcessError as e:
                 print(e.output)
                 await event_bus.send(WorkflowFailedMessage(runner_id=runner_id))
 
+    def _run_workflow_sync(
+        self,
+        event_bus: EventBus,
+        workflow_path: str,
+        inputs: dict,
+        runner_id: str,
+    ) -> None:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(self._run_workflow_work(event_bus, workflow_path, inputs, runner_id))
+        loop.close()
+
+    async def run_workflow(
+        self,
+        event_bus: EventBus,
+        workflow_run_id: str,
+        workflow_path: str,
+        inputs: dict,
+    ) -> str:
+        runner_id = str(uuid4())
[Review thread on the line above]
Collaborator: Will this eventually be set by workflow_run_id?
Author (Contributor): No, this is supposed to be generated by the runner; it is the equivalent of the SFN (Step Functions) execution ARN.
+        # run workflow in a thread
+        thread = threading.Thread(
+            target=self._run_workflow_sync,
+            args=(event_bus, workflow_path, inputs, runner_id),
+        )
+        thread.start()
+        return runner_id
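
For readers skimming the diff, the change comes down to one pattern: the async run_workflow no longer awaits the miniWDL subprocess. It generates a runner_id, hands the blocking work to a thread that owns its own event loop, and returns immediately. The sketch below illustrates that pattern in isolation; Runner, _work, and main are illustrative stand-ins, not classes from this repository, and the event bus and workflow messages are omitted.

    # Minimal sketch of the nonblocking-runner pattern (stand-in names, not the repo's API).
    import asyncio
    import threading
    import time
    from uuid import uuid4


    class Runner:
        async def _work(self, runner_id: str) -> None:
            # Stand-in for _run_workflow_work: an async def that still makes
            # blocking calls (the real code blocks on p.stderr.readline()).
            time.sleep(2)
            print(f"{runner_id}: background run finished")

        def _work_sync(self, runner_id: str) -> None:
            # Mirrors _run_workflow_sync: the worker thread owns a fresh event
            # loop, so the blocking coroutine never stalls the caller's loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(self._work(runner_id))
            loop.close()

        async def run(self) -> str:
            # Mirrors the new run_workflow: start the thread and return at once.
            runner_id = str(uuid4())
            threading.Thread(target=self._work_sync, args=(runner_id,)).start()
            return runner_id


    async def main() -> None:
        runner_id = await Runner().run()   # returns immediately
        print(f"started {runner_id}; caller's event loop stays free")
        await asyncio.sleep(3)             # give the background thread time to finish


    if __name__ == "__main__":
        asyncio.run(main())

The per-thread event loop matters because _run_workflow_work is a coroutine that makes blocking calls; running it on the caller's loop would stall every other task, whereas loop.run_until_complete inside the worker thread confines the blocking to that thread while keeping run_workflow's async signature unchanged.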