Skip to content

Commit

Permalink
Merge pull request #10 from bento-platform/new-chord-services
Browse files Browse the repository at this point in the history
for bentoV2 2.11: New chord_services format
  • Loading branch information
davidlougheed authored Feb 8, 2023
2 parents 5ee2011 + cd620cd commit acd7188
Show file tree
Hide file tree
Showing 10 changed files with 1,124 additions and 1,061 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
uses: actions/checkout@v3

- name: Run Bento build action
uses: bento-platform/bento_build_action@v0.6
uses: bento-platform/bento_build_action@v0.10.1
with:
registry: ghcr.io
registry-username: ${{ github.actor }}
Expand Down
22 changes: 14 additions & 8 deletions bento_service_registry/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import annotations

import os
import subprocess
import sys
Expand All @@ -18,24 +16,32 @@
from .constants import SERVICE_TYPE, SERVICE_NAME
from .routes import service_registry

TRUTH_VALUES = ("true", "1")


def get_bento_debug():
# This is a function to allow monkey-patching the environment on app startup.
return os.environ.get(
"CHORD_DEBUG", os.environ.get("BENTO_DEBUG", os.environ.get("QUART_ENV", "production"))
).strip().lower() in ("true", "1", "development")
).strip().lower() in (*TRUTH_VALUES, "development")


def create_app():
app = Quart(__name__)
bento_url = os.environ.get("CHORD_URL", os.environ.get("BENTO_URL", "http://0.0.0.0:5000/")) # Own node's URL

bento_debug = get_bento_debug()
validate_ssl = os.environ.get("BENTO_VALIDATE_SSL", str(not bento_debug)).strip().lower() in TRUTH_VALUES

app.config.from_mapping(
BENTO_DEBUG=get_bento_debug(),
BENTO_VALIDATE_SSL=not get_bento_debug(),
CHORD_SERVICES=os.environ.get("CHORD_SERVICES", os.environ.get("BENTO_SERVICES", "chord_services.json")),
CHORD_URL=os.environ.get("CHORD_URL", os.environ.get("BENTO_URL", "http://0.0.0.0:5000/")), # Own node's URL
BENTO_DEBUG=bento_debug,
BENTO_VALIDATE_SSL=validate_ssl,
BENTO_SERVICES=os.environ.get("CHORD_SERVICES", os.environ.get("BENTO_SERVICES", "bento_services.json")),
BENTO_URL=bento_url,
BENTO_PUBLIC_URL=os.environ.get("BENTO_PUBLIC_URL", bento_url),
BENTO_PORTAL_PUBLIC_URL=os.environ.get("BENTO_PORTAL_PUBLIC_URL", bento_url),
CONTACT_TIMEOUT=int(os.environ.get("CONTACT_TIMEOUT", 5)),
SERVICE_ID=os.environ.get("SERVICE_ID", ":".join(SERVICE_TYPE.values())),
URL_PATH_FORMAT=os.environ.get("URL_PATH_FORMAT", "api/{artifact}"),
)

path_for_git = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
Expand Down
4 changes: 2 additions & 2 deletions bento_service_registry/constants.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from __future__ import annotations

from bento_lib.types import GA4GHServiceType
from bento_service_registry import __version__

__all__ = [
"BENTO_SERVICE_KIND",
"SERVICE_ARTIFACT",
"SERVICE_TYPE",
"SERVICE_NAME",
]

BENTO_SERVICE_KIND: str = "service-registry"
SERVICE_ARTIFACT: str = "service-registry"

# For exact implementations, this should be org.ga4gh/service-registry/1.0.0.
Expand Down
141 changes: 89 additions & 52 deletions bento_service_registry/routes.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,72 @@
from __future__ import annotations

import aiofiles
import aiohttp
import asyncio
import sys

from bento_lib.responses.quart_errors import quart_not_found_error
import quart
from bento_lib.responses.quart_errors import quart_not_found_error, quart_internal_server_error
from bento_lib.types import GA4GHServiceInfo
from bento_service_registry import __version__
from datetime import datetime
from json.decoder import JSONDecodeError
from quart import Blueprint, current_app, json, request
from typing import Optional
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin

