This is the Kafka ingester for the Rubin Observatory's alert database. The database is described more thoroughly in DMTN-183.
The ingester is expected to run continuously. It does these things:
- Reads alert data from the live Kafka feed.
- Gzips the raw Kafka messages and copies them into a Google Cloud Storage bucket, indexed by alert ID.
- Ensures that the schemas used in all messages are copied into Google Cloud Storage from Confluent Schema Registry.
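The loop below is a minimal sketch of those three steps, not the actual implementation. It assumes the confluent-kafka and google-cloud-storage client libraries, placeholder bucket names, and a hypothetical `parse_alert_id()` helper for decoding the Avro payload; the real ingester's structure, naming, and error handling differ.

```python
# Hypothetical sketch of the ingest loop; not the real ingester code.
import gzip
import struct

import requests
from confluent_kafka import Consumer
from google.cloud import storage

KAFKA_HOST = "usdf-alert-stream-dev.lsst.cloud:9094"  # defaults from the CLI help below
TOPIC = "alerts-simulated"
REGISTRY = "https://usdf-alert-schemas-dev.slac.stanford.edu"


def parse_alert_id(avro_payload: bytes) -> int:
    """Hypothetical helper: decode the Avro alert and return its alert ID."""
    raise NotImplementedError


consumer = Consumer({
    "bootstrap.servers": KAFKA_HOST,
    "group.id": "alertdb-ingester",
    "auto.offset.reset": "earliest",
})
consumer.subscribe([TOPIC])

gcs = storage.Client()
alert_bucket = gcs.bucket("alert-bucket")    # placeholder bucket names
schema_bucket = gcs.bucket("schema-bucket")
seen_schemas: set[int] = set()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    raw = msg.value()

    # Confluent wire format: one magic byte, then a 4-byte big-endian schema ID.
    schema_id = struct.unpack(">I", raw[1:5])[0]

    # Copy the schema into the schema bucket once per schema ID.
    if schema_id not in seen_schemas:
        resp = requests.get(f"{REGISTRY}/schemas/ids/{schema_id}")
        resp.raise_for_status()
        schema_bucket.blob(f"schemas/{schema_id}.json").upload_from_string(resp.text)
        seen_schemas.add(schema_id)

    # Gzip the raw Kafka message and store it, indexed by alert ID.
    alert_id = parse_alert_id(raw[5:])
    alert_bucket.blob(f"alerts/{alert_id}.avro.gz").upload_from_string(gzip.compress(raw))
```

Keeping a set of already-uploaded schema IDs is just one way to satisfy the "ensure schemas are copied" step.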
Clone, and install with pip (probably in a virtualenv):
```
git clone [email protected]:lsst-dm/alert_database_ingester.git
cd alert_database_ingester
python -m venv virtualenv
source virtualenv/bin/activate
pip install .
```
The ingester is installed by pip as a command named `alertdb-ingester`:
```
usage: alertdb-ingester [-h] [--endpoint-url ENDPOINT_URL]
                        [--bucket-alerts BUCKET_ALERTS] [--bucket-schemas BUCKET_SCHEMAS]
                        [--kafka-host KAFKA_HOST] [--kafka-topic KAFKA_TOPIC]
                        [--kafka-group KAFKA_GROUP]
                        [--kafka-auth-mechanism {mtls,scram}]
                        [--kafka-username KAFKA_USERNAME]
                        [--tls-client-key-location TLS_CLIENT_KEY_LOCATION]
                        [--tls-client-crt-location TLS_CLIENT_CRT_LOCATION]
                        [--tls-server-ca-crt-location TLS_SERVER_CA_CRT_LOCATION]
                        [--schema-registry-address SCHEMA_REGISTRY_ADDRESS]

Run a worker to copy alerts from Kafka into an object store backend.

optional arguments:
  -h, --help            show this help message and exit
  --endpoint-url ENDPOINT_URL
                        when using a remote bucket, the url where the bucket is
                        located.
  --kafka-host KAFKA_HOST
                        kafka host with alert data (default: usdf-alert-stream-dev.lsst.cloud:9094)
  --kafka-topic KAFKA_TOPIC
                        name of the Kafka topic with alert data (default: alerts-simulated)
  --kafka-group KAFKA_GROUP
                        Name of a Kafka Consumer group to run under (default:
                        alertdb-ingester)
  --kafka-auth-mechanism {mtls,scram}
                        Kafka authentication mechanism to use (default: scram)
  --kafka-username KAFKA_USERNAME
                        Username to use when connecting to Kafka. Only used if
                        --kafka-auth-mechanism=scram (default: admin)
  --tls-client-key-location TLS_CLIENT_KEY_LOCATION
                        Path to a client PEM key used for mTLS authentication. Only
                        used if --kafka-auth-mechanism=mtls. (default: )
  --tls-client-crt-location TLS_CLIENT_CRT_LOCATION
                        Path to a client public cert used for mTLS authentication.
                        Only used if --kafka-auth-mechanism=mtls. (default: )
  --tls-server-ca-crt-location TLS_SERVER_CA_CRT_LOCATION
                        Path to a CA public cert used to verify the server's TLS
                        cert. Only used if --kafka-auth-mechanism=mtls. (default: )
  --schema-registry-address SCHEMA_REGISTRY_ADDRESS
                        Address of a Confluent Schema Registry server hosting
                        schemas (default: https://usdf-alert-schemas-dev.slac.stanford.edu)
```
The ingester needs a Kafka password. It gets this from you via an environment variable, `ALERTDB_KAFKA_PASSWORD`.
So, an example invocation:
```
export ALERTDB_KAFKA_PASSWORD=...
alertdb-ingester
```
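If you authenticate with mTLS instead of SCRAM, pass the TLS flags rather than a password; the certificate paths below are placeholders:

```
alertdb-ingester --kafka-auth-mechanism=mtls \
    --tls-client-key-location=/path/to/client.key \
    --tls-client-crt-location=/path/to/client.crt \
    --tls-server-ca-crt-location=/path/to/ca.crt
```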
Install the dev dependencies and the package in editable mode with `pip install --editable '.[dev]'`.
Then, install pre-commit hooks with `pre-commit install`.
Linters and mypy checks should run automatically when you commit. To run them on demand, use `pre-commit run`.
That will only run on the files that you changed; to run on all files, use `pre-commit run --all-files`.
Run tests with pytest. You can just run `pytest .` in the repo root.
Integration tests require some credentials. If you don't have them set, the integration tests will be skipped. You need the following (filling in the actual usernames and passwords, of course):
```
export ALERT_INGEST_TEST_KAFKA_URL=kafka://username:[email protected]
export ALERT_INGEST_TEST_REGISTRY_URL=https://username:[email protected]
export ALERT_INGEST_TEST_GCP_PROJECT=alert-stream
```
Then `pytest .` will run the integration tests, which create temporary Kafka and Schema Registry resources and run against them. You must have a test bucket created to run tests using minio.
If running in a new test environment, you will need to install:

```
pip install lsst-alert-packet
pip install --upgrade kafka-python
pip install aiokafka
pip install pytest
```
And you will need to run the tests using `env pytest TEST_NAME`.