Skip to content

Commit

Permalink
Websockets backend (#887)
Browse files Browse the repository at this point in the history
Add websocket processing to backend

---------

Co-authored-by: nickbristow <[email protected]>
  • Loading branch information
gordonfarrell and nickbristow authored Oct 30, 2023
1 parent 73ab64d commit cd7efb5
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 8 deletions.
51 changes: 47 additions & 4 deletions containers/orchestration/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
Request,
File,
HTTPException,
WebSocket,
)
from typing import Annotated, Optional
from pathlib import Path
from zipfile import is_zipfile
from app.utils import load_processing_config, unzip, load_config_assets
from zipfile import is_zipfile, ZipFile
from app.utils import (
load_processing_config,
unzip_ws,
unzip_http,
load_config_assets,
)
from app.config import get_settings
from app.services import call_apis
from app.models import (
Expand All @@ -29,6 +35,7 @@
sample_list_configs_response,
)
import json
import io
import os

# Read settings immediately to fail fast in case there are invalid values.
Expand All @@ -46,6 +53,42 @@
)


class WS_File:
# Constructor method (init method)
def __init__(self, file):
# Instance attributes
self.file = file


@app.websocket("/process-ws")
async def process_message_endpoint_ws(
websocket: WebSocket,
) -> ProcessMessageResponse:
"""
Creates a websocket connection with the client and accepts a zipped XML file.
The file is processed by the building blocks according to the currently
loaded configuration and emits websocket updates to the client as each
processing step completes.
"""

await websocket.accept()
while True:
file = await websocket.receive_bytes()

zipped_file = ZipFile(io.BytesIO(file), "r")
unzipped_file = unzip_ws(zipped_file)

# Hardcoded message_type for MVP
input = {
"message_type": "eicr",
"include_error_types": "errors",
"message": unzipped_file,
}

processing_config = load_processing_config("sample-orchestration-config.json")
await call_apis(config=processing_config, input=input, websocket=websocket)


@app.post("/process", status_code=200, responses=process_message_response_examples)
async def process_message_endpoint(
request: Request,
Expand All @@ -60,7 +103,7 @@ async def process_message_endpoint(
content = ""

if upload_file and is_zipfile(upload_file.file):
content = unzip(upload_file)
content = unzip_http(upload_file)
else:
try:
data = await request.json()
Expand All @@ -83,7 +126,7 @@ async def process_message_endpoint(
"message": content,
}

response, responses = call_apis(config=processing_config, input=input)
response, responses = await call_apis(config=processing_config, input=input)

if response.status_code == 200:
# Parse and work with the API response data (JSON, XML, etc.)
Expand Down
19 changes: 17 additions & 2 deletions containers/orchestration/app/services.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import requests
from fastapi import HTTPException
import json
from fastapi import HTTPException, WebSocket


service_urls = {
Expand Down Expand Up @@ -70,12 +71,16 @@ def post_request(url, payload):
return requests.post(url, json=payload)


def call_apis(
async def call_apis(
config,
input,
websocket: WebSocket = None,
) -> tuple:
response = input
responses = {}

progress_dict = {"steps": config["steps"]}

for step in config["steps"]:
service = step["service"]
endpoint = step["endpoint"]
Expand All @@ -89,6 +94,16 @@ def call_apis(
print(f"Url: {url}")
response = post_request(url, payload)
print(f"Status Code: {response.status_code}")

if websocket:
# Write service responses into websocket message
progress_dict[f"{response.url.split('/')[-1]}"] = {
"status_code": response.status_code,
"Message": response.reason,
}

await websocket.send_text(json.dumps(progress_dict))

responses[endpoint] = response
else:
raise HTTPException(
Expand Down
15 changes: 13 additions & 2 deletions containers/orchestration/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import pathlib
from functools import cache
from pathlib import Path
from zipfile import ZipFile
from typing import Dict
from zipfile import ZipFile


@cache
Expand All @@ -13,6 +13,7 @@ def load_processing_config(config_name: str) -> dict:
first. If no custom configs match the provided name, check the configs provided by
default with this service in the 'default_configs/' directory.
:param config_name: Name of config file
:param path: The path to an extraction config file.
:return: A dictionary containing the extraction config.
"""
Expand All @@ -39,7 +40,17 @@ def read_json_from_assets(filename: str):
return json.load(open((pathlib.Path(__file__).parent.parent / "assets" / filename)))


def unzip(zipped_file) -> Dict:
def unzip_ws(zipped_file) -> Dict:
my_zipfile = zipped_file
if my_zipfile.namelist:
file_to_open = [
file for file in my_zipfile.namelist() if "/CDA_eICR.xml" in file
][0]
f = my_zipfile.open(file_to_open)
return f.read().decode("utf-8")


def unzip_http(zipped_file) -> Dict:
my_zipfile = ZipFile(zipped_file.file)
file_to_open = [file for file in my_zipfile.namelist() if "/CDA_eICR.xml" in file][
0
Expand Down
2 changes: 2 additions & 0 deletions containers/orchestration/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ python-dotenv
python-multipart
pytest-env
testcontainers[compose]
websockets
asyncio

0 comments on commit cd7efb5

Please sign in to comment.