Skip to content

Commit

Permalink
Replace Kafka SSL with SASL.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed Mar 7, 2024
1 parent f40de7f commit b8e2646
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import random
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Sequence

Expand All @@ -10,7 +11,6 @@
import httpx
import kafkit.registry
import kafkit.registry.httpx
import kafkit.ssl
import yaml
from astro_metadata_translator import ObservationInfo
from lsst.resources import ResourcePath
Expand Down Expand Up @@ -230,6 +230,28 @@ def process_date(day_obs: str) -> None:
# Initialization #
##################


@dataclass
class KafkaConfig:
"""Class for configuring Kafka-related items."""

bootstrap: str
group_id: str
username: str
password: str
schema_url: str


def get_kafka_config() -> KafkaConfig:
return KafkaConfig(
bootstrap=os.environ["KAFKA_BOOTSTRAP"],
group_id=os.environ.get("KAFKA_GROUP_ID", "consdb-consumer"),
username=os.environ["KAFKA_USERNAME"],
password=os.environ["KAFKA_PASSWORD"],
schema_url=os.environ["SCHEMA_URL"],
)


instrument = os.environ.get("INSTRUMENT", "LATISS")
match instrument:
case "LATISS":
Expand Down Expand Up @@ -275,9 +297,6 @@ def process_date(day_obs: str) -> None:
if bucket_prefix:
os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"

kafka_bootstrap = os.environ["KAFKA_BOOTSTRAP"]
schema_url = os.environ["SCHEMA_URL"]
kafka_group_id = "1"

topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable"

Expand All @@ -289,26 +308,24 @@ def process_date(day_obs: str) -> None:

async def main() -> None:
"""Handle Header Service largeFileObjectAvailable messages."""
global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic
global bucket_prefix

kafka_config = get_kafka_config()
async with httpx.AsyncClient() as client:
schema_registry = kafkit.registry.httpx.RegistryApi(
http_client=client, url=schema_url
http_client=client, url=kafka_config.schema_url
)
deserializer = kafkit.registry.Deserializer(registry=schema_registry)

ssl_context = kafkit.ssl.create_ssl_context(
cluster_ca_path=broker_ca_path,
client_cert_path=client_cert_path,
client_key_path=client_key_path,
)
consumer = aiokafka.AIOKafkaConsumer(
topic,
bootstrap_servers=kafka_bootstrap,
ssl_context=ssl_context,
security_protocol="SSL",
group_id=kafka_group_id,
bootstrap_servers=kafka_config.bootstrap,
group_id=kafka_config.group_id,
auto_offset_reset="earliest",
isolation_level="read_committed",
security_protocol="SASL_PLAINTEXT",
sasl_plain_username=kafka_config.username,
sasl_plain_password=kafka_config.password,
)

await consumer.start()
Expand Down

0 comments on commit b8e2646

Please sign in to comment.