Commit ee021b4

Support the target topic being specified when write() is called, allowing a single Producer object to be used to write to multiple topics.
cnweaver committed May 24, 2024
1 parent 7e14395 commit ee021b4
Showing 2 changed files with 62 additions and 7 deletions.
18 changes: 13 additions & 5 deletions adc/producer.py
@@ -34,15 +34,23 @@ def __init__(self, conf: 'ProducerConfig') -> None:
     def write(self,
               msg: Union[bytes, 'Serializable'],
               headers: Optional[Union[dict, list]] = None,
-              delivery_callback: Optional[DeliveryCallback] = log_delivery_errors) -> None:
+              delivery_callback: Optional[DeliveryCallback] = log_delivery_errors,
+              topic: Optional[str] = None) -> None:
         if isinstance(msg, Serializable):
             msg = msg.serialize()
-        self.logger.debug("writing message to %s", self.conf.topic)
+        if topic is None:
+            if self.conf.topic is not None:
+                topic = self.conf.topic
+            else:
+                raise Exception("No topic specified for write: "
+                                "Either configure a topic when constructing the Producer, "
+                                "or specify the topic argument to write()")
+        self.logger.debug("writing message to %s", topic)
         if delivery_callback is not None:
-            self._producer.produce(self.conf.topic, msg, headers=headers,
+            self._producer.produce(topic, msg, headers=headers,
                                    on_delivery=delivery_callback)
         else:
-            self._producer.produce(self.conf.topic, msg, headers=headers)
+            self._producer.produce(topic, msg, headers=headers)

     def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
         """Attempt to flush enqueued messages. Return the number of messages still
@@ -78,7 +86,7 @@ def __exit__(self, type, value, traceback) -> bool:
 @dataclasses.dataclass
 class ProducerConfig:
     broker_urls: List[str]
-    topic: str
+    topic: Optional[str]
     auth: Optional[SASLAuth] = None
     error_callback: Optional[ErrorCallback] = log_client_errors

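For orientation, here is a minimal sketch of how the changed write() API can be used, mirroring the test added below; the broker address, topic names, and payloads are placeholders rather than values taken from this commit:

from adc.producer import Producer, ProducerConfig

# Configure the producer without a default topic; every write() must then
# name its destination explicitly, otherwise an exception is raised.
config = ProducerConfig(
    broker_urls=["kafka.example.org:9092"],  # placeholder broker address
    topic=None,
)

producer = Producer(config)
producer.write(b"first message", topic="topic_a")   # placeholder topic names
producer.write(b"second message", topic="topic_b")
producer.flush()
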
51 changes: 49 additions & 2 deletions tests/test_kafka_integration.py
@@ -377,6 +377,43 @@ def test_contextmanager_support(self):
         self.assertEqual(messages[1].value(), b"message 2")
         self.assertEqual(messages[2].value(), b"message 3")

+    def test_multi_topic_handling(self):
+        """Use a single producer object to write messages to multiple topics,
+        and check that a consumer can receive them all.
+        """
+        topics = ["test_multi_1", "test_multi_2"]
+
+        # Push some messages in
+        producer = adc.producer.Producer(adc.producer.ProducerConfig(
+            broker_urls=[self.kafka.address],
+            topic=None,
+            auth=self.kafka.auth,
+        ))
+        for i in range(0, 8):
+            producer.write(str(i), topic=topics[i % 2])
+        producer.flush()
+        logger.info("messages sent")
+
+        # Check that we receive the messages from the right topics
+        consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
+            broker_urls=[self.kafka.address],
+            group_id="test_consumer",
+            auth=self.kafka.auth,
+        ))
+        consumer.subscribe(topics)
+        stream = consumer.stream()
+        total_messages = 0
+        for msg in stream:
+            if msg.error() is not None:
+                raise Exception(msg.error())
+            idx = int(msg.value())
+            self.assertEqual(msg.topic(), topics[idx % 2])
+            total_messages += 1
+            if total_messages == 8:
+                break
+        self.assertEqual(total_messages, 8)
+

 class KafkaDockerConnection:
     """Holds connection information for communicating with a Kafka broker running
@@ -437,6 +474,8 @@ def query_kafka_broker_address(self):
         if not addrs:
             return None
         ip = addrs[0]['HostIp']
+        if len(ip) == 0:
+            ip = "localhost"
         port = addrs[0]['HostPort']
         return f"{ip}:{port}"

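For context on the HostIp fallback above, a hypothetical helper, not part of this commit, illustrating the docker-py data such a lookup inspects; depending on the Docker version, HostIp can be reported as "0.0.0.0" or as an empty string for a port published on all host interfaces, which is why the empty string is mapped to "localhost":

from docker.models.containers import Container

def broker_port_bindings(container: Container) -> list:
    """Return docker-py's host port bindings for the broker port, e.g.
    [{'HostIp': '', 'HostPort': '9092'}]. HostIp may be '0.0.0.0' or an empty
    string when the port is published on all host interfaces."""
    container.reload()  # refresh cached attributes so port bindings are current
    ports = container.attrs["NetworkSettings"]["Ports"]
    return ports.get("9092/tcp", [])
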
@@ -502,8 +541,16 @@ def get_or_create_container(self):
             detach=True,
             auto_remove=True,
             network=self.net.name,
-            # Setting None below lets the OS pick an ephemeral port.
-            ports={"9092/tcp": None},
+            # Kafka insists on redirecting consumers to one of its advertised listeners,
+            # which it will get wrong if it is running in a private container network.
+            # To fix this, we need to tell it what to advertise, which means we must
+            # know what port will be visible from the host system, and we cannot use an
+            # ephemeral port, which would be known to us only after the container is
+            # started. Since we have to pick something, pick 9092, which means that
+            # these tests cannot run if there is already an instance of Kafka running on
+            # the same host.
+            ports={"9092/tcp": 9092},
+            command=["/root/runServer", "--advertisedListener", "SASL_SSL://localhost:9092"],
         )

     def get_or_create_docker_network(self):

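As the comment in get_or_create_container() explains, pinning the host port means these tests conflict with any broker already listening on 9092. A small illustrative helper, not part of this commit, that could detect that situation up front using only the Python standard library:

import socket

def host_port_in_use(port: int = 9092, host: str = "127.0.0.1") -> bool:
    """Return True if something is already accepting connections on the port
    that the test broker needs to publish on."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)  # avoid hanging if the host silently drops packets
        return sock.connect_ex((host, port)) == 0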