Skip to content

Commit

Permalink
settings update
Browse files Browse the repository at this point in the history
  • Loading branch information
morsecodist committed Oct 18, 2023
1 parent 575b6e0 commit 606d02b
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 23 deletions.
18 changes: 17 additions & 1 deletion platformics/api/core/settings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
from functools import cached_property

from jwcrypto import jwk
from pydantic_settings import BaseSettings
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict


class RedisEventBusSettings(BaseModel):
REDIS_URL: str
QUEUE_NAME: str


class EventBusSettings(BaseModel):
REDIS: RedisEventBusSettings


class Settings(BaseSettings):
"""Pydantic Settings object - do not instantiate it directly,
please use get_settings() as a dependency where possible"""
model_config = SettingsConfigDict(env_nested_delimiter='__')

SERVICE_NAME: str = "Platformics Entities"

Expand Down Expand Up @@ -36,6 +47,11 @@ class Settings(BaseSettings):
BOTO_ENDPOINT_URL: str
AWS_REGION: str

PLATFORMICS_WORKFLOW_RUNNER_PLUGIN: str

PLATFORMICS_EVENT_BUS_PLUGIN: str
PLATFORMICS_EVENT_BUS: EventBusSettings

############################################################################
# Computed properties

Expand Down
17 changes: 13 additions & 4 deletions workflows/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import strawberry
from cerbos.sdk.client import CerbosClient
from cerbos.sdk.model import Principal
from config import load_event_buses, load_workflow_runners
from fastapi import APIRouter, Depends, FastAPI
from config import load_event_bus, load_workflow_runners
from fastapi import APIRouter, Depends, FastAPI, Request
from platformics.api.core.deps import get_auth_principal, get_cerbos_client, get_db_session, get_engine
from platformics.api.core.settings import APISettings
from platformics.api.core.strawberry_extensions import DependencyExtension
Expand All @@ -16,6 +16,7 @@
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper

from api.core.gql_loaders import WorkflowLoader, get_base_loader
from plugin_types import EventBus

###########
# Plugins #
Expand All @@ -26,7 +27,12 @@
default_workflow_runner_name = config.get("plugins", "default_workflow_runner")

workflow_runners = load_workflow_runners()
event_buses = load_event_buses()


def get_event_bus(request: Request) -> EventBus:
"""Get the event_bus object from the app state"""
return request.app.state.event_bus


