diff --git a/containers/orchestration/app/main.py b/containers/orchestration/app/main.py index 6053bba9aa..448d174297 100644 --- a/containers/orchestration/app/main.py +++ b/containers/orchestration/app/main.py @@ -8,11 +8,17 @@ Request, File, HTTPException, + WebSocket, ) from typing import Annotated, Optional from pathlib import Path -from zipfile import is_zipfile -from app.utils import load_processing_config, unzip, load_config_assets +from zipfile import is_zipfile, ZipFile +from app.utils import ( + load_processing_config, + unzip_ws, + unzip_http, + load_config_assets, +) from app.config import get_settings from app.services import call_apis from app.models import ( @@ -29,6 +35,7 @@ sample_list_configs_response, ) import json +import io import os # Read settings immediately to fail fast in case there are invalid values. @@ -46,6 +53,42 @@ ) +class WS_File: + # Constructor method (init method) + def __init__(self, file): + # Instance attributes + self.file = file + + +@app.websocket("/process-ws") +async def process_message_endpoint_ws( + websocket: WebSocket, +) -> ProcessMessageResponse: + """ + Creates a websocket connection with the client and accepts a zipped XML file. + The file is processed by the building blocks according to the currently + loaded configuration and emits websocket updates to the client as each + processing step completes. + """ + + await websocket.accept() + while True: + file = await websocket.receive_bytes() + + zipped_file = ZipFile(io.BytesIO(file), "r") + unzipped_file = unzip_ws(zipped_file) + + # Hardcoded message_type for MVP + input = { + "message_type": "eicr", + "include_error_types": "errors", + "message": unzipped_file, + } + + processing_config = load_processing_config("sample-orchestration-config.json") + await call_apis(config=processing_config, input=input, websocket=websocket) + + @app.post("/process", status_code=200, responses=process_message_response_examples) async def process_message_endpoint( request: Request, @@ -60,7 +103,7 @@ async def process_message_endpoint( content = "" if upload_file and is_zipfile(upload_file.file): - content = unzip(upload_file) + content = unzip_http(upload_file) else: try: data = await request.json() @@ -83,7 +126,7 @@ async def process_message_endpoint( "message": content, } - response, responses = call_apis(config=processing_config, input=input) + response, responses = await call_apis(config=processing_config, input=input) if response.status_code == 200: # Parse and work with the API response data (JSON, XML, etc.) diff --git a/containers/orchestration/app/services.py b/containers/orchestration/app/services.py index 7241c2b61a..8036e84582 100644 --- a/containers/orchestration/app/services.py +++ b/containers/orchestration/app/services.py @@ -1,6 +1,7 @@ import os import requests -from fastapi import HTTPException +import json +from fastapi import HTTPException, WebSocket service_urls = { @@ -70,12 +71,16 @@ def post_request(url, payload): return requests.post(url, json=payload) -def call_apis( +async def call_apis( config, input, + websocket: WebSocket = None, ) -> tuple: response = input responses = {} + + progress_dict = {"steps": config["steps"]} + for step in config["steps"]: service = step["service"] endpoint = step["endpoint"] @@ -89,6 +94,16 @@ def call_apis( print(f"Url: {url}") response = post_request(url, payload) print(f"Status Code: {response.status_code}") + + if websocket: + # Write service responses into websocket message + progress_dict[f"{response.url.split('/')[-1]}"] = { + "status_code": response.status_code, + "Message": response.reason, + } + + await websocket.send_text(json.dumps(progress_dict)) + responses[endpoint] = response else: raise HTTPException( diff --git a/containers/orchestration/app/utils.py b/containers/orchestration/app/utils.py index c80689d8f3..d5b8bf8367 100644 --- a/containers/orchestration/app/utils.py +++ b/containers/orchestration/app/utils.py @@ -2,8 +2,8 @@ import pathlib from functools import cache from pathlib import Path -from zipfile import ZipFile from typing import Dict +from zipfile import ZipFile @cache @@ -13,6 +13,7 @@ def load_processing_config(config_name: str) -> dict: first. If no custom configs match the provided name, check the configs provided by default with this service in the 'default_configs/' directory. + :param config_name: Name of config file :param path: The path to an extraction config file. :return: A dictionary containing the extraction config. """ @@ -39,7 +40,17 @@ def read_json_from_assets(filename: str): return json.load(open((pathlib.Path(__file__).parent.parent / "assets" / filename))) -def unzip(zipped_file) -> Dict: +def unzip_ws(zipped_file) -> Dict: + my_zipfile = zipped_file + if my_zipfile.namelist: + file_to_open = [ + file for file in my_zipfile.namelist() if "/CDA_eICR.xml" in file + ][0] + f = my_zipfile.open(file_to_open) + return f.read().decode("utf-8") + + +def unzip_http(zipped_file) -> Dict: my_zipfile = ZipFile(zipped_file.file) file_to_open = [file for file in my_zipfile.namelist() if "/CDA_eICR.xml" in file][ 0 diff --git a/containers/orchestration/requirements.txt b/containers/orchestration/requirements.txt index b84bbbd442..93ea1dd063 100644 --- a/containers/orchestration/requirements.txt +++ b/containers/orchestration/requirements.txt @@ -11,3 +11,5 @@ python-dotenv python-multipart pytest-env testcontainers[compose] +websockets +asyncio