-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_controller.py
87 lines (65 loc) · 2.23 KB
/
kafka_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import unicode_literals
import NewSegmentationIsReadyEvent_pb2 as msg
from kafka import KafkaConsumer, KafkaProducer
topic = 'eventTopic'
log = []
def connect_kafka_producer():
_producer = None
try:
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
except Exception:
log.append('producer connection failed')
pass
finally:
log.append('producer connection success')
return _producer
def publish_message(producer_instance, topic_name, data):
try:
#key_bytes = bytes(str(key), encoding='utf-8')
#value_bytes = bytes(str(value), encoding='utf-8')
#print(data)
producer_instance.send(topic_name, data.SerializeToString())#key=key_bytes, value=value_bytes)
producer_instance.flush()
log.append('Message successfully sent')
except Exception as e:
log.append(str(e))
def get_message(consumer):
links = []
for message in consumer:
value = message.value
link = msg.NewSegmentationIsReadyEvent()
link.ParseFromString(value)
log.append('Message successfully received')
#log.append(str(link))
links.append(link)
return links
def kafka_try_send():
producer = connect_kafka_producer()
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
segm = msg.NewSegmentationIsReadyEvent()
segm.segmLink = "http://nowhere"
segm.maskLink = "http://non-existent"
segm.wineId = "1"
#serialized = segm.SerializeToString()
log.append(str(segm))
publish_message(producer, topic, segm) #'msg', str(serialized))
links = get_message(consumer)
log.append(str(links[0]))
d = {}
for i in range(len(log)):
d.update({'line ' + str(i): log[i]})
return d
print(kafka_try_send())
# Test that the protobuf serialize and de-serialize methods are working fine
'''
def serialize_deserialize_test()
segm = msg.Segmentation()
segm2 = msg.Segmentation()
segm.segm_link = "http://nowhere"
segm.mask_link = "http://non-existent"
segm.id = 1
serialized = segm.SerializeToString()
print(serialized)
segm2.ParseFromString(serialized)
print(segm2)
'''