######################
# Strawberry-GraphQL #
Expand Down Expand Up @@ -160,6 +166,7 @@ async def submit_workflow(
workflow_inputs: str,
workflow_runner: str = default_workflow_runner_name,
session: AsyncSession = Depends(get_db_session, use_cache=False),
event_bus: EventBus = Depends(get_event_bus),
) -> Run:
# TODO: how do we determine the docker_image_id? Argument to miniwdl, may not be defined,
# other devs may want to submit custom containers
Expand All @@ -177,7 +184,7 @@ async def submit_workflow(
"WDL" in _workflow_runner.supported_workflow_types()
), f"Workflow runner {workflow_runner} does not support WDL"
response = await _workflow_runner.run_workflow(
event_bus=event_buses["redis"],
event_bus=event_bus,
workflow_run_id="1", # TODO: When we create the workflow run add the uuid here
# TODO: should come from the WorkflowVersion model
workflow_path="/workflows/test_workflows/static_sample/static_sample.wdl",
Expand Down Expand Up @@ -253,6 +260,7 @@ async def root() -> dict:
# Make sure tests can get their own instances of the app.
def get_app() -> FastAPI:
settings = APISettings.model_validate({}) # Workaround for https://github.com/pydantic/pydantic/issues/3753
event_bus = load_event_bus(settings)

# call finalize() before using the schema:
# (note that models that are related to models that are in the schema
Expand All @@ -272,6 +280,7 @@ def get_app() -> FastAPI:
_app = FastAPI()
# Add a global settings object to the app that we can use as a dependency
_app.state.entities_settings = settings
_app.state.event_bus = event_bus

graphql_app: GraphQLRouter = GraphQLRouter(schema, context_getter=get_context, graphiql=True)
_app.include_router(root_router)
Expand Down
14 changes: 9 additions & 5 deletions workflows/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from importlib.metadata import entry_points
from typing import Dict
from platformics.api.core.settings import APISettings
from plugin_types import EventBus, WorkflowRunner


Expand All @@ -12,10 +13,13 @@ def load_workflow_runners() -> Dict[str, WorkflowRunner]:
return workflow_runners_by_name


def load_event_buses() -> Dict[str, EventBus]:
event_buses_by_name: Dict[str, EventBus] = {}
def load_event_bus(settings: APISettings) -> EventBus:
for plugin in entry_points(group="czid.plugin.event_bus"):
event_bus = plugin.load()()
if plugin.name != settings.PLATFORMICS_EVENT_BUS_PLUGIN:
continue
event_bus = plugin.load()(
getattr(settings.PLATFORMICS_EVENT_BUS, settings.PLATFORMICS_EVENT_BUS_PLUGIN.upper())
)
assert isinstance(event_bus, EventBus)
event_buses_by_name[plugin.name] = event_bus
return event_buses_by_name
return event_bus
raise Exception(f"Event bus plugin {settings.PLATFORMICS_EVENT_BUS_PLUGIN} not found")
6 changes: 4 additions & 2 deletions workflows/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ x-db-variables: &db-variables
? PLATFORMICS_DATABASE_NAME=workflows

x-redis-variables: &redis-variables
? CZID__EVENT_BUS_REDIS__REDIS_URL=redis://redis.czidnet
? CZID__EVENT_BUS_REDIS__QUEUE_NAME=workflow-events
? PLATFORMICS_EVENT_BUS__REDIS__REDIS_URL=redis://redis.czidnet
? PLATFORMICS_EVENT_BUS__REDIS__QUEUE_NAME=workflow-events

x-workflow-variables: &workflow-variables
? ENVIRONMENT
Expand All @@ -25,6 +25,8 @@ x-workflow-variables: &workflow-variables
? DEFAULT_UPLOAD_BUCKET=local-bucket
? ENTITY_SERVICE_AUTH_TOKEN=eyJhbGciOiJFQ0RILUVTIiwiZW5jIjoiQTI1NkNCQy1IUzUxMiIsImVwayI6eyJjcnYiOiJQLTM4NCIsImt0eSI6IkVDIiwieCI6Ik5Nc3ZKbXVuYnBXY0VsdVlJTmRVeVVIcUkzbjZCR2VQd2V3ajRXS0pVdEt0QXhmUUtrVE81M2kzQ2dSZkZYVEQiLCJ5IjoiYmp6TkJuZjExWjRIV3dBVm95UVpSOGRWSERicW9wTjhvQkJZYnIxQlBiU1llZHdaWkVuYzJqS21rY0xxcloxTiJ9LCJraWQiOiItQmx2bF9wVk5LU2JRQ2N5dGV4UzNfMk5MaHBia2J6LVk5VFFjbkY5S1drIiwidHlwIjoiSldFIn0..Ymjmtj6nXp8r8AFe8AgI1g.e_39w7OXGJaOVKL_QkP38rvlcEeSgFQsxT0rTdCgI5E-b328zlVHagLSFZ_Iqvqiy6Z8KcU4pLJ3NTaW3Ys_YQsnUn6yUEvwqOY2KESB0mT0Bp3qpNYRBZJVA8PW43YAbOnO7h7ZTwQZJfwMzwhcaaYeZW8pN9rvcNtQX3rgBubSuR-LHKL6k4uAMPh9A8ZxXKZgpI6tpJPxE-uspvYi-foW8VyjZtwOUMvMp3lfZPyL1oQIv_rEUhOGNO_lfi339QcT6F7DwBjXK6C_7U65F-dFZScnReLnVczPfHhJ7z3NnVt46sFcddgZpLIpQyzA6puCcDoRm5ZZCVvm8h-LHVy-9dGWLVxBRhGRdBwBhbiVu2O_CNeVabXl8JhAs3oeV2zDgYfOj_-kkHWsbgHZ0y-tc-HtgoXzsUqaRP1IXQ3g3VDES7UjsaKsfxgURH5EIsrdWwFrWHGoLNfGwwPSwTBI5Mul7LT10-Pg_uBBCiHfQIDqerRQeADRFhV_07GYatBDt-RxwNL4bO59V8ewCzhpdCYRpL363HGldT1Pic-SpTk2NsY2t8MA6__FhJU9JSKYwJpeKMaGLUHA_40PEQ.gb5q-WZTU-ZKpV7WYFbMGMEF2CZIBrFlCUeaZ5ffPDU
? DEFAULT_UPLOAD_PROTOCOL=S3 # don't need this in workflows
? PLATFORMICS_WORKFLOW_RUNNER_PLUGIN=local
? PLATFORMICS_EVENT_BUS_PLUGIN=redis

x-cerbos-variables: &cerbos-variables
? CERBOS_URL=http://wf-cerbos:3592
Expand Down
14 changes: 6 additions & 8 deletions workflows/plugins/event_bus/redis/event_bus_redis.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import json
import os
from typing import List

from platformics.api.core.settings import RedisEventBusSettings
from plugin_types import EventBus, WorkflowStatusMessage, parse_workflow_status_message

import redis.asyncio as aioredis

REDIS_URL = os.environ.get("CZID__EVENT_BUS_REDIS__REDIS_URL", "redis://localhost")
QUEUE_NAME = os.environ.get("CZID__EVENT_BUS_REDIS__QUEUE_NAME", "workflow_status")


class EventBusRedis(EventBus):
def __init__(self) -> None:
self.redis = aioredis.from_url(REDIS_URL)
def __init__(self, setings: RedisEventBusSettings) -> None:
self.settings = setings
self.redis = aioredis.from_url(self.settings.REDIS_URL)

async def send(self, message: WorkflowStatusMessage) -> None:
await self.redis.lpush(QUEUE_NAME, json.dumps(message.asdict()))
await self.redis.lpush(self.settings.QUEUE_NAME, json.dumps(message.asdict()))

async def poll(self) -> List[WorkflowStatusMessage]:
_, message = await self.redis.brpop(QUEUE_NAME)
_, message = await self.redis.brpop(self.settings.QUEUE_NAME)
return [parse_workflow_status_message(json.loads(message))]
6 changes: 3 additions & 3 deletions workflows/run_loader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from config import load_event_buses
from config import load_event_bus
from loader import LoaderDriver
from platformics.api.core.settings import APISettings
from platformics.database.connect import init_async_db
Expand All @@ -11,8 +11,8 @@
app_db = init_async_db(settings.DB_URI)
session = app_db.session()

event_buses = load_event_buses()
loader = LoaderDriver(session, event_buses["redis"])
event_bus = load_event_bus(settings)
loader = LoaderDriver(session, event_bus)

# call main in it's own thread
loop = asyncio.get_event_loop()
Expand Down

0 comments on commit 606d02b

Please sign in to comment.