Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add comments to workflows #267

Merged
merged 10 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions platformics/client/impersonation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async def impersonate(self, user_id: int) -> str:
resp = await client.get(
f"{IDENTITY_SERVICE_URL}/impersonate/?user_id={user_id}",
headers={"Authorization": f"Bearer {self.get_token()}"},
timeout=8,
)
try:
return resp.json()["token"]
Expand Down
5 changes: 5 additions & 0 deletions workflows/api/loader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ def _bind_handle_message(self) -> Callable[[WorkflowStatusMessage], Awaitable[No
async def handle_message(event: WorkflowStatusMessage) -> None:
print("event", event, file=sys.stderr)
if isinstance(event, WorkflowStartedMessage):
# Set the workflow run to running
workflow_run = (
await self.session.execute(select(WorkflowRun).where(WorkflowRun.execution_id == event.runner_id))
).scalar_one_or_none()
# If workflow_run is not None and the status is CREATED, PENDING, or STARTED
# then set the status to RUNNING
if workflow_run and workflow_run.status in [
WorkflowRunStatus.CREATED,
WorkflowRunStatus.PENDING,
Expand All @@ -90,6 +93,8 @@ async def handle_message(event: WorkflowStatusMessage) -> None:
await self.session.commit()

if isinstance(event, WorkflowSucceededMessage):
# If the event is a WorkflowSucceededMessage, then set the workflow run to SUCCEEDED
# and run the output loaders
_event: WorkflowSucceededMessage = event
result = await self.session.execute(
select(WorkflowRun)
Expand Down
10 changes: 10 additions & 0 deletions workflows/manifest/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def _check_references(
loader_name: str,
inputs: set[str],
) -> typing.Generator[InvalidInputReference, None, None]:
"""checks that all input references are present and yields errors if they are not"""
for k, input_name in self.inputs.items():
if input_name not in inputs:
yield InvalidInputReference(
Expand Down Expand Up @@ -283,6 +284,7 @@ def normalize_inputs(inputs: Iterable[tuple[str, T]]) -> dict[str, T | list[T]]:

@model_validator(mode="after")
def _unique_input_names(self): # type: ignore
"""ensures that raw input names do not duplicate entity input names"""
input_names = set(self.entity_inputs.keys())
duplicate_names = [k for k in self.raw_inputs if k in input_names]
if duplicate_names:
Expand Down Expand Up @@ -318,18 +320,26 @@ def validate_inputs(
entity_inputs: dict[str, EntityInput | list[EntityInput]],
raw_inputs: dict[str, Primitive | list[Primitive]],
) -> _InputValidationErrors:
"""Validates the inputs against the manifest"""

# for both entity and raw inputs
for entity_or_raw, inputs, input_arguments in [
("entity", entity_inputs, self.entity_inputs),
("raw", raw_inputs, self.raw_inputs),
]:
required_inputs = {k: False for k, v in input_arguments.items() if v.required} # type: ignore

# loop through the inputs and validate them
for name, input in inputs.items(): # type: ignore
# check if input_argument exists for the input name
input_argument = input_arguments.get(name) # type: ignore
if not input_argument:
yield InputNotSupported(name, entity_or_raw) # type: ignore
continue
# mark required input as found
if name in required_inputs:
required_inputs[name] = True
# validate the input
for error in input_argument.validate_input(input):
yield error
for required_input in [k for k, v in required_inputs.items() if not v]:
Expand Down
75 changes: 75 additions & 0 deletions workflows/scripts/dev_seeder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import argparse
from sqlalchemy.orm import joinedload, aliased
from platformics.util.seed_utils import (
TEST_USER_ID,
TEST_COLLECTION_ID,
)
from platformics.database.connect import init_sync_db
from test_infra.factories.main import SessionStorage
from database.models import WorkflowRun, WorkflowVersion, Workflow
from test_infra.factories.workflow_run import WorkflowRunFactory
from settings import APISettings

# maps the message type to the status of the workflow run
# before the message is sent
message_type_prestatus_map = {"started": "PENDING", "failed": "RUNNING", "succeeded": "RUNNING"}


class DevSeeder:
"""Seed the database with mock data for local development"""

def __init__(self, user_id: int = TEST_USER_ID, collection_id: int = TEST_COLLECTION_ID) -> None:
self.user_id = user_id
self.collection_id = collection_id

# set up the database session
settings = APISettings.model_validate({})
app_db = init_sync_db(settings.SYNC_DB_URI)
self.sess = app_db.session()
SessionStorage.set_session(self.sess) # needed for seed functions

def seed_workflow_runs(self, message_type: str, runner_id: str, workflow_name: str) -> None:
"""seed workflow runs for testing loaders"""
# check if a workflow run with the same runner_id exists
existing_workflow_run = self.sess.query(WorkflowRun).filter(WorkflowRun.execution_id == runner_id).one_or_none()
if existing_workflow_run:
print(f"Workflow run with runner_id {runner_id} already exists")
else:
workflow_alias = aliased(Workflow)
workflow_version = (
self.sess.query(WorkflowVersion)
.join(workflow_alias, WorkflowVersion.workflow)
.options(joinedload(WorkflowVersion.workflow))
.filter(workflow_alias.name == workflow_name)
.first()
)
workflow_run = WorkflowRunFactory.create(
owner_user_id=self.user_id,
collection_id=self.collection_id,
execution_id=runner_id,
workflow_version=workflow_version,
status=message_type_prestatus_map[message_type],
raw_inputs_json="{}",
outputs_json=None,
error_message=None,
)

self.sess.add(workflow_run)
self.sess.commit()


def main(function: str, user_id: int, collection_id: int) -> None:
seeder = DevSeeder(user_id, collection_id)
if function == "workflow_runs":
seeder.seed_workflow_runs("started", "test_runner_id", "Simple Manifest") # change "Simple Manifest" as needed


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="create dev seed data",
)
parser.add_argument("function", type=str, help="seed function to run")
parser.add_argument("--user", type=int, default=TEST_USER_ID)
parser.add_argument("--project", type=int, default=TEST_COLLECTION_ID)
args = parser.parse_args()
main(args.function, args.user, args.project)
61 changes: 61 additions & 0 deletions workflows/scripts/test_loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from api.config import load_event_bus
import argparse
from settings import APISettings
import asyncio
from plugins.plugin_types import WorkflowStartedMessage, WorkflowFailedMessage, WorkflowSucceededMessage
import json
from dev_seeder import DevSeeder

message_type_map = {
"started": WorkflowStartedMessage,
"failed": WorkflowFailedMessage,
"succeeded": WorkflowSucceededMessage,
}


async def main(
collection_id: int,
user_id: int,
message_type: str,
runner_id: str,
outputs_json: str,
seed: bool,
workflow_name: str,
) -> None:
settings = APISettings.model_validate({})
event_bus = load_event_bus(settings)

if seed:
ds = DevSeeder(user_id=user_id, collection_id=collection_id)
ds.seed_workflow_runs(message_type, runner_id, workflow_name)

message = message_type_map[message_type]
outputs = json.loads(outputs_json)
await event_bus.send(message(runner_id=runner_id, outputs=outputs))


if __name__ == "__main__":
parser = argparse.ArgumentParser("Script to test the loaders and listeners")
parser.add_argument(
"message_type", type=str, choices=["started", "failed", "succeeded"], help="Type of message to send"
)
parser.add_argument("--collection_id", type=int, default=444, help="Runner ID to send")
parser.add_argument("--user_id", type=int, default=111, help="Runner ID to send")
parser.add_argument("--runner_id", type=str, default="11111", help="Runner ID to send")
parser.add_argument("--outputs_json", type=str, default="{}", help="Outputs JSON to send")
parser.add_argument("--seed", type=bool, default=False, help="Seed the database")
parser.add_argument("--workflow_name", type=str, default="Simple Manifest", help="Name of the workflow to run")
args = parser.parse_args()

loop = asyncio.get_event_loop()
loop.run_until_complete(
main(
args.collection_id,
args.user_id,
args.message_type,
args.runner_id,
args.outputs_json,
args.seed,
args.workflow_name,
)
)
Loading