Merge pull request #1 from lsst-dm/tickets/DM-30446
DM-30446: Implement DMTN-183's alert database server
spenczar authored Jul 27, 2021
2 parents 969f1e6 + 8a902c5 commit e780649
Showing 8 changed files with 467 additions and 0 deletions.
53 changes: 53 additions & 0 deletions README.md
@@ -18,3 +18,56 @@ pip install .

The server is installed by `pip` as a command named `alertdb`:
```
% alertdb -h
usage: alertdb [-h] [--listen-host LISTEN_HOST] [--listen-port LISTEN_PORT]
               [--backend {local-files,google-cloud}]
               [--local-file-root LOCAL_FILE_ROOT] [--gcp-project GCP_PROJECT]
               [--gcp-bucket GCP_BUCKET]

Run an alert database HTTP server.

optional arguments:
  -h, --help            show this help message and exit
  --listen-host LISTEN_HOST
                        host address to listen on for requests (default:
                        127.0.0.1)
  --listen-port LISTEN_PORT
                        host port to listen on for requests (default: 5000)
  --backend {local-files,google-cloud}
                        backend to use to source alerts (default: local-files)
  --local-file-root LOCAL_FILE_ROOT
                        when using the local-files backend, the root directory
                        where alerts should be found (default: None)
  --gcp-project GCP_PROJECT
                        when using the google-cloud backend, the name of the
                        GCP project (default: None)
  --gcp-bucket GCP_BUCKET
                        when using the google-cloud backend, the name of the
                        Google Cloud Storage bucket (default: None)
```
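
For instance, a hypothetical invocation serving alerts from a local directory
(the path here is illustrative, not real):

```
% alertdb --backend local-files --local-file-root /path/to/alert/files
```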

## Running tests ##

The only tests are integration tests, run against the Interim Data Facility on
Google Cloud.

You'll need authenticated Google Cloud SDK credentials to run them (for
example, via `gcloud auth application-default login`).

Then specify a GCP project to run against via the `ALERTDB_TEST_GCP_PROJECT`
environment variable, and run the tests:

```
% export ALERTDB_TEST_GCP_PROJECT=alert-stream
% pytest .
============================= test session starts ==============================
platform linux -- Python 3.8.10, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /home/swnelson/code/rubin/alert_database_server
collected 4 items
tests/test_integration.py .... [100%]
============================== 4 passed in 10.88s ==============================
```

The tests need permission to create buckets and blobs in your project.
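
Granting those permissions is deployment-specific; one sketch, assuming the
`gcloud` CLI and a hypothetical account name, is to bind the Storage Admin
role on the project:

```
% gcloud projects add-iam-policy-binding alert-stream \
    --member="user:[email protected]" --role="roles/storage.admin"
```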
Empty file added alertdb/__init__.py
55 changes: 55 additions & 0 deletions alertdb/bin/alertdb.py
@@ -0,0 +1,55 @@
import argparse

import uvicorn

from alertdb.server import create_server
from alertdb.storage import FileBackend, GoogleObjectStorageBackend


def main():
parser = argparse.ArgumentParser(
"alertdb", formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Run an alert database HTTP server."
)
parser.add_argument(
"--listen-host", type=str, default="127.0.0.1",
help="host address to listen on for requests",
)
parser.add_argument(
"--listen-port", type=int, default=5000,
help="host port to listen on for requests",
)
parser.add_argument(
"--backend", type=str, choices=("local-files", "google-cloud"), default="local-files",
help="backend to use to source alerts",
)
parser.add_argument(
"--local-file-root", type=str, default=None,
help="when using the local-files backend, the root directory where alerts should be found",
)
parser.add_argument(
"--gcp-project", type=str, default=None,
help="when using the google-cloud backend, the name of the GCP project",
)
parser.add_argument(
"--gcp-bucket", type=str, default=None,
help="when using the google-cloud backend, the name of the Google Cloud Storage bucket",
)
args = parser.parse_args()

# Configure the right backend
if args.backend == "local-files":
if args.local_file_root is None:
parser.error("--backend=local-files requires --local-file-root be set")
backend = FileBackend(args.local_file_root)
elif args.backend == "google-cloud":
if args.gcp_project is None:
parser.error("--backend=google-cloud requires --gcp-project be set")
if args.gcp_bucket is None:
parser.error("--backend=google-cloud requires --gcp-bucket be set")
backend = GoogleObjectStorageBackend(args.gcp_project, args.gcp_bucket)
else:
# Shouldn't be possible if argparse is using the choices parameter as expected...
raise AssertionError("only valid --backend choices are local-files and google-cloud")

server = create_server(backend)
uvicorn.run(server, host=args.listen_host, port=args.listen_port, log_level="info")
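
For illustration, a hypothetical invocation that exercises the google-cloud
branch above (the project and bucket names are made up):

```
% alertdb --backend google-cloud --gcp-project alert-stream --gcp-bucket my-alert-bucket
```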
64 changes: 64 additions & 0 deletions alertdb/server.py
@@ -0,0 +1,64 @@
"""HTTP frontend server implementation."""

from fastapi import FastAPI, HTTPException, Response

from alertdb.storage import AlertDatabaseBackend, NotFoundError


# This Content-Type is described as the "preferred content type" for a Confluent
# Schema Registry here:
# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#content-types
# We're not running a Confluent Schema Registry, and don't conform to the API of
# one, but we do serve schemas, so this seems possibly appropriate.
SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"

# There's no consensus on an Avro content type. application/avro+binary is
# sometimes used, but not very standard. If we did that, we'd want to specify
# the content-encoding as well, since contents are gzipped.
#
# application/octet-stream, which represents arbitrary bytes, is maybe overly
# general, but it's at least well-understood.
ALERT_CONTENT_TYPE = "application/octet-stream"


def create_server(backend: AlertDatabaseBackend) -> FastAPI:
"""
Creates a new instance of an HTTP handler which fetches alerts and schemas
from a backend.

Parameters
----------
backend : AlertDatabaseBackend
    The backend that stores alerts to be served.

Returns
-------
FastAPI
    A FastAPI application which routes HTTP requests to return alerts and
    schemas.
"""

# FastAPI documentation suggests that the application be a global singleton,
# with handlers defined as top-level functions, but this doesn't seem to
# permit any way of passing in a persistent backend. So, this little
# create_server closure exists to allow dependency injection.

app = FastAPI()

@app.get("/v1/schemas/{schema_id}")
def get_schema(schema_id: str):
try:
schema_bytes = backend.get_schema(schema_id)
except NotFoundError as nfe:
raise HTTPException(status_code=404, detail="schema not found") from nfe

return Response(content=schema_bytes, media_type=SCHEMA_CONTENT_TYPE)

@app.get("/v1/alerts/{alert_id}")
def get_alert(alert_id: str):
try:
alert_bytes = backend.get_alert(alert_id)
except NotFoundError as nfe:
raise HTTPException(status_code=404, detail="alert not found") from nfe

return Response(content=alert_bytes, media_type=ALERT_CONTENT_TYPE)

return app
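
To make the dependency-injection pattern concrete, here is a minimal sketch
that builds an app around the `FileBackend` (the root directory and alert ID
are hypothetical) and exercises it in-process with FastAPI's `TestClient`:

```
from fastapi.testclient import TestClient

from alertdb.server import create_server
from alertdb.storage import FileBackend

# Build an app around a concrete backend; no running server needed.
app = create_server(FileBackend("/path/to/alert/files"))
client = TestClient(app)

# Fetch one alert; a missing ID yields the 404 mapped from NotFoundError.
resp = client.get("/v1/alerts/some-alert-id")
print(resp.status_code, resp.headers["content-type"])
```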
152 changes: 152 additions & 0 deletions alertdb/storage.py
@@ -0,0 +1,152 @@
"""
Implementations of backend storage systems for the alert database server.
"""

import abc
import os.path

import google.api_core.exceptions
import google.cloud.storage as gcs


class AlertDatabaseBackend(abc.ABC):
"""
An abstract interface representing a storage backend for alerts and schemas.
"""

@abc.abstractmethod
def get_alert(self, alert_id: str) -> bytes:
"""
Retrieve a single alert's payload, in compressed Confluent Wire Format.

Confluent Wire Format is described here:
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

To summarize, it is a 5-byte header, followed by binary-encoded Avro data.
The first header byte is a magic byte, with a value of 0. The next 4 bytes
are the schema ID, an unsigned 32-bit integer in big-endian order.

Parameters
----------
alert_id : str
    The ID of the alert to be retrieved.

Returns
-------
bytes
    The alert contents in compressed Confluent Wire Format: serialized
    with Avro's binary encoding, prefixed with a magic byte and the
    schema ID, and then compressed with gzip.

Raises
------
NotFoundError
    If no alert can be found with that ID.

Examples
--------
>>> import gzip
>>> import struct
>>> import io
>>> raw_response = backend.get_alert("alert-id")
>>> wire_format_payload = io.BytesIO(gzip.decompress(raw_response))
>>> magic_byte = wire_format_payload.read(1)
>>> schema_id = struct.unpack(">I", wire_format_payload.read(4))[0]
>>> alert_contents = wire_format_payload.read()
"""
raise NotImplementedError()

@abc.abstractmethod
def get_schema(self, schema_id: str) -> bytes:
"""Retrieve a single alert schema JSON document in its JSON-serialized form.

Parameters
----------
schema_id : str
    The ID of the schema to be retrieved.

Returns
-------
bytes
    The schema document, encoded with JSON.

Raises
------
NotFoundError
    If no schema can be found with that ID.

Examples
--------
>>> import gzip
>>> import struct
>>> import io
>>> import json
>>> import fastavro
>>>
>>> # Get an alert from the backend, and extract its schema ID
>>> alert_payload = backend.get_alert("alert-id")
>>> wire_format_payload = io.BytesIO(gzip.decompress(alert_payload))
>>> magic_byte = wire_format_payload.read(1)
>>> schema_id = struct.unpack(">I", wire_format_payload.read(4))[0]
>>>
>>> # Download and parse the schema
>>> schema_bytes = backend.get_schema(str(schema_id))
>>> schema = fastavro.parse_schema(json.loads(schema_bytes))
"""
raise NotImplementedError()


class FileBackend(AlertDatabaseBackend):
"""
Retrieves alerts and schemas from a directory on disk.
This is provided as an example, to ensure that it's clear how to implement
an AlertDatabaseBackend subclass.
"""
def __init__(self, root_dir: str):
self.root_dir = root_dir

def get_alert(self, alert_id: str) -> bytes:
try:
with open(os.path.join(self.root_dir, "alerts", alert_id), "rb") as f:
return f.read()
except FileNotFoundError as file_not_found:
raise NotFoundError("alert not found") from file_not_found

def get_schema(self, schema_id: str) -> bytes:
try:
with open(os.path.join(self.root_dir, "schemas", schema_id), "rb") as f:
return f.read()
except FileNotFoundError as file_not_found:
raise NotFoundError("schema not found") from file_not_found


class GoogleObjectStorageBackend(AlertDatabaseBackend):
"""
Retrieves alerts and schemas from a Google Cloud Storage bucket.
The path for alert and schema objects follows the scheme in DMTN-183.
"""
def __init__(self, gcp_project: str, bucket_name: str):
self.object_store_client = gcs.Client(project=gcp_project)
self.bucket = self.object_store_client.bucket(bucket_name)

def get_alert(self, alert_id: str) -> bytes:
try:
blob = self.bucket.blob(f"/alert_archive/v1/alerts/{alert_id}.avro.gz")
return blob.download_as_bytes()
except google.api_core.exceptions.NotFound as not_found:
raise NotFoundError("alert not found") from not_found

def get_schema(self, schema_id: str) -> bytes:
try:
blob = self.bucket.blob(f"/alert_archive/v1/schemas/{schema_id}.json")
return blob.download_as_bytes()
except google.api_core.exceptions.NotFound as not_found:
raise NotFoundError("schema not found") from not_found


class NotFoundError(Exception):
"""Error which represents a failure to find an alert or schema in a backend."""
30 changes: 30 additions & 0 deletions setup.cfg
@@ -0,0 +1,30 @@
[metadata]
name = lsst-alert-database-server
version = 0.1.0
description = A server for the Rubin Observatory alert database
url = https://github.com/lsst-dm/alert_database_server
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Development Status :: 3 - Alpha
author = Spencer Nelson
author_email = [email protected]
license = GPLv3

[options]
python_requires = >= 3.8
install_requires =
fastapi
uvicorn
google-cloud-storage
requests

tests_require =
pytest

packages =
alertdb

[options.entry_points]
console_scripts =
alertdb = alertdb.bin.alertdb:main
3 changes: 3 additions & 0 deletions setup.py
@@ -0,0 +1,3 @@
from setuptools import setup

setup()