-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconsumer_confluent.py
33 lines (28 loc) · 940 Bytes
/
consumer_confluent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from confluent_kafka import Consumer
from json import loads
# Kafka consumer for the local development broker, subscribed to 'numtest'.
#
# Only consumer-side (or shared) settings are listed here. The producer-only
# properties 'queue.buffering.max.ms' and 'batch.num.messages' were removed:
# librdkafka ignores them on a consumer instance and merely logs a
# configuration warning for each one.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    # Protocol-version fallback settings, used by librdkafka only when the
    # broker cannot be queried for its supported API versions.
    'broker.version.fallback': '0.10.1',
    'api.version.fallback.ms': 0,
    'security.protocol': 'PLAINTEXT',
    'group.id': 'dev',
    # When the group has no committed offset, start from the oldest message.
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['numtest'])
# Main poll loop: print every JSON message received until interrupted.
# A single bad record (missing value or invalid JSON) is reported and
# skipped instead of crashing the consumer.
try:
    while True:
        # poll() returns None when no message arrived within the timeout.
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue

        payload = msg.value()
        if payload is None:
            # A message may carry no value (e.g. a tombstone); loads(None)
            # would raise TypeError, so there is nothing to decode here.
            print('Skipping message with empty value')
            continue

        try:
            record = loads(payload)
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError, so this covers malformed JSON and bad encodings.
            print('Skipping undecodable message:', exc)
            continue

        # Proper message
        print(record)
except KeyboardInterrupt:
    print('Aborted by user')
finally:
    # Close down consumer to commit final offsets.
    consumer.close()