From 58b2824cb35dd6068de84d6c21cb3021ac14d503 Mon Sep 17 00:00:00 2001 From: Kai Koehler Date: Tue, 4 Jun 2024 15:25:18 -0700 Subject: [PATCH] Fix small issues --- src/sasquatchbackpack/cli.py | 9 ++--- src/sasquatchbackpack/sasquatch.py | 48 +++++++++++++------------ src/sasquatchbackpack/schemas/usgs.avsc | 2 +- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/sasquatchbackpack/cli.py b/src/sasquatchbackpack/cli.py index d27678d..a8ec55d 100644 --- a/src/sasquatchbackpack/cli.py +++ b/src/sasquatchbackpack/cli.py @@ -141,7 +141,7 @@ def main() -> None: @click.option( "-c", "--coords", - help="latitude and longitude of the central coordnates " + help="latitude and longitude of the central coordinates " "(latitude, longitude). Defaults to the coordinates of Cerro Pachon.", default=DEFAULT_COORDS, type=(float, float), @@ -195,9 +195,10 @@ def usgs_earthquake_data( ) source = sources.USGSSource(config) - poster = sasquatch.BackpackDispatcher(source, sasquatch.DispatcherConfig()) - - click.echo(poster.post()) + backpack_dispatcher = sasquatch.BackpackDispatcher( + source, sasquatch.DispatcherConfig() + ) + click.echo(backpack_dispatcher.post()) if __name__ == "__main__": diff --git a/src/sasquatchbackpack/sasquatch.py b/src/sasquatchbackpack/sasquatch.py index 741d95e..39d293d 100644 --- a/src/sasquatchbackpack/sasquatch.py +++ b/src/sasquatchbackpack/sasquatch.py @@ -1,7 +1,8 @@ """Handles dispatch of backpack data to kafka.""" -import json +import os from dataclasses import dataclass +from string import Template import requests @@ -15,11 +16,26 @@ class DispatcherConfig: """Class containing relevant configuration information for the BackpackDispatcher. + + Values + ------ + sasquatch_rest_proxy_url + environment variable contatining the target for data + partitions_count + number of partitions to create + replication_factor + number of replicas to create + namespace + environment varible containing the target namespace """ - sasquatch_rest_proxy_url = ( - "https://data-int.lsst.cloud/sasquatch-rest-proxy" + sasquatch_rest_proxy_url = os.getenv( + "SASQUATCH_REST_PROXY_URL", + "https://data-int.lsst.cloud/sasquatch-rest-proxy", ) + partitions_count = 1 + replication_factor = 3 + namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.example") class BackpackDispatcher: @@ -40,20 +56,11 @@ def __init__( ) -> None: self.source = source self.config = config - self.schema = source.load_schema() - self.namespace = self.get_namespace() - - def get_namespace(self) -> str: - """Sorts the schema and returns the namespace value. - - Returns - ------- - namespace - provided namespace from the schema file - """ - json_schema = json.loads(self.schema) - - return json_schema["namespace"] + self.schema = Template(source.load_schema()).substitute( + { + "namespace": self.config.namespace, + } + ) def create_topic(self) -> str: """Create kafka topic based off data from provided source. @@ -73,13 +80,10 @@ def create_topic(self) -> str: cluster_id = r.json()["data"][0]["cluster_id"] - # The topic is created with one partition and a replication - # factor of 3 by default, this configuration is fixed for the - # Sasquatch Kafka cluster. topic_config = { "topic_name": f"{self.namespace}." + f"{self.source.topic_name}", - "partitions_count": 1, - "replication_factor": 3, + "partitions_count": self.config.partitions_count, + "replication_factor": self.config.replication_factor, } headers = {"content-type": "application/json"} diff --git a/src/sasquatchbackpack/schemas/usgs.avsc b/src/sasquatchbackpack/schemas/usgs.avsc index 6ec0ec0..4c14773 100644 --- a/src/sasquatchbackpack/schemas/usgs.avsc +++ b/src/sasquatchbackpack/schemas/usgs.avsc @@ -1 +1 @@ -{"namespace": "lsst.example", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]} \ No newline at end of file +{"namespace": "$namespace", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]}