diff --git a/app/api/crud.py b/app/api/crud.py
index cc7f2da..33987c2 100644
--- a/app/api/crud.py
+++ b/app/api/crud.py
@@ -156,10 +156,9 @@ async def get_terms(data_element_URI: str):
     node_errors = []
     unique_terms_dict = {}
 
-    params = {data_element_URI: data_element_URI}
     tasks = [
         util.send_get_request(
-            node_url + "attributes/" + data_element_URI, params
+            node_url + "attributes/" + data_element_URI,
         )
         for node_url in util.FEDERATION_NODES
     ]
@@ -187,3 +186,49 @@ async def get_terms(data_element_URI: str):
         cross_node_results=cross_node_results,
         node_errors=node_errors,
     )
+
+
+async def get_pipeline_versions(pipeline_term: str):
+    """
+    Make a GET request to all available node APIs for available versions of a specified pipeline.
+
+    Parameters
+    ----------
+    pipeline_term : str
+        Controlled term of pipeline for which all the available terms should be retrieved.
+
+    Returns
+    -------
+    dict
+        Dictionary where the key is the pipeline term and the value is the list of unique available (i.e. used) versions of the pipeline.
+    """
+    # TODO: The logic in this function is very similar to get_terms. Consider refactoring to reduce code duplication.
+    node_errors = []
+    all_pipe_versions = []
+
+    tasks = [
+        util.send_get_request(f"{node_url}pipelines/{pipeline_term}/versions")
+        for node_url in util.FEDERATION_NODES
+    ]
+    responses = await asyncio.gather(*tasks, return_exceptions=True)
+
+    for (node_url, node_name), response in zip(
+        util.FEDERATION_NODES.items(), responses
+    ):
+        if isinstance(response, HTTPException):
+            node_errors.append(
+                {"node_name": node_name, "error": response.detail}
+            )
+            logging.warning(
+                f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}"
+            )
+        else:
+            all_pipe_versions.extend(response[pipeline_term])
+
+    cross_node_results = {pipeline_term: sorted(list(set(all_pipe_versions)))}
+
+    return build_combined_response(
+        total_nodes=len(util.FEDERATION_NODES),
+        cross_node_results=cross_node_results,
+        node_errors=node_errors,
+    )
diff --git a/app/api/routers/pipelines.py b/app/api/routers/pipelines.py
new file mode 100644
index 0000000..af11762
--- /dev/null
+++ b/app/api/routers/pipelines.py
@@ -0,0 +1,25 @@
+from fastapi import APIRouter, Response, status
+from pydantic import constr
+
+from .. import crud
+from ..models import CONTROLLED_TERM_REGEX, CombinedAttributeResponse
+
+router = APIRouter(prefix="/pipelines", tags=["pipelines"])
+
+
+@router.get(
+    "/{pipeline_term}/versions", response_model=CombinedAttributeResponse
+)
+async def get_pipeline_versions(
+    pipeline_term: constr(regex=CONTROLLED_TERM_REGEX), response: Response
+):
+    """
+    When a GET request is sent, return a dict where the key is the pipeline term and the value
+    is a list containing all available versions of a pipeline, across all nodes known to the f-API.
+    """
+    response_dict = await crud.get_pipeline_versions(pipeline_term)
+
+    if response_dict["errors"]:
+        response.status_code = status.HTTP_207_MULTI_STATUS
+
+    return response_dict
diff --git a/app/api/utility.py b/app/api/utility.py
index 9aa2bdd..1dd63ff 100644
--- a/app/api/utility.py
+++ b/app/api/utility.py
@@ -220,7 +220,7 @@ def validate_query_node_url_list(node_urls: list) -> list:
 
 
 async def send_get_request(
-    url: str, params: list, timeout: float = None
+    url: str, params: list = None, timeout: float = None
 ) -> dict:
     """
     Makes a GET request to one or more Neurobagel nodes.
@@ -229,8 +229,10 @@ async def send_get_request(
     ----------
     url : str
         URL of Neurobagel node API.
-    params : list
-        Neurobagel query parameters.
+    params : list, optional
+        Neurobagel query parameters, by default None.
+    timeout : float, optional
+        Timeout for the request, by default None.
 
     Returns
     -------
@@ -253,7 +255,6 @@ async def send_get_request(
             # APIs behind a proxy can be reached
             follow_redirects=True,
         )
-
         if not response.is_success:
             raise HTTPException(
                 status_code=response.status_code,
diff --git a/app/main.py b/app/main.py
index 60432db..74a491c 100644
--- a/app/main.py
+++ b/app/main.py
@@ -10,7 +10,7 @@ from fastapi.responses import HTMLResponse, ORJSONResponse, RedirectResponse
 
 from .api import utility as util
-from .api.routers import attributes, nodes, query
+from .api.routers import attributes, nodes, pipelines, query
 from .api.security import check_client_id
 
 logger = logging.getLogger("nb-f-API")
@@ -99,6 +99,7 @@ def overridden_redoc():
 
 app.include_router(query.router)
 app.include_router(attributes.router)
+app.include_router(pipelines.router)
 app.include_router(nodes.router)
 
 # Automatically start uvicorn server on execution of main.py
diff --git a/tests/test_pipelines.py b/tests/test_pipelines.py
new file mode 100644
index 0000000..93c4246
--- /dev/null
+++ b/tests/test_pipelines.py
@@ -0,0 +1,34 @@
+import httpx
+from fastapi import status
+
+
+def test_unique_pipeline_versions_returned_from_nodes(
+    test_app, monkeypatch, set_valid_test_federation_nodes
+):
+    """
+    Test that given a successful request to two nodes for versions of a specific pipeline term,
+    the API correctly returns a list of unique versions across both nodes.
+    """
+
+    # Predefine the responses from the mocked n-APIs set using the fixture set_valid_test_federation_nodes
+    async def mock_httpx_get(self, **kwargs):
+        if "https://firstpublicnode.org/" in kwargs["url"]:
+            mocked_response_json = {"np:pipeline1": ["1.0.0", "1.0.1"]}
+        else:
+            mocked_response_json = {"np:pipeline1": ["1.0.1", "1.0.2"]}
+        return httpx.Response(
+            status_code=200,
+            json=mocked_response_json,
+        )
+
+    monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get)
+
+    response = test_app.get("/pipelines/np:pipeline1/versions")
+    assert response.status_code == status.HTTP_200_OK
+
+    response_object = response.json()
+    assert len(response_object["errors"]) == 0
+    assert response_object["responses"] == {
+        "np:pipeline1": ["1.0.0", "1.0.1", "1.0.2"]
+    }
+    assert response_object["nodes_response_status"] == "success"