From 8fcdb0547a684c3ea5424eb4ace73d55e38ee256 Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 10 Oct 2024 20:34:09 -0700 Subject: [PATCH] adding example of writting to kafka --- .../apache_beam/examples/write_kafka.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 sdks/python/apache_beam/examples/write_kafka.py diff --git a/sdks/python/apache_beam/examples/write_kafka.py b/sdks/python/apache_beam/examples/write_kafka.py new file mode 100644 index 0000000000000..642458c6430b6 --- /dev/null +++ b/sdks/python/apache_beam/examples/write_kafka.py @@ -0,0 +1,46 @@ +import argparse +from datetime import datetime, date +import logging +import random +import json +import typing + +from typing import Tuple, List, Any, Iterable + + +import apache_beam as beam +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam import Pipeline +import apache_beam.typehints.schemas +from apache_beam.typehints.schemas import Any as BeamAny + +DATA = [ + ('user1', 1), + ('user2', 2), + ] + +def prepare_for_kafka(element:Tuple['str', 'int']) : + key = 'test-key' + return (key.encode('utf8'), json.dumps(element).encode('utf8')) + +def run(): + """ + main entry point to run Apache Beam Job + + """ + + with Pipeline() as pipeline: + lines = (pipeline | 'Create data' >> beam.Create(DATA) + | "Prepare " >> beam.Map(prepare_for_kafka).with_output_types(typing.Tuple[bytes, bytes]) + | "Write to Kafka" >> WriteToKafka( + producer_config={'bootstrap.servers': "localhost:9092"}, + topic='test-out', + ) + ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.ERROR) + run()