Skip to content

Commit

Permalink
KafkaItemExporter env parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
WonderBeat committed Aug 6, 2024
1 parent 5275402 commit 0518e2d
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion blockchainetl/jobs/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import collections
import json
import logging
import os
from typing import Any

from kafka import KafkaProducer

Expand All @@ -14,7 +16,21 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)
print(self.connection_url)
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
kafka_params = self.read_kafka_env_vars()
kafka_params["bootstrap_servers"] = self.connection_url
self.producer = KafkaProducer(**kafka_params)

def read_kafka_env_vars(self) -> dict[str, Any]:
"""Reads environment variables starting with "kafka_" and returns a dictionary."""
kafka_env_vars = {}
for key, value in os.environ.items():
try:
value = int(value)
except ValueError:
pass # value is not an int
if key.startswith("KAFKA_"):
kafka_env_vars[key[len("KAFKA_"):].lower()] = value
return kafka_env_vars

def get_connection_url(self, output):
try:
Expand Down

0 comments on commit 0518e2d

Please sign in to comment.