Skip to content

Commit

Permalink
KafkaItemExporter env parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
WonderBeat committed Aug 6, 2024
1 parent 5275402 commit 6be9a2f
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions blockchainetl/jobs/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
import collections
import json
import logging
import os
from typing import Any

from kafka import KafkaProducer

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter


class KafkaItemExporter:

def __init__(self, output, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)
print(self.connection_url)
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
kafka_params = self.read_kafka_env_vars()
kafka_params["bootstrap_servers"] = self.connection_url
self.producer = KafkaProducer(**kafka_params)

def read_kafka_env_vars(self):
"""Reads environment variables starting with "kafka_" and returns a dictionary."""
kafka_env_vars = {}
for key, value in os.environ.items():
try:
value = int(value)
except ValueError:
pass # value is not an int
if key.startswith("KAFKA_"):
kafka_env_vars[key[len("KAFKA_"):].lower()] = value
return kafka_env_vars

def get_connection_url(self, output):
try:
Expand Down

0 comments on commit 6be9a2f

Please sign in to comment.