from .constants import SERVICE_NAME, SERVICE_TYPE, SERVICE_ARTIFACT
from .constants import BENTO_SERVICE_KIND, SERVICE_NAME, SERVICE_TYPE, SERVICE_ARTIFACT
from .types import BentoService

service_registry = Blueprint("service_registry", __name__)


def get_service_url(artifact: str) -> str:
    """
    Builds the URL of a service from its artifact name.

    The artifact is substituted into the configured URL_PATH_FORMAT template, and the resulting path is
    resolved against the configured CHORD_URL with urljoin.
    """
    app_config = current_app.config
    base_url = app_config["CHORD_URL"]
    service_path = app_config["URL_PATH_FORMAT"].format(artifact=artifact)
    return urljoin(base_url, service_path)


async def get_chord_services() -> list[dict]:
async def get_chord_services_by_compose_id() -> Dict[str, BentoService]:
"""
Reads the list of services from the chord_services.json file
"""

# Load bento_services.json data from the filesystem
try:
async with aiofiles.open(current_app.config["CHORD_SERVICES"], "r") as f:
return [s for s in json.loads(await f.read()) if not s.get("disabled")] # Skip disabled services
async with aiofiles.open(current_app.config["BENTO_SERVICES"], "r") as f:
# Return dictionary of services (id: configuration) Skip disabled services
chord_services_data: Dict[str, BentoService] = json.loads(await f.read())
except Exception as e:
except_name = type(e).__name__
print("Error retrieving information from chord_services JSON file:", except_name)
return []
print("Error retrieving information from chord_services JSON file:", except_name, file=sys.stderr)
return {}

return {
sk: BentoService(
**sv,
url=sv["url_template"].format(
BENTO_URL=current_app.config["BENTO_URL"],
BENTO_PUBLIC_URL=current_app.config["BENTO_PUBLIC_URL"],
BENTO_PORTAL_PUBLIC_URL=current_app.config["BENTO_PORTAL_PUBLIC_URL"],
**sv,
),
) # type: ignore
for sk, sv in chord_services_data.items()
if not sv.get("disabled")
}


async def get_chord_services_by_kind() -> Dict[str, BentoService]:
    """
    Re-indexes the services returned by get_chord_services_by_compose_id() by their "service_kind" value.

    Each entry is looked up with service["service_kind"], so an entry without that key raises KeyError.
    If several entries share the same kind, the last one encountered wins.
    """
    services_by_compose_id = await get_chord_services_by_compose_id()

    services_by_kind: Dict[str, BentoService] = {}
    for service in services_by_compose_id.values():
        services_by_kind[service["service_kind"]] = service
    return services_by_kind


async def get_service_url(service_kind: str) -> str:
    """
    Looks up the URL of the service registered under the given service kind.

    Raises KeyError if no service of that kind is present, or if its entry has no "url" key.
    """
    services_by_kind = await get_chord_services_by_kind()
    service = services_by_kind[service_kind]
    return service["url"]


async def get_service(session: aiohttp.ClientSession, service_artifact: str) -> Optional[dict[str, dict]]:
async def get_service(session: aiohttp.ClientSession, service_metadata: BentoService) -> Optional[Dict[str, dict]]:
kind = service_metadata["service_kind"]

# special case: requesting info about the current service. Skip networking / self-connect.
if service_artifact == SERVICE_ARTIFACT:
if kind == BENTO_SERVICE_KIND:
return await get_service_info()

timeout = aiohttp.ClientTimeout(total=current_app.config["CONTACT_TIMEOUT"])

s_url: str = get_service_url(service_artifact)
s_url: str = service_metadata["url"]
service_info_url: str = urljoin(f"{s_url}/", "service-info")

# Optional Authorization HTTP header to forward to nested requests
Expand All @@ -54,13 +77,13 @@ async def get_service(session: aiohttp.ClientSession, service_artifact: str) ->
dt = datetime.now()
print(f"[{SERVICE_NAME}] Contacting {service_info_url}{' with bearer token' if auth_header else ''}", flush=True)

