Separate loader Service + Redis Bus (#89)
morsecodist authored Oct 18, 2023
1 parent b2475cb commit 3395747
Showing 15 changed files with 147 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -30,4 +30,4 @@ clean:
 	$(MAKE) -C entities local-clean
 	$(MAKE) -C workflows local-clean
 	docker compose down
-	rm -f .moto_recording
+	rm -rf .moto_recording
18 changes: 17 additions & 1 deletion platformics/api/core/settings.py
@@ -1,12 +1,23 @@
 from functools import cached_property
 
 from jwcrypto import jwk
-from pydantic_settings import BaseSettings
+from pydantic import BaseModel
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class RedisEventBusSettings(BaseModel):
+    REDIS_URL: str
+    QUEUE_NAME: str
+
+
+class EventBusSettings(BaseModel):
+    REDIS: RedisEventBusSettings
 
 
 class Settings(BaseSettings):
     """Pydantic Settings object - do not instantiate it directly,
     please use get_settings() as a dependency where possible"""
+    model_config = SettingsConfigDict(env_nested_delimiter='__')
 
     SERVICE_NAME: str = "Platformics Entities"
 
@@ -36,6 +47,11 @@ class Settings(BaseSettings):
     BOTO_ENDPOINT_URL: str
     AWS_REGION: str
 
+    PLATFORMICS_WORKFLOW_RUNNER_PLUGIN: str
+
+    PLATFORMICS_EVENT_BUS_PLUGIN: str
+    PLATFORMICS_EVENT_BUS: EventBusSettings
+
     ############################################################################
     # Computed properties
 
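The nested settings only work because of the new `env_nested_delimiter='__'` config: pydantic-settings splits environment variable names on the double underscore and walks the nested models. A minimal sketch of the mapping, using just the models added above:

import os

from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict


class RedisEventBusSettings(BaseModel):
    REDIS_URL: str
    QUEUE_NAME: str


class EventBusSettings(BaseModel):
    REDIS: RedisEventBusSettings


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_nested_delimiter="__")

    PLATFORMICS_EVENT_BUS: EventBusSettings


# Each "__" steps one level deeper into the nested models.
os.environ["PLATFORMICS_EVENT_BUS__REDIS__REDIS_URL"] = "redis://redis.czidnet"
os.environ["PLATFORMICS_EVENT_BUS__REDIS__QUEUE_NAME"] = "workflow-events"

settings = Settings()
assert settings.PLATFORMICS_EVENT_BUS.REDIS.QUEUE_NAME == "workflow-events"

These are the same variable names the docker-compose change below sets via the `*redis-variables` anchor.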
17 changes: 13 additions & 4 deletions workflows/api/main.py
@@ -5,8 +5,8 @@
 import strawberry
 from cerbos.sdk.client import CerbosClient
 from cerbos.sdk.model import Principal
-from config import load_event_buses, load_workflow_runners
-from fastapi import APIRouter, Depends, FastAPI
+from config import load_event_bus, load_workflow_runners
+from fastapi import APIRouter, Depends, FastAPI, Request
 from platformics.api.core.deps import get_auth_principal, get_cerbos_client, get_db_session, get_engine
 from platformics.api.core.settings import APISettings
 from platformics.api.core.strawberry_extensions import DependencyExtension
@@ -16,6 +17,7 @@
 from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
 
 from api.core.gql_loaders import WorkflowLoader, get_base_loader
+from plugin_types import EventBus
 
 ###########
 # Plugins #
@@ -26,7 +27,12 @@
 default_workflow_runner_name = config.get("plugins", "default_workflow_runner")
 
 workflow_runners = load_workflow_runners()
-event_buses = load_event_buses()
+
+
+def get_event_bus(request: Request) -> EventBus:
+    """Get the event_bus object from the app state"""
+    return request.app.state.event_bus
 
 
 ######################
 # Strawberry-GraphQL #
@@ -160,6 +166,7 @@ async def submit_workflow(
     workflow_inputs: str,
     workflow_runner: str = default_workflow_runner_name,
     session: AsyncSession = Depends(get_db_session, use_cache=False),
+    event_bus: EventBus = Depends(get_event_bus),
 ) -> Run:
     # TODO: how do we determine the docker_image_id? Argument to miniwdl, may not be defined,
     # other devs may want to submit custom containers
@@ -177,7 +184,7 @@
         "WDL" in _workflow_runner.supported_workflow_types()
     ), f"Workflow runner {workflow_runner} does not support WDL"
     response = await _workflow_runner.run_workflow(
-        event_bus=event_buses["local"],
+        event_bus=event_bus,
         workflow_run_id="1", # TODO: When we create the workflow run add the uuid here
         # TODO: should come from the WorkflowVersion model
         workflow_path="/workflows/test_workflows/static_sample/static_sample.wdl",
@@ -253,6 +260,7 @@ async def root() -> dict:
 # Make sure tests can get their own instances of the app.
 def get_app() -> FastAPI:
     settings = APISettings.model_validate({}) # Workaround for https://github.com/pydantic/pydantic/issues/3753
+    event_bus = load_event_bus(settings)
 
     # call finalize() before using the schema:
     # (note that models that are related to models that are in the schema
@@ -272,6 +280,7 @@ def get_app() -> FastAPI:
     _app = FastAPI()
     # Add a global settings object to the app that we can use as a dependency
     _app.state.entities_settings = settings
+    _app.state.event_bus = event_bus
 
     graphql_app: GraphQLRouter = GraphQLRouter(schema, context_getter=get_context, graphiql=True)
     _app.include_router(root_router)
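Exposing the bus through the `get_event_bus` dependency (instead of the old module-level `event_buses` dict) also makes it swappable in tests. A sketch, assuming the module imports as `api.main` and using a hypothetical in-memory double:

from api.main import get_app, get_event_bus


class FakeEventBus:
    """Hypothetical test double; records sends instead of hitting Redis."""

    def __init__(self) -> None:
        self.sent = []

    async def send(self, message) -> None:
        self.sent.append(message)

    async def poll(self):
        return list(self.sent)


app = get_app()
# FastAPI consults dependency_overrides before calling the real resolver.
app.dependency_overrides[get_event_bus] = FakeEventBus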
14 changes: 9 additions & 5 deletions workflows/config.py
@@ -1,5 +1,6 @@
 from importlib.metadata import entry_points
 from typing import Dict
+from platformics.api.core.settings import APISettings
 from plugin_types import EventBus, WorkflowRunner


@@ -12,10 +13,13 @@ def load_workflow_runners() -> Dict[str, WorkflowRunner]:
     return workflow_runners_by_name
 
 
-def load_event_buses() -> Dict[str, EventBus]:
-    event_buses_by_name: Dict[str, EventBus] = {}
+def load_event_bus(settings: APISettings) -> EventBus:
     for plugin in entry_points(group="czid.plugin.event_bus"):
-        event_bus = plugin.load()()
+        if plugin.name != settings.PLATFORMICS_EVENT_BUS_PLUGIN:
+            continue
+        event_bus = plugin.load()(
+            getattr(settings.PLATFORMICS_EVENT_BUS, settings.PLATFORMICS_EVENT_BUS_PLUGIN.upper())
+        )
         assert isinstance(event_bus, EventBus)
-        event_buses_by_name[plugin.name] = event_bus
-    return event_buses_by_name
+        return event_bus
+    raise Exception(f"Event bus plugin {settings.PLATFORMICS_EVENT_BUS_PLUGIN} not found")
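`load_event_bus` now returns exactly one bus: it scans the same entry-point group as before, skips everything except the plugin named in `PLATFORMICS_EVENT_BUS_PLUGIN`, and hands that plugin its settings block (the `getattr(..., name.upper())` call maps the plugin name `redis` to the `REDIS` field of `EventBusSettings`). A sketch of the call site, assuming the environment set in docker-compose.yml below:

from platformics.api.core.settings import APISettings

from config import load_event_bus

# Assumes PLATFORMICS_EVENT_BUS_PLUGIN=redis and the
# PLATFORMICS_EVENT_BUS__REDIS__* variables are set, as in docker-compose.yml.
settings = APISettings.model_validate({})
event_bus = load_event_bus(settings)  # an EventBusRedis instance

Raising when no plugin matches turns a misconfigured `PLATFORMICS_EVENT_BUS_PLUGIN` into a startup failure instead of a `KeyError` at request time, which is what the old `event_buses["local"]` lookup would have produced.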
45 changes: 36 additions & 9 deletions workflows/docker-compose.yml
@@ -13,11 +13,31 @@ x-db-variables: &db-variables
   ? PLATFORMICS_DATABASE_USER=postgres
   ? PLATFORMICS_DATABASE_PASSWORD=password_postgres
   ? PLATFORMICS_DATABASE_NAME=workflows
 
+x-redis-variables: &redis-variables
+  ? PLATFORMICS_EVENT_BUS__REDIS__REDIS_URL=redis://redis.czidnet
+  ? PLATFORMICS_EVENT_BUS__REDIS__QUEUE_NAME=workflow-events
+
 x-workflow-variables: &workflow-variables
   ? ENVIRONMENT
+  ? BOTO_ENDPOINT_URL=http://motoserver.czidnet:4000
+  ? ENTITY_SERVICE_URL=http://entity-service:8008
+  ? DEFAULT_UPLOAD_BUCKET=local-bucket
+  ? ENTITY_SERVICE_AUTH_TOKEN=eyJhbGciOiJFQ0RILUVTIiwiZW5jIjoiQTI1NkNCQy1IUzUxMiIsImVwayI6eyJjcnYiOiJQLTM4NCIsImt0eSI6IkVDIiwieCI6Ik5Nc3ZKbXVuYnBXY0VsdVlJTmRVeVVIcUkzbjZCR2VQd2V3ajRXS0pVdEt0QXhmUUtrVE81M2kzQ2dSZkZYVEQiLCJ5IjoiYmp6TkJuZjExWjRIV3dBVm95UVpSOGRWSERicW9wTjhvQkJZYnIxQlBiU1llZHdaWkVuYzJqS21rY0xxcloxTiJ9LCJraWQiOiItQmx2bF9wVk5LU2JRQ2N5dGV4UzNfMk5MaHBia2J6LVk5VFFjbkY5S1drIiwidHlwIjoiSldFIn0..Ymjmtj6nXp8r8AFe8AgI1g.e_39w7OXGJaOVKL_QkP38rvlcEeSgFQsxT0rTdCgI5E-b328zlVHagLSFZ_Iqvqiy6Z8KcU4pLJ3NTaW3Ys_YQsnUn6yUEvwqOY2KESB0mT0Bp3qpNYRBZJVA8PW43YAbOnO7h7ZTwQZJfwMzwhcaaYeZW8pN9rvcNtQX3rgBubSuR-LHKL6k4uAMPh9A8ZxXKZgpI6tpJPxE-uspvYi-foW8VyjZtwOUMvMp3lfZPyL1oQIv_rEUhOGNO_lfi339QcT6F7DwBjXK6C_7U65F-dFZScnReLnVczPfHhJ7z3NnVt46sFcddgZpLIpQyzA6puCcDoRm5ZZCVvm8h-LHVy-9dGWLVxBRhGRdBwBhbiVu2O_CNeVabXl8JhAs3oeV2zDgYfOj_-kkHWsbgHZ0y-tc-HtgoXzsUqaRP1IXQ3g3VDES7UjsaKsfxgURH5EIsrdWwFrWHGoLNfGwwPSwTBI5Mul7LT10-Pg_uBBCiHfQIDqerRQeADRFhV_07GYatBDt-RxwNL4bO59V8ewCzhpdCYRpL363HGldT1Pic-SpTk2NsY2t8MA6__FhJU9JSKYwJpeKMaGLUHA_40PEQ.gb5q-WZTU-ZKpV7WYFbMGMEF2CZIBrFlCUeaZ5ffPDU
+  ? DEFAULT_UPLOAD_PROTOCOL=S3 # don't need this in workflows
+  ? PLATFORMICS_WORKFLOW_RUNNER_PLUGIN=local
+  ? PLATFORMICS_EVENT_BUS_PLUGIN=redis
+
+x-cerbos-variables: &cerbos-variables
+  ? CERBOS_URL=http://wf-cerbos:3592
+  ? JWK_PUBLIC_KEY_FILE=/workflows/test_infra/fixtures/public_key.pem
+  ? JWK_PRIVATE_KEY_FILE=/workflows/test_infra/fixtures/private_key.pem
 
 services:
+  redis:
+    image: redis:7
+    ports:
+      - "6379:6379"
   postgres:
     image: postgres:15
     restart: always
@@ -53,15 +73,22 @@ services:
     stdin_open: true # Helps with pdb
     tty: true # Helps with pdb
     environment:
-      <<: [*aws-variables, *db-variables, *workflow-variables]
-      CERBOS_URL: http://wf-cerbos:3592
-      JWK_PUBLIC_KEY_FILE: "/workflows/test_infra/fixtures/public_key.pem"
-      JWK_PRIVATE_KEY_FILE: "/workflows/test_infra/fixtures/private_key.pem"
-      DEFAULT_UPLOAD_BUCKET: "local-bucket"
-      BOTO_ENDPOINT_URL: "http://motoserver.czidnet:4000"
-      ENTITY_SERVICE_URL: "http://entity-service:8008"
-      ENTITY_SERVICE_AUTH_TOKEN: "eyJhbGciOiJFQ0RILUVTIiwiZW5jIjoiQTI1NkNCQy1IUzUxMiIsImVwayI6eyJjcnYiOiJQLTM4NCIsImt0eSI6IkVDIiwieCI6Ik5Nc3ZKbXVuYnBXY0VsdVlJTmRVeVVIcUkzbjZCR2VQd2V3ajRXS0pVdEt0QXhmUUtrVE81M2kzQ2dSZkZYVEQiLCJ5IjoiYmp6TkJuZjExWjRIV3dBVm95UVpSOGRWSERicW9wTjhvQkJZYnIxQlBiU1llZHdaWkVuYzJqS21rY0xxcloxTiJ9LCJraWQiOiItQmx2bF9wVk5LU2JRQ2N5dGV4UzNfMk5MaHBia2J6LVk5VFFjbkY5S1drIiwidHlwIjoiSldFIn0..Ymjmtj6nXp8r8AFe8AgI1g.e_39w7OXGJaOVKL_QkP38rvlcEeSgFQsxT0rTdCgI5E-b328zlVHagLSFZ_Iqvqiy6Z8KcU4pLJ3NTaW3Ys_YQsnUn6yUEvwqOY2KESB0mT0Bp3qpNYRBZJVA8PW43YAbOnO7h7ZTwQZJfwMzwhcaaYeZW8pN9rvcNtQX3rgBubSuR-LHKL6k4uAMPh9A8ZxXKZgpI6tpJPxE-uspvYi-foW8VyjZtwOUMvMp3lfZPyL1oQIv_rEUhOGNO_lfi339QcT6F7DwBjXK6C_7U65F-dFZScnReLnVczPfHhJ7z3NnVt46sFcddgZpLIpQyzA6puCcDoRm5ZZCVvm8h-LHVy-9dGWLVxBRhGRdBwBhbiVu2O_CNeVabXl8JhAs3oeV2zDgYfOj_-kkHWsbgHZ0y-tc-HtgoXzsUqaRP1IXQ3g3VDES7UjsaKsfxgURH5EIsrdWwFrWHGoLNfGwwPSwTBI5Mul7LT10-Pg_uBBCiHfQIDqerRQeADRFhV_07GYatBDt-RxwNL4bO59V8ewCzhpdCYRpL363HGldT1Pic-SpTk2NsY2t8MA6__FhJU9JSKYwJpeKMaGLUHA_40PEQ.gb5q-WZTU-ZKpV7WYFbMGMEF2CZIBrFlCUeaZ5ffPDU"
-      DEFAULT_UPLOAD_PROTOCOL: S3 # don't need this in workflows
+      <<: [*aws-variables, *db-variables, *redis-variables, *workflow-variables, *cerbos-variables]
+  workflows-worker:
+    build:
+      context: ".."
+      dockerfile: "workflows/Dockerfile"
+      args:
+        - BUILDKIT_INLINE_CACHE=1
+    image: "platformics-workflows"
+    command: ["python3", "run_loader.py"]
+    restart: always
+    volumes:
+      - .:/workflows
+    stdin_open: true # Helps with pdb
+    tty: true # Helps with pdb
+    environment:
+      <<: [*aws-variables, *db-variables, *redis-variables, *workflow-variables, *cerbos-variables]
   wf-cerbos:
     image: ghcr.io/cerbos/cerbos:0.29.0
     volumes:
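Since the new `redis` service publishes port 6379 and both workflow containers read the queue name from the shared `*redis-variables` anchor, the bus can be smoke-tested from the host. A hedged sketch (assumes the compose stack is up and the `redis` Python package is installed locally):

import asyncio
import json

import redis.asyncio as aioredis


async def smoke_test() -> None:
    r = aioredis.from_url("redis://localhost:6379")
    # Same JSON shape parse_workflow_status_message expects; the
    # workflows-worker container should print it to stderr when it polls.
    message = {"runner_id": "smoke-test", "status": "WORKFLOW_STARTED"}
    await r.lpush("workflow-events", json.dumps(message))


asyncio.run(smoke_test())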
11 changes: 0 additions & 11 deletions workflows/etc/supervisord.conf
@@ -43,14 +43,3 @@ autostart = true
 autorestart = true
 startretries = 9999
 
-[program:loaders]
-directory=/workflows
-command=python3 run_loader.py
-environment=PYTHONUNBUFFERED=1
-stderr_logfile = /dev/stderr
-stderr_logfile_maxbytes=0
-stdout_logfile = /dev/stdout
-stdout_logfile_maxbytes=0
-autostart = true
-autorestart = true
-startretries = 9999
1 change: 1 addition & 0 deletions workflows/loader.py
@@ -99,6 +99,7 @@ async def process_workflow_completed(
     async def main(self) -> None:
         while True:
             for event in await self.bus.poll():
+                print("event", event, file=sys.stderr)
                 if isinstance(event, WorkflowSucceededMessage):
                     manifest = load_manifest(open("sequence_manifest.json").read())
                     _event: WorkflowSucceededMessage = event
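`run_loader.py` itself is not part of this diff; given the `main` loop above and the bus wiring in `config.py`, a plausible entrypoint looks roughly like this (the `Loader` class name and constructor are assumptions):

# Hypothetical sketch of run_loader.py -- the real file is not shown in this commit.
import asyncio

from platformics.api.core.settings import APISettings

from config import load_event_bus
from loader import Loader  # assumed export

if __name__ == "__main__":
    settings = APISettings.model_validate({})
    asyncio.run(Loader(load_event_bus(settings)).main())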
33 changes: 20 additions & 13 deletions workflows/plugin_types.py
@@ -1,31 +1,38 @@
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
+from pydantic import BaseModel
 from typing import Dict, List, Literal, Any
 
-@dataclass
-class WorkflowStatusMessage:
+WorkflowStatus = Literal["WORKFLOW_STARTED", "WORKFLOW_SUCCESS", "WORKFLOW_FAILURE"]
+
+
+class WorkflowStatusMessage(BaseModel):
     runner_id: str
-    status: Literal["WORKFLOW_STARTED", "WORKFLOW_SUCCESS", "WORKFLOW_FAILURE"]
+    status: WorkflowStatus
 
 
-@dataclass
 class WorkflowStartedMessage(WorkflowStatusMessage):
-    status: Literal["WORKFLOW_STARTED"]
+    status: Literal["WORKFLOW_STARTED"] = "WORKFLOW_STARTED"
 
 
 class WorkflowSucceededMessage(WorkflowStatusMessage):
-    outputs: Dict[str, str]
-
-    def __init__(self, runner_id: str, outputs: Dict[str, str]):
-        self.runner_id = runner_id
-        self.outputs = outputs
+    status: Literal["WORKFLOW_SUCCESS"] = "WORKFLOW_SUCCESS"
+    outputs: Dict[str, str] = {}
 
 
-@dataclass
 class WorkflowFailedMessage(WorkflowStatusMessage):
-    status: Literal["WORKFLOW_FAILURE"]
+    status: Literal["WORKFLOW_FAILURE"] = "WORKFLOW_FAILURE"
+
+
+def parse_workflow_status_message(obj: dict) -> WorkflowStatusMessage:
+    status = obj["status"]
+    if status == "WORKFLOW_STARTED":
+        return WorkflowStartedMessage(**obj)
+    elif status == "WORKFLOW_SUCCESS":
+        return WorkflowSucceededMessage(**obj)
+    elif status == "WORKFLOW_FAILURE":
+        return WorkflowFailedMessage(**obj)
+    else:
+        raise Exception(f"Unknown workflow status: {status}")
 
 
 class EventBus(ABC):
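Switching the messages from hand-rolled dataclasses to pydantic models is what lets them cross a process boundary: every message now serializes to JSON and back. A quick round-trip sketch:

import json

from plugin_types import WorkflowSucceededMessage, parse_workflow_status_message

msg = WorkflowSucceededMessage(runner_id="abc123", outputs={"bam": "s3://bucket/out.bam"})
wire = msg.model_dump_json()  # what EventBusRedis LPUSHes below

parsed = parse_workflow_status_message(json.loads(wire))
assert isinstance(parsed, WorkflowSucceededMessage)
assert parsed.outputs == {"bam": "s3://bucket/out.bam"}

A pydantic discriminated union keyed on `status` could replace the if/elif dispatch, but the explicit version keeps the parsing rules obvious.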
20 changes: 0 additions & 20 deletions workflows/plugins/event_bus/local/event_bus_local.py

This file was deleted.

20 changes: 20 additions & 0 deletions workflows/plugins/event_bus/redis/event_bus_redis.py
@@ -0,0 +1,20 @@
+import json
+from typing import List
+
+from platformics.api.core.settings import RedisEventBusSettings
+from plugin_types import EventBus, WorkflowStatusMessage, parse_workflow_status_message
+
+import redis.asyncio as aioredis
+
+
+class EventBusRedis(EventBus):
+    def __init__(self, settings: RedisEventBusSettings) -> None:
+        self.settings = settings
+        self.redis = aioredis.from_url(self.settings.REDIS_URL)
+
+    async def send(self, message: WorkflowStatusMessage) -> None:
+        # LPUSH + BRPOP makes the Redis list behave as a FIFO queue.
+        await self.redis.lpush(self.settings.QUEUE_NAME, message.model_dump_json())
+
+    async def poll(self) -> List[WorkflowStatusMessage]:
+        _, message = await self.redis.brpop(self.settings.QUEUE_NAME)
+        return [parse_workflow_status_message(json.loads(message))]
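A minimal usage sketch for the new bus (assumes a reachable Redis). Note that `brpop` with no timeout blocks until a message is available, so `poll` is a blocking wait that returns exactly one parsed message per call:

import asyncio

from platformics.api.core.settings import RedisEventBusSettings
from event_bus_redis import EventBusRedis
from plugin_types import WorkflowStartedMessage


async def demo() -> None:
    bus = EventBusRedis(
        RedisEventBusSettings(REDIS_URL="redis://localhost:6379", QUEUE_NAME="workflow-events")
    )
    await bus.send(WorkflowStartedMessage(runner_id="demo"))
    events = await bus.poll()  # blocks until something is queued
    print(events)


asyncio.run(demo())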
workflows/plugins/event_bus/{local → redis}/setup.py
@@ -2,22 +2,22 @@
 from setuptools import setup
 
 setup(
-    name="event-bus-local",
+    name="event-bus-redis",
     version="0.0.1",
-    description="A local event bus",
+    description="A redis event bus",
     url="",
     project_urls={"Documentation": "", "Source Code": "", "Issue Tracker": ""},
     long_description="",
     long_description_content_type="text/markdown",
     author="Todd Morse",
-    py_modules=["event_bus_local"],
+    py_modules=["event_bus_redis"],
     python_requires=">=3.6",
     setup_requires=[],
     install_requires=["miniwdl"],
     reentry_register=True,
     entry_points={
         "czid.plugin.event_bus": [
-            "local = event_bus_local:EventBusLocal",
+            "redis = event_bus_redis:EventBusRedis",
         ],
     },
 )
(local workflow runner plugin)
@@ -49,7 +49,7 @@ async def run_workflow(
         inputs: dict,
     ) -> str:
         runner_id = str(uuid4())
-        await event_bus.send(WorkflowStartedMessage(runner_id, "WORKFLOW_STARTED"))
+        await event_bus.send(WorkflowStartedMessage(runner_id=runner_id))
         # Running docker-in-docker requires the paths to files and outputs to be the same between
         with tempfile.TemporaryDirectory(dir="/tmp") as tmpdir:
             try:
@@ -70,9 +70,9 @@
 
                 assert p.stdout
                 outputs = json.loads(p.stdout.read().decode())["outputs"]
-                await event_bus.send(WorkflowSucceededMessage(runner_id, outputs))
+                await event_bus.send(WorkflowSucceededMessage(runner_id=runner_id, outputs=outputs))
 
             except subprocess.CalledProcessError as e:
                 print(e.output)
-                await event_bus.send(WorkflowFailedMessage(runner_id, "WORKFLOW_FAILURE"))
+                await event_bus.send(WorkflowFailedMessage(runner_id=runner_id))
         return runner_id
20 changes: 19 additions & 1 deletion workflows/poetry.lock

Some generated files are not rendered by default.
