diff --git a/example.py b/example.py index dac97b751..9907450f6 100755 --- a/example.py +++ b/example.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -import threading, logging, time -import multiprocessing +import threading, time -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin import NewTopic class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() - + def stop(self): self.stop_event.set() @@ -23,14 +23,15 @@ def run(self): producer.close() -class Consumer(multiprocessing.Process): + +class Consumer(threading.Thread): def __init__(self): - multiprocessing.Process.__init__(self) - self.stop_event = multiprocessing.Event() - + threading.Thread.__init__(self) + self.stop_event = threading.Event() + def stop(self): self.stop_event.set() - + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', @@ -44,29 +45,38 @@ def run(self): break consumer.close() - - + + def main(): + # Create 'my-topic' Kafka topic + try: + admin = KafkaAdminClient(bootstrap_servers='localhost:9092') + + topic = NewTopic(name='my-topic', + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + except Exception: + pass + tasks = [ Producer(), Consumer() ] + # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic for t in tasks: t.start() time.sleep(10) - + + # Stop threads for task in tasks: task.stop() for task in tasks: task.join() - - + + if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) main()