-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprod-new.py
58 lines (42 loc) · 1.39 KB
/
prod-new.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import faker
from datetime import datetime
import logging
import time
# Configure the root logger at INFO so the logging.info/logging.error calls
# made by Producer.publish_to_kafka are actually emitted.
logging.basicConfig(level=logging.INFO)
class Producer:
    """Publish JSON-serialized messages to a fixed Kafka topic.

    The broker address and topic name are hard-coded in
    ``_init_kafka_producer``; the underlying client is exposed as
    ``self.producer``.
    """

    def __init__(self):
        self._init_kafka_producer()

    def _init_kafka_producer(self):
        """Create the ``KafkaProducer`` and record the broker and topic used."""
        self.kafka_host = "kafka-local.kafkaplaypen.svc.cluster.local:9092"
        self.kafka_topic = "my-topic"
        self.producer = KafkaProducer(
            bootstrap_servers=self.kafka_host,
            # Values are dumped to JSON text and encoded with str.encode()'s
            # default codec (UTF-8).
            value_serializer=lambda v: json.dumps(v).encode(),
        )

    def publish_to_kafka(self, message, timeout=None):
        """Send ``message`` to ``self.kafka_topic`` and log the outcome.

        Args:
            message: JSON-serializable object handed to the producer.
            timeout: Seconds to wait for the send to be resolved. ``None``
                (the default) waits until the client reports success or
                failure.

        A ``KafkaError`` is logged rather than propagated, so a failed
        publish does not stop the caller.
        """
        try:
            # send() is asynchronous: it only hands back a future. Delivery
            # errors are stored on that future and are raised by get(), not
            # by flush(), so waiting on it is what makes the except/else
            # branches below reflect the real outcome.
            future = self.producer.send(self.kafka_topic, message)
            future.get(timeout=timeout)
        except KafkaError as ex:
            logging.error("Exception %s", ex)
        else:
            logging.info(
                "Published message %s into topic %s", message, self.kafka_topic
            )

    @staticmethod
    def create_random_email():
        """Return a dict describing a randomly generated contact.

        Keys: ``username``, ``first_name``, ``last_name``, ``email`` (all
        produced by ``faker``) and ``date_created`` (current UTC time as a
        string).
        """
        # Function-scope import: the module header only imports `datetime`.
        from datetime import timezone

        fake = faker.Faker()
        # Same naive-UTC value and text format that datetime.utcnow() gave,
        # without calling the deprecated method.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        return dict(
            username=fake.user_name(),
            first_name=fake.first_name(),
            last_name=fake.last_name(),
            email=fake.email(),
            date_created=str(created_at),
        )
if __name__ == "__main__":
    # Demo driver: publish one randomly generated contact every 5 seconds
    # until the process is interrupted.
    producer = Producer()
    try:
        while True:
            random_email = producer.create_random_email()
            producer.publish_to_kafka(random_email)
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the loop; exit without a traceback.
        logging.info("Interrupted, shutting down producer")
    finally:
        # Release the client's network resources however the loop ended.
        producer.producer.close()