service_resp: dict[str, dict] = {}
service_resp: Dict[str, dict] = {}

try:
async with session.get(service_info_url, headers=headers, timeout=timeout) as r:
if r.status != 200:
r_text = await r.text()
print(f"[{SERVICE_NAME}] Non-200 status code on {service_artifact}: {r.status}\n"
print(f"[{SERVICE_NAME}] Non-200 status code on {kind}: {r.status}\n"
f" Content: {r_text}", file=sys.stderr, flush=True)

# If we have the special case where we got a JWT error from the proxy script, we can safely print out
Expand All @@ -72,9 +95,10 @@ async def get_service(session: aiohttp.ClientSession, service_artifact: str) ->
return None

try:
service_resp[service_artifact] = {**(await r.json()), "url": s_url}
except JSONDecodeError:
print(f"[{SERVICE_NAME}] Encountered invalid response from {service_info_url}: {await r.text()}")
service_resp[kind] = {**(await r.json()), "url": s_url}
except (JSONDecodeError, aiohttp.ContentTypeError) as e:
print(f"[{SERVICE_NAME}] Encountered invalid response ({str(e)}) from {service_info_url}: "
f"{await r.text()}")

print(f"[{SERVICE_NAME}] {service_info_url}: Took {(datetime.now() - dt).total_seconds():.1f}s", flush=True)

Expand All @@ -85,22 +109,22 @@ async def get_service(session: aiohttp.ClientSession, service_artifact: str) ->
print(f"[{SERVICE_NAME}] Encountered connection error with {service_info_url}: {str(e)}",
file=sys.stderr, flush=True)

return service_resp.get(service_artifact)
return service_resp.get(kind)


@service_registry.route("/bento-services")
@service_registry.route("/chord-services")
async def chord_services():
return json.jsonify(await get_chord_services())
return json.jsonify(await get_chord_services_by_compose_id())


async def get_services() -> list[dict]:
async def get_services() -> List[dict]:
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=current_app.config["BENTO_VALIDATE_SSL"])) as session:
# noinspection PyTypeChecker
service_list: list[Optional[dict]] = await asyncio.gather(*[
get_service(session, s["type"]["artifact"])
for s in (await get_chord_services())
service_list: List[Optional[dict]] = await asyncio.gather(*[
get_service(session, s)
for s in (await get_chord_services_by_compose_id()).values()
])
return [s for s in service_list if s is not None]

Expand All @@ -111,26 +135,44 @@ async def services():


@service_registry.route("/services/<string:service_id>")
async def service_by_id(service_id: str):
async def service_by_id(service_id: str) -> Union[quart.Response, dict]:
services_by_id = {s["id"]: s for s in (await get_services())}
chord_services_by_kind = await get_chord_services_by_kind()

if service_id not in services_by_id:
return quart_not_found_error(f"Service with ID {service_id} was not found in registry")

svc = services_by_id[service_id]

async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=current_app.config["BENTO_VALIDATE_SSL"])) as session:
return await get_service(session, services_by_id[service_id]["type"]["artifact"])
# Get service by bento.serviceKind, using type.artifact as a backup for legacy reasons
service_data = await get_service(
session, chord_services_by_kind[svc.get("bento", {}).get("serviceKind", svc["type"]["artifact"])])

if service_data is None:
return quart_internal_server_error(f"An internal error was encountered with service with ID {service_id}")

return service_data


@service_registry.route("/services/types")
async def service_types():
types_by_key: dict[str, dict] = {}
async def service_types() -> quart.Response:
types_by_key: Dict[str, dict] = {}
for st in (s["type"] for s in await get_services()):
sk = ":".join(st.values())
types_by_key[sk] = st

return json.jsonify(list(types_by_key.values()))


