From 5f0839d534977d5b10d9f9ea82289e8a7ccfb0d1 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Tue, 13 Jul 2021 12:44:54 -0700 Subject: [PATCH 01/10] Add a first sketch of an alert database server Add a FastAPI-based server which provides alerts and schemas according to the routes laid out in DMTN-183. This is done in a package structure which is separate from the LSST stack because I do not expect it to depend upon the stack, and the stack's complexity is not warranted here. --- alertdb/bin/alertdb.py | 51 ++++++++++++++++++++++++++++++++++ alertdb/server.py | 62 ++++++++++++++++++++++++++++++++++++++++++ setup.cfg | 25 +++++++++++++++++ setup.py | 3 ++ 4 files changed, 141 insertions(+) create mode 100644 alertdb/bin/alertdb.py create mode 100644 alertdb/server.py create mode 100644 setup.cfg create mode 100644 setup.py diff --git a/alertdb/bin/alertdb.py b/alertdb/bin/alertdb.py new file mode 100644 index 0000000..0aa45b7 --- /dev/null +++ b/alertdb/bin/alertdb.py @@ -0,0 +1,51 @@ +import uvicorn +import argparse + +from alertdb.server import create_server, FileBackend, GoogleObjectStorageBackend + + +def main(): + parser = argparse.ArgumentParser("alertdb") + parser.add_argument( + "--listen-host", type=str, default="127.0.0.1", + help="host address to listen on for requests", + ) + parser.add_argument( + "--listen-port", type=int, default=5000, + help="host port to listen on for requests", + ) + parser.add_argument( + "--backend", type=str, choices=("local-files", "google-cloud"), default="local-files", + help="backend to use to source alerts", + ) + parser.add_argument( + "--local-file-root", type=str, default=None, + help="when using the local-files backend, the root directory where alerts should be found", + ) + parser.add_argument( + "--gcp-project", type=str, default=None, + help="when using the google-cloud backend, the name of the GCP project", + ) + parser.add_argument( + "--gcp-bucket", type=str, default=None, + help="when using the google-cloud backend, the name of the Google Cloud Storage bucket", + ) + args = parser.parse_args() + + # Configure the right backend + if args.backend == "local-files": + if args.local_file_root is None: + parser.error("--backend=local-files requires --local-file-root be set") + backend = FileBackend(args.local_file_root) + elif args.backend == "google-cloud": + if args.gcp_project is None: + parser.error("--backend=google-cloud requires --gcp-project be set") + if args.gcp_bucket is None: + parser.error("--backend=google-cloud requires --gcp-bucket be set") + backend = GoogleObjectStorageBackend(args.gcp_project, args.gcp_bucket) + else: + # Shouldn't be possible if argparse is using the choices parameter as expected... + raise AsertionError("only valid --backend choices are local-files and google-cloud") + + server = create_server(backend) + uvicorn.run(server, host=args.listen_host, port=args.listen_port, log_level="info") diff --git a/alertdb/server.py b/alertdb/server.py new file mode 100644 index 0000000..e6e3dae --- /dev/null +++ b/alertdb/server.py @@ -0,0 +1,62 @@ +import abc +import os.path + +import google.cloud.storage as gcs +from fastapi import FastAPI, HTTPException + + +class AlertDatabaseBackend(abc.ABC): + @abc.abstractmethod + def get_alert(self, alert_id: str) -> bytes: + raise NotImplementedError() + + @abc.abstractmethod + def get_schema(self, schema_id: str) -> bytes: + raise NotImplementedError() + + +class FileBackend(AlertDatabaseBackend): + def __init__(self, root_dir: str): + self.root_dir = root_dir + + def get_alert(self, alert_id: str) -> bytes: + try: + with open(os.path.join(self.root_dir, "alerts", alert_id)) as f: + return f.read() + except FileNotFoundError: + raise HTTPException(status_code=404, detail="alert not found") + + def get_schema(self, schema_id: str) -> bytes: + try: + with open(os.path.join(self.root_dir, "schemas", schema_id)) as f: + return f.read() + except FileNotFoundError: + raise HTTPException(status_code=404, detail="alert not found") + + +class GoogleObjectStorageBackend(AlertDatabaseBackend): + def __init__(self, gcp_project: str, bucket_name: str): + self.object_store_client = gcs.Client(project=gcp_project) + self.bucket = self.object_store_client.bucket(bucket_name) + + def get_alert(self, alert_id: str) -> bytes: + blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") + return blob.download_as_bytes() + + def get_schema(self, schema_id: str) -> bytes: + blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") + return blob.download_as_bytes() + + +def create_server(backend: AlertDatabaseBackend): + app = FastAPI() + + @app.get("/v1/schemas/{schema_id}") + def get_schema(schema_id: str): + return backend.get_schema(schema_id) + + @app.get("/v1/alerts/{alert_id}") + def get_alert(alert_id: str): + return backend.get_alert(alert_id) + + return app diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..4947d84 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,25 @@ +[metadata] +name = lsst-alert-database-server +version = 0.1.0 +description = A server for the Rubin Observatory alert database +url = https://github.com/lsst-dm/alert_database_server +classifiers = + Programming Language :: Python :: 3 + License :: OSI Approved :: GNU General Public License v3 (GPLv3) + Development Status :: 3 - Alpha +author = Spencer Nelson +author_email = swnelson@uw.edu +license = GPLv3 + +[options] +install_requires = + fastapi + uvicorn + google-cloud-storage + +packages = + alertdb + +[options.entry_points] +console_scripts = + alertdb = alertdb.bin.alertdb:main diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..6068493 --- /dev/null +++ b/setup.py @@ -0,0 +1,3 @@ +from setuptools import setup + +setup() From e7210a88983f3df9ec5fb32531c8ace3cce268bd Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Thu, 15 Jul 2021 16:09:39 -0700 Subject: [PATCH 02/10] Refactor, add first integration test Moved the Alert DB's backend implementations into alertdb/storage.py. Fixed the server's responses so they provide raw bytes rather than JSON-encoding the byte sequences. Added an integration test. Added requests dependency for the integration test. --- alertdb/__init__.py | 0 alertdb/server.py | 63 ++++++--------------- alertdb/storage.py | 51 +++++++++++++++++ setup.cfg | 5 ++ tests/test_integration.py | 115 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 187 insertions(+), 47 deletions(-) create mode 100644 alertdb/__init__.py create mode 100644 alertdb/storage.py create mode 100644 tests/test_integration.py diff --git a/alertdb/__init__.py b/alertdb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alertdb/server.py b/alertdb/server.py index e6e3dae..c9f80d2 100644 --- a/alertdb/server.py +++ b/alertdb/server.py @@ -1,51 +1,10 @@ -import abc -import os.path +from fastapi import FastAPI, HTTPException, Response -import google.cloud.storage as gcs -from fastapi import FastAPI, HTTPException +from alertdb.storage import AlertDatabaseBackend, NotFoundError -class AlertDatabaseBackend(abc.ABC): - @abc.abstractmethod - def get_alert(self, alert_id: str) -> bytes: - raise NotImplementedError() - - @abc.abstractmethod - def get_schema(self, schema_id: str) -> bytes: - raise NotImplementedError() - - -class FileBackend(AlertDatabaseBackend): - def __init__(self, root_dir: str): - self.root_dir = root_dir - - def get_alert(self, alert_id: str) -> bytes: - try: - with open(os.path.join(self.root_dir, "alerts", alert_id)) as f: - return f.read() - except FileNotFoundError: - raise HTTPException(status_code=404, detail="alert not found") - - def get_schema(self, schema_id: str) -> bytes: - try: - with open(os.path.join(self.root_dir, "schemas", schema_id)) as f: - return f.read() - except FileNotFoundError: - raise HTTPException(status_code=404, detail="alert not found") - - -class GoogleObjectStorageBackend(AlertDatabaseBackend): - def __init__(self, gcp_project: str, bucket_name: str): - self.object_store_client = gcs.Client(project=gcp_project) - self.bucket = self.object_store_client.bucket(bucket_name) - - def get_alert(self, alert_id: str) -> bytes: - blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") - return blob.download_as_bytes() - - def get_schema(self, schema_id: str) -> bytes: - blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") - return blob.download_as_bytes() +SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json" +ALERT_CONTENT_TYPE = "application/octet-stream" def create_server(backend: AlertDatabaseBackend): @@ -53,10 +12,20 @@ def create_server(backend: AlertDatabaseBackend): @app.get("/v1/schemas/{schema_id}") def get_schema(schema_id: str): - return backend.get_schema(schema_id) + try: + schema_bytes = backend.get_schema(schema_id) + except NotFoundError as nfe: + raise HTTPException(status_code=404, detail="schema not found") from nfe + + return Response(content=schema_bytes, media_type=SCHEMA_CONTENT_TYPE) @app.get("/v1/alerts/{alert_id}") def get_alert(alert_id: str): - return backend.get_alert(alert_id) + try: + alert_bytes = backend.get_alert(alert_id) + except NotFoundError as nfe: + raise HTTPException(status_code=404, detail="alert not found") from nfe + + return Response(content=alert_bytes, media_type=ALERT_CONTENT_TYPE) return app diff --git a/alertdb/storage.py b/alertdb/storage.py new file mode 100644 index 0000000..dd0c31d --- /dev/null +++ b/alertdb/storage.py @@ -0,0 +1,51 @@ +import abc +import os.path + +import google.cloud.storage as gcs + + +class AlertDatabaseBackend(abc.ABC): + @abc.abstractmethod + def get_alert(self, alert_id: str) -> bytes: + raise NotImplementedError() + + @abc.abstractmethod + def get_schema(self, schema_id: str) -> bytes: + raise NotImplementedError() + + +class FileBackend(AlertDatabaseBackend): + def __init__(self, root_dir: str): + self.root_dir = root_dir + + def get_alert(self, alert_id: str) -> bytes: + try: + with open(os.path.join(self.root_dir, "alerts", alert_id), "rb") as f: + return f.read() + except FileNotFoundError as file_not_found: + raise NotFoundError("alert not found") from file_not_found + + def get_schema(self, schema_id: str) -> bytes: + try: + with open(os.path.join(self.root_dir, "schemas", schema_id), "rb") as f: + return f.read() + except FileNotFoundError as file_not_found: + raise NotFoundError("schema not found") from file_not_found + + +class GoogleObjectStorageBackend(AlertDatabaseBackend): + def __init__(self, gcp_project: str, bucket_name: str): + self.object_store_client = gcs.Client(project=gcp_project) + self.bucket = self.object_store_client.bucket(bucket_name) + + def get_alert(self, alert_id: str) -> bytes: + blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") + return blob.download_as_bytes() + + def get_schema(self, schema_id: str) -> bytes: + blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") + return blob.download_as_bytes() + + +class NotFoundError(Exception): + pass diff --git a/setup.cfg b/setup.cfg index 4947d84..1514c8e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,10 +12,15 @@ author_email = swnelson@uw.edu license = GPLv3 [options] +python_requires = >= 3.8 install_requires = fastapi uvicorn google-cloud-storage + requests + +tests_require = + pytest packages = alertdb diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..12b6d32 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,115 @@ +import os +import sys +import unittest +import google.cloud.storage as gcs +import multiprocessing +import time +import uvicorn +import requests +import logging + + +from alertdb.server import create_server +from alertdb.storage import GoogleObjectStorageBackend + + +logger = logging.getLogger(__name__) +logger.level = logging.DEBUG + + +class ServerIntegrationTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + """ + Create a test bucket, and populate it with three alerts and three schemas. + + The alerts and schemas don't have valid payloads. + """ + gcp_project = os.environ.get("ALERTDB_TEST_GCP_PROJECT", None) + if gcp_project is None: + raise unittest.SkipTest("the $ALERTDB_TEST_GCP_PROJECT environment variable must be set") + bucket_name = "alertdb_server_integration_test_bucket" + client = gcs.Client(project=gcp_project) + + logger.info("creating bucket %s", bucket_name) + bucket = client.create_bucket(bucket_name) + + def delete_bucket(): + logger.info("deleting bucket %s", bucket_name) + bucket.delete() + cls.addClassCleanup(delete_bucket) + + # Populate the test bucket with a few objects in the expected locations. + def delete_blob(blob): + # Callback to delete a blob during cleanup. + logger.info("deleting blob %s", blob.name) + blob.delete() + + alerts = { + "alert-id-1": b"payload-1", + "alert-id-2": b"payload-2", + "alert-id-3": b"payload-3" + } + for alert_id, alert_payload in alerts.items(): + blob = bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") + logger.info("uploading blob %s", blob.name) + # N.B. this method is poorly named; it accepts bytes: + blob.upload_from_string(alert_payload) + cls.addClassCleanup(delete_blob, blob) + + schemas = { + "1": b"schema-payload-1", + "2": b"schema-payload-2", + "3": b"schema-payload-3" + } + for schema_id, schema_payload in schemas.items(): + blob = bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") + logger.info("uploading blob %s", blob.name) + # N.B. this method is poorly named; it accepts bytes: + blob.upload_from_string(schema_payload) + cls.addClassCleanup(delete_blob, blob) + + cls.gcp_project = gcp_project + cls.bucket_name = bucket_name + cls.stored_alerts = alerts + cls.stored_schemas = schemas + + def setUp(self): + """ + Run a local instance of the server. + """ + backend = GoogleObjectStorageBackend(self.gcp_project, self.bucket_name) + self.server = create_server(backend) + self.server_host = "127.0.0.1" + self.server_port = 14541 + self.server_process = multiprocessing.Process( + target=uvicorn.run, + args=(self.server, ), + kwargs={ + "host": self.server_host, + "port": self.server_port, + }, + daemon=True, + ) + logger.info("launching server process") + self.server_process.start() + logger.info("server process pid: %s", self.server_process.pid) + time.sleep(0.5) # Time for the server to start up + + def tearDown(self): + logger.info("terminating server") + self.server_process.terminate() + + def test_get_existing_alerts(self): + for alert_id, alert in self.stored_alerts.items(): + have = self._get_alert(alert_id) + assert have.content == alert + + def _get_alert(self, alert_id: str): + host = self.server_host + port = self.server_port + url = f"http://{host}:{port}/v1/alerts/{alert_id}" + logger.info("fetching %s", url) + response = requests.get(url) + response.raise_for_status() + return response From 64ada4cfc6686df1a554d9d47177b6ec8c3cec6d Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Thu, 15 Jul 2021 16:22:14 -0700 Subject: [PATCH 03/10] Provide a 404 when an alert or schema is not found --- alertdb/storage.py | 15 +++++++++++---- tests/test_integration.py | 32 ++++++++++++++++++++++++++------ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/alertdb/storage.py b/alertdb/storage.py index dd0c31d..db91be8 100644 --- a/alertdb/storage.py +++ b/alertdb/storage.py @@ -1,6 +1,7 @@ import abc import os.path +import google.api_core.exceptions import google.cloud.storage as gcs @@ -39,12 +40,18 @@ def __init__(self, gcp_project: str, bucket_name: str): self.bucket = self.object_store_client.bucket(bucket_name) def get_alert(self, alert_id: str) -> bytes: - blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") - return blob.download_as_bytes() + try: + blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz") + return blob.download_as_bytes() + except google.api_core.exceptions.NotFound as not_found: + raise NotFoundError("alert not found") from not_found def get_schema(self, schema_id: str) -> bytes: - blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") - return blob.download_as_bytes() + try: + blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json") + return blob.download_as_bytes() + except google.api_core.exceptions.NotFound as not_found: + raise NotFoundError("alert not found") from not_found class NotFoundError(Exception): diff --git a/tests/test_integration.py b/tests/test_integration.py index 12b6d32..03b19a5 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -102,14 +102,34 @@ def tearDown(self): def test_get_existing_alerts(self): for alert_id, alert in self.stored_alerts.items(): - have = self._get_alert(alert_id) - assert have.content == alert + response = self._get_alert(alert_id) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, alert) - def _get_alert(self, alert_id: str): + def test_get_existing_schemas(self): + for schema_id, schema in self.stored_schemas.items(): + response = self._get_schema(schema_id) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, schema) + + def test_get_missing_alert(self): + response = self._get_alert("bogus") + self.assertEqual(response.status_code, 404) + + def test_get_missing_schema(self): + response = self._get_schema("bogus") + self.assertEqual(response.status_code, 404) + + def _get_alert(self, alert_id: str) -> requests.Response: host = self.server_host port = self.server_port url = f"http://{host}:{port}/v1/alerts/{alert_id}" logger.info("fetching %s", url) - response = requests.get(url) - response.raise_for_status() - return response + return requests.get(url) + + def _get_schema(self, schema_id: str) -> requests.Response: + host = self.server_host + port = self.server_port + url = f"http://{host}:{port}/v1/schemas/{schema_id}" + logger.info("fetching %s", url) + return requests.get(url) From 12152d7f9fd3feba42530473bdedd7c64cfd0909 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Thu, 15 Jul 2021 19:31:20 -0700 Subject: [PATCH 04/10] Simplify integration test --- tests/test_integration.py | 37 ++++--------------------------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 03b19a5..e4357d8 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,12 +1,9 @@ import os -import sys import unittest import google.cloud.storage as gcs -import multiprocessing -import time -import uvicorn import requests import logging +from fastapi.testclient import TestClient from alertdb.server import create_server @@ -80,25 +77,7 @@ def setUp(self): """ backend = GoogleObjectStorageBackend(self.gcp_project, self.bucket_name) self.server = create_server(backend) - self.server_host = "127.0.0.1" - self.server_port = 14541 - self.server_process = multiprocessing.Process( - target=uvicorn.run, - args=(self.server, ), - kwargs={ - "host": self.server_host, - "port": self.server_port, - }, - daemon=True, - ) - logger.info("launching server process") - self.server_process.start() - logger.info("server process pid: %s", self.server_process.pid) - time.sleep(0.5) # Time for the server to start up - - def tearDown(self): - logger.info("terminating server") - self.server_process.terminate() + self.client = TestClient(self.server) def test_get_existing_alerts(self): for alert_id, alert in self.stored_alerts.items(): @@ -121,15 +100,7 @@ def test_get_missing_schema(self): self.assertEqual(response.status_code, 404) def _get_alert(self, alert_id: str) -> requests.Response: - host = self.server_host - port = self.server_port - url = f"http://{host}:{port}/v1/alerts/{alert_id}" - logger.info("fetching %s", url) - return requests.get(url) + return self.client.get(f"/v1/alerts/{alert_id}") def _get_schema(self, schema_id: str) -> requests.Response: - host = self.server_host - port = self.server_port - url = f"http://{host}:{port}/v1/schemas/{schema_id}" - logger.info("fetching %s", url) - return requests.get(url) + return self.client.get(f"/v1/schemas/{schema_id}") From 1a800faafd3f67acc94b41e1b487f3d820aa1c22 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 16 Jul 2021 13:56:45 -0700 Subject: [PATCH 05/10] Add a bunch of documenting comments --- alertdb/server.py | 35 ++++++++++++++++++- alertdb/storage.py | 87 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/alertdb/server.py b/alertdb/server.py index c9f80d2..9c3c1b5 100644 --- a/alertdb/server.py +++ b/alertdb/server.py @@ -1,13 +1,46 @@ +"""HTTP frontend server implementation.""" + from fastapi import FastAPI, HTTPException, Response from alertdb.storage import AlertDatabaseBackend, NotFoundError +# This Content-Type is described as the "preferred content type" for a Confluent +# Schema Registry here: +# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#content-types +# We're not running a Confluent Schema Registry, and don't conform to the API of +# one, but we do serve schemas, so this seems possibly appropriate. SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json" + +# There's no consensus on an Avro content type. application/avro+binary is +# sometimes used, but not very standard. If we did that, we'd want to specify +# the content-encoding as well, since contents are gzipped. +# +# application/octet-stream, which represents arbitrary bytes, is maybe overly +# general, but it's at least well-understood. ALERT_CONTENT_TYPE = "application/octet-stream" -def create_server(backend: AlertDatabaseBackend): +def create_server(backend: AlertDatabaseBackend) -> FastAPI: + """ + Creates a new instance of an HTTP handler which fetches alerts and schemas + from a backend. + + Parameters + ---------- + backend : AlertDatabaseBackend + The backend that stores alerts to be served. + + Returns + ------- + FastAPI : A FastAPI application which routes HTTP requests to return schemas. + """ + + # FastAPI documentation suggests that the application be a global singleton, + # with handlers defined as top-level functions, but this doesn't seem to + # permit any way of passing in a persistent backend. So, this little + # create_server closure exists to allow dependency injection. + app = FastAPI() @app.get("/v1/schemas/{schema_id}") diff --git a/alertdb/storage.py b/alertdb/storage.py index db91be8..0a8a854 100644 --- a/alertdb/storage.py +++ b/alertdb/storage.py @@ -1,3 +1,7 @@ +""" +Implementations of backend storage systems for the alert database server. +""" + import abc import os.path @@ -6,16 +10,92 @@ class AlertDatabaseBackend(abc.ABC): + """ + An abstract interface representing a storage backend for alerts and schemas. + """ + @abc.abstractmethod def get_alert(self, alert_id: str) -> bytes: + """ + Retrieve a single alert's payload, in compressed Confluent Wire Format. + + Confluent Wire Format is described here: + https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format + + To summarize, it is a 5-byte header, followed by binary-encoded Avro data. + + The first header byte is magic byte, with a value of 0. + The next 4 bytes are a 4-byte schema ID, which is an unsigned 32-bit + integer in big-endian order. + + Parameters + ---------- + alert_id : str + The ID of the alert to be retrieved. + + Returns + ------- + bytes + The alert contents in compressed Confluent Wire Format: serialized + with Avro's binary encoding, prefixed with a magic byte and the + schema ID, and then compressed with gzip. + + Raises + ------ + NotFoundError + If no alert can be found with that ID. + + Examples + -------- + >>> import gzip + >>> import struct + >>> import io + >>> raw_response = backend.get_alert("alert-id") + >>> wire_format_payload = io.BytesIO(gzip.decompress(raw_response)) + >>> magic_byte = wire_format_payload.read(1) + >>> schema_id = struct.unpack(">I", wire_format_payload.read(4)) + >>> alert_contents = wire_format_payload.read() + """ raise NotImplementedError() @abc.abstractmethod def get_schema(self, schema_id: str) -> bytes: + """Retrieve a single alert schema JSON document in its JSON-serialized form. + + Parameters + ---------- + schema_id : str + The ID of the schema to be retrieved. + + Returns + ------- + bytes + The schema document, encoded with JSON. + + Raises + ------ + NotFoundError + If no schema can be found with that ID. + + Examples + -------- + >>> import gzip + >>> import struct + >>> import io + >>> alert_payload = backend.get_alert("alert-id") + >>> alert_payload = gzip.decompress(alert_payload) + >>> alert_ + """ raise NotImplementedError() class FileBackend(AlertDatabaseBackend): + """ + Retrieves alerts and schemas from a directory on disk. + + This is provided as an example, to ensure that it's clear how to implement + an AlertDatabaseBackend subclass. + """ def __init__(self, root_dir: str): self.root_dir = root_dir @@ -35,6 +115,11 @@ def get_schema(self, schema_id: str) -> bytes: class GoogleObjectStorageBackend(AlertDatabaseBackend): + """ + Retrieves alerts and schemas from a Google Cloud Storage bucket. + + The path for alert and schema objects follows the scheme in DMTN-183. + """ def __init__(self, gcp_project: str, bucket_name: str): self.object_store_client = gcs.Client(project=gcp_project) self.bucket = self.object_store_client.bucket(bucket_name) @@ -55,4 +140,4 @@ def get_schema(self, schema_id: str) -> bytes: class NotFoundError(Exception): - pass + """Error which represents a failure to find an alert or schema in a backend.""" From 8566e03b30cfc89f2a1271c9fb57d682f14144ee Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 16 Jul 2021 13:57:37 -0700 Subject: [PATCH 06/10] Fix alertdb.py errors Fix an invalid import, and correctly spell 'AssertionError'. --- alertdb/bin/alertdb.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/alertdb/bin/alertdb.py b/alertdb/bin/alertdb.py index 0aa45b7..51a4750 100644 --- a/alertdb/bin/alertdb.py +++ b/alertdb/bin/alertdb.py @@ -1,7 +1,8 @@ import uvicorn import argparse -from alertdb.server import create_server, FileBackend, GoogleObjectStorageBackend +from alertdb.server import create_server +from alertdb.storage import FileBackend, GoogleObjectStorageBackend def main(): @@ -45,7 +46,7 @@ def main(): backend = GoogleObjectStorageBackend(args.gcp_project, args.gcp_bucket) else: # Shouldn't be possible if argparse is using the choices parameter as expected... - raise AsertionError("only valid --backend choices are local-files and google-cloud") + raise AssertionError("only valid --backend choices are local-files and google-cloud") server = create_server(backend) uvicorn.run(server, host=args.listen_host, port=args.listen_port, log_level="info") From aaf93effd5eba345215bd5cbd4aef678034e6054 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 16 Jul 2021 14:05:13 -0700 Subject: [PATCH 07/10] Add some docstrings to tests. --- tests/test_integration.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index e4357d8..6a72f72 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -80,22 +80,26 @@ def setUp(self): self.client = TestClient(self.server) def test_get_existing_alerts(self): + """Test that retrieving an alert over HTTP works as expected.""" for alert_id, alert in self.stored_alerts.items(): response = self._get_alert(alert_id) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, alert) def test_get_existing_schemas(self): + """Test that retrieving a schema over HTTP works as expected.""" for schema_id, schema in self.stored_schemas.items(): response = self._get_schema(schema_id) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, schema) def test_get_missing_alert(self): + """Test that retrieving an alert that does not exist gives a 404.""" response = self._get_alert("bogus") self.assertEqual(response.status_code, 404) def test_get_missing_schema(self): + """Test that retrieving a schema that does not exist gives a 404.""" response = self._get_schema("bogus") self.assertEqual(response.status_code, 404) From 653b92d47e26fe4d37e5e73a97445def043ba104 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 16 Jul 2021 14:05:33 -0700 Subject: [PATCH 08/10] Provide a bit more help in alertdb command --- alertdb/bin/alertdb.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/alertdb/bin/alertdb.py b/alertdb/bin/alertdb.py index 51a4750..5332588 100644 --- a/alertdb/bin/alertdb.py +++ b/alertdb/bin/alertdb.py @@ -6,7 +6,10 @@ def main(): - parser = argparse.ArgumentParser("alertdb") + parser = argparse.ArgumentParser( + "alertdb", formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description="Run an alert database HTTP server." + ) parser.add_argument( "--listen-host", type=str, default="127.0.0.1", help="host address to listen on for requests", From b466b051c6160d927a7989421da7284722691366 Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Fri, 16 Jul 2021 14:05:44 -0700 Subject: [PATCH 09/10] Add some instructions to the README. --- README.md | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/README.md b/README.md index 527a99d..6666576 100644 --- a/README.md +++ b/README.md @@ -18,3 +18,56 @@ pip install . The server is installed by `pip` as a command named `alertdb`: ``` +% alertdb -h +usage: alertdb [-h] [--listen-host LISTEN_HOST] [--listen-port LISTEN_PORT] + [--backend {local-files,google-cloud}] + [--local-file-root LOCAL_FILE_ROOT] [--gcp-project GCP_PROJECT] + [--gcp-bucket GCP_BUCKET] + +Run an alert database HTTP server. + +optional arguments: + -h, --help show this help message and exit + --listen-host LISTEN_HOST + host address to listen on for requests (default: + 127.0.0.1) + --listen-port LISTEN_PORT + host port to listen on for requests (default: 5000) + --backend {local-files,google-cloud} + backend to use to source alerts (default: local-files) + --local-file-root LOCAL_FILE_ROOT + when using the local-files backend, the root directory + where alerts should be found (default: None) + --gcp-project GCP_PROJECT + when using the google-cloud backend, the name of the + GCP project (default: None) + --gcp-bucket GCP_BUCKET + when using the google-cloud backend, the name of the + Google Cloud Storage bucket (default: None) +``` + +## Running tests ## + +The only test is an integration test against the Interim Data Facility on Google +Cloud. + +You'll need an activated Google Cloud SDK to use it (like with `gcloud auth +application-default login`). + +Then, specify a GCP project to run against via an `$ALERTDB_TEST_GCP_PROJECT` +environment variable, and run the tests: + +``` +% export ALERTDB_TEST_GCP_PROJECT=alert-stream +% pytest . +============================= test session starts ============================== +platform linux -- Python 3.8.10, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 +rootdir: /home/swnelson/code/rubin/alert_database_server +collected 4 items + +tests/test_integration.py .... [100%] + +============================== 4 passed in 10.88s ============================== +``` + +The test needs permissions to create buckets and blobs in your project. From 8a902c5eda080a4331d633354f556d3bb734f2ee Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Tue, 27 Jul 2021 10:18:37 -0700 Subject: [PATCH 10/10] Fix docstring typo --- alertdb/storage.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/alertdb/storage.py b/alertdb/storage.py index 0a8a854..b0abcc7 100644 --- a/alertdb/storage.py +++ b/alertdb/storage.py @@ -82,9 +82,18 @@ def get_schema(self, schema_id: str) -> bytes: >>> import gzip >>> import struct >>> import io + >>> import json + >>> import fastavro + >>> + >>> # Get an alert from the backend, and extract its schema ID >>> alert_payload = backend.get_alert("alert-id") - >>> alert_payload = gzip.decompress(alert_payload) - >>> alert_ + >>> wire_format_payload = io.BytesIO(gzip.decompress(alert_payload)) + >>> magic_byte = wire_format_payload.read(1) + >>> schema_id = struct.unpack(">I", wire_format_payload.read(4)) + >>> + >>> # Download and use the schema + >>> schema_bytes = backend.get_schema(schema_id) + >>> schema = fastavro.parse(json.loads(schema_bytes)) """ raise NotImplementedError()