Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add route for fetching available versions of a pipeline across n-APIs #125

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions app/api/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,9 @@ async def get_terms(data_element_URI: str):
node_errors = []
unique_terms_dict = {}

params = {data_element_URI: data_element_URI}
tasks = [
util.send_get_request(
node_url + "attributes/" + data_element_URI, params
node_url + "attributes/" + data_element_URI,
)
for node_url in util.FEDERATION_NODES
]
Expand Down Expand Up @@ -187,3 +186,49 @@ async def get_terms(data_element_URI: str):
cross_node_results=cross_node_results,
node_errors=node_errors,
)


async def get_pipeline_versions(pipeline_term: str):
    """
    Ask every federated node API which versions of a pipeline it has and merge the answers.

    Parameters
    ----------
    pipeline_term : str
        Controlled term of the pipeline whose available versions should be retrieved.

    Returns
    -------
    dict
        The value produced by build_combined_response. The cross-node results passed to it
        map the pipeline term to the sorted list of unique versions reported by the nodes
        that answered successfully; failed nodes are passed along as node errors.
    """
    # TODO: The logic in this function is very similar to get_terms. Consider refactoring to reduce code duplication.
    failed_nodes = []
    versions_seen = []

    # One request per federation node, all awaited concurrently. With
    # return_exceptions=True a failing node yields its exception as a result
    # instead of aborting the whole gather.
    pending_requests = [
        util.send_get_request(f"{node_url}pipelines/{pipeline_term}/versions")
        for node_url in util.FEDERATION_NODES
    ]
    responses = await asyncio.gather(*pending_requests, return_exceptions=True)

    # gather() preserves task order, so results line up with FEDERATION_NODES.
    node_items = util.FEDERATION_NODES.items()
    for (node_url, node_name), response in zip(node_items, responses):
        if not isinstance(response, HTTPException):
            versions_seen.extend(response[pipeline_term])
            continue

        failed_nodes.append({"node_name": node_name, "error": response.detail})
        logging.warning(
            f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}"
        )

    # Deduplicate across nodes and return the versions in sorted order.
    merged_results = {pipeline_term: sorted(set(versions_seen))}

    return build_combined_response(
        total_nodes=len(util.FEDERATION_NODES),
        cross_node_results=merged_results,
        node_errors=failed_nodes,
    )
25 changes: 25 additions & 0 deletions app/api/routers/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from fastapi import APIRouter, Response, status
from pydantic import constr

from .. import crud
from ..models import CONTROLLED_TERM_REGEX, CombinedAttributeResponse

# Router for the endpoints served under the "/pipelines" prefix.
router = APIRouter(prefix="/pipelines", tags=["pipelines"])


@router.get(
    "/{pipeline_term}/versions",
    response_model=CombinedAttributeResponse,
)
async def get_pipeline_versions(
    pipeline_term: constr(regex=CONTROLLED_TERM_REGEX),
    response: Response,
):
    """
    When a GET request is sent, return a dict where the key is the pipeline term and the value
    is a list containing all available versions of a pipeline, across all nodes known to the f-API.
    """
    combined = await crud.get_pipeline_versions(pipeline_term)

    # A non-empty "errors" entry switches the status code to 207 Multi-Status;
    # otherwise the status code is left untouched.
    if combined["errors"]:
        response.status_code = status.HTTP_207_MULTI_STATUS

    return combined
9 changes: 5 additions & 4 deletions app/api/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def validate_query_node_url_list(node_urls: list) -> list:


async def send_get_request(
url: str, params: list, timeout: float = None
url: str, params: list = None, timeout: float = None
) -> dict:
"""
Makes a GET request to one or more Neurobagel nodes.
Expand All @@ -229,8 +229,10 @@ async def send_get_request(
----------
url : str
URL of Neurobagel node API.
params : list
Neurobagel query parameters.
params : list, optional
Neurobagel query parameters, by default None.
timeout : float, optional
Timeout for the request, by default None.

Returns
-------
Expand All @@ -253,7 +255,6 @@ async def send_get_request(
# APIs behind a proxy can be reached
follow_redirects=True,
)

if not response.is_success:
raise HTTPException(
status_code=response.status_code,
Expand Down
3 changes: 2 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi.responses import HTMLResponse, ORJSONResponse, RedirectResponse

from .api import utility as util
from .api.routers import attributes, nodes, query
from .api.routers import attributes, nodes, pipelines, query
from .api.security import check_client_id

logger = logging.getLogger("nb-f-API")
Expand Down Expand Up @@ -99,6 +99,7 @@ def overridden_redoc():

app.include_router(query.router)
app.include_router(attributes.router)
app.include_router(pipelines.router)
app.include_router(nodes.router)

# Automatically start uvicorn server on execution of main.py
Expand Down
34 changes: 34 additions & 0 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import httpx
from fastapi import status


def test_unique_pipeline_versions_returned_from_nodes(
    test_app, monkeypatch, set_valid_test_federation_nodes
):
    """
    Check that when two nodes both answer successfully with versions of the same pipeline term,
    the API responds with the de-duplicated list of versions across both nodes.
    """

    # Stand-in for httpx.AsyncClient.get: the first public node reports one pair
    # of versions, every other URL reports an overlapping pair.
    async def mock_httpx_get(self, **kwargs):
        is_first_node = "https://firstpublicnode.org/" in kwargs["url"]
        versions = ["1.0.0", "1.0.1"] if is_first_node else ["1.0.1", "1.0.2"]
        return httpx.Response(
            status_code=200,
            json={"np:pipeline1": versions},
        )

    monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get)

    response = test_app.get("/pipelines/np:pipeline1/versions")
    assert response.status_code == status.HTTP_200_OK

    payload = response.json()
    expected_versions = {"np:pipeline1": ["1.0.0", "1.0.1", "1.0.2"]}
    assert len(payload["errors"]) == 0
    assert payload["responses"] == expected_versions
    assert payload["nodes_response_status"] == "success"