async def _git_stdout(*args) -> str:
    """
    Runs `git` with the given arguments and returns what it wrote to standard output.

    Standard error is captured but discarded, and the exit status is not checked. The output is decoded
    with the default codec and trailing whitespace is stripped.
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec("git", *args, stdout=pipe, stderr=pipe)
    stdout_bytes, _stderr_bytes = await process.communicate()
    return stdout_bytes.decode().rstrip()


async def get_service_info() -> GA4GHServiceInfo:
service_id = current_app.config["SERVICE_ID"]
service_info_dict: GA4GHServiceInfo = {
Expand All @@ -144,8 +186,11 @@ async def get_service_info() -> GA4GHServiceInfo:
},
"contactUrl": "mailto:[email protected]",
"version": __version__,
"url": get_service_url(SERVICE_ARTIFACT),
"environment": "prod"
"url": await get_service_url(SERVICE_ARTIFACT),
"environment": "prod",
"bento": {
"serviceKind": BENTO_SERVICE_KIND,
},
}

if not current_app.config["BENTO_DEBUG"]:
Expand All @@ -154,23 +199,15 @@ async def get_service_info() -> GA4GHServiceInfo:
service_info_dict["environment"] = "dev"

try:
git_proc = await asyncio.create_subprocess_exec(
"git", "describe", "--tags", "--abbrev=0",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
res_tag, _ = await git_proc.communicate()
if res_tag:
service_info_dict["git_tag"] = res_tag.decode().rstrip()

git_proc = await asyncio.create_subprocess_exec(
"git", "branch", "--show-current",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
res_branch, _ = await git_proc.communicate()
if res_branch:
service_info_dict["git_branch"] = res_branch.decode().rstrip()
if res_tag := await _git_stdout("describe", "--tags", "--abbrev=0"):
# noinspection PyTypeChecker
service_info_dict["bento"]["gitTag"] = res_tag
if res_branch := await _git_stdout("branch", "--show-current"):
# noinspection PyTypeChecker
service_info_dict["bento"]["gitBranch"] = res_branch
if res_commit := await _git_stdout("rev-parse", "HEAD"):
# noinspection PyTypeChecker
service_info_dict["bento"]["gitCommit"] = res_commit

except Exception as e:
except_name = type(e).__name__
Expand All @@ -180,6 +217,6 @@ async def get_service_info() -> GA4GHServiceInfo:


@service_registry.route("/service-info")
async def service_info():
async def service_info() -> quart.Response:
# Spec: https://github.com/ga4gh-discovery/ga4gh-service-info
return json.jsonify(await get_service_info())
21 changes: 21 additions & 0 deletions bento_service_registry/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Dict, TypedDict


# TODO: py3.11+: replace the required/optional class split below with typing.NotRequired (PEP 655)

# required props for chord_services.json entries
class BaseBentoService(TypedDict):
    """Keys that every service entry is declared to carry (a TypedDict is total by default)."""

    # Format string from which the service's URL is built; routes.py expands it with BENTO_URL,
    # BENTO_PUBLIC_URL, BENTO_PORTAL_PUBLIC_URL and the entry's own keys via str.format().
    url_template: str
    # NOTE(review): presumably the location of the service's source repository - confirm.
    repository: str
    # NOTE(review): presumably marks services that manage data - confirm against consumers.
    data_service: bool


# optional props for chord_services.json entries
class BentoService(BaseBentoService, total=False):
    """Service entry: the keys inherited from BaseBentoService plus keys that may be absent (total=False)."""

    # Identifier for the kind of service; routes.py indexes services by it and compares it with
    # BENTO_SERVICE_KIND to recognise the registry's own entry.
    service_kind: str
    # Concrete service URL; routes.py derives it from url_template when loading the entries.
    url: str
    # NOTE(review): meaning not visible here - presumably whether the service's tables can be managed; confirm.
    manageable_tables: bool
    # Entries with a truthy value are left out when routes.py loads the services.
    disabled: bool


# Mapping from a string key to its service entry.
# NOTE(review): presumably keyed the same way as the services JSON file - confirm.
BentoServices = Dict[str, BentoService]
Loading

0 comments on commit acd7188

Please sign in to comment.