-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconsumer.py
140 lines (104 loc) · 3.24 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
from asyncio import Task
from datetime import datetime
from typing import Optional, Set
from aiokafka import AIOKafkaConsumer, errors, TopicPartition
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import asyncio
import json
import logging
# KAFKA CONSUMER CONFIGURATION
##############################
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC')
KAFKA_BOOTSTRAP_SERVERS = (
os.environ.get('KAFKA_BOOTSTRAP_SERVERS').split(",")
)
CONSUMER_GROUP = os.environ.get('CONSUMER_GROUP')
# GLOBAL VARIABLES
##################
consumer: Optional[AIOKafkaConsumer] = None
consumer_task: Optional[Task] = None
messages = []
user_statuses = {}
# INITIALIZING LOGGER
#####################
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO)
log = logging.getLogger(__name__)
# KAFKA CONSUMER FUNCTIONALITY
##############################
async def initialize():
global consumer
log.debug(f'Initializing KafkaConsumer for topic {KAFKA_TOPIC}, group_id {CONSUMER_GROUP}'
f' and using bootstrap servers {KAFKA_BOOTSTRAP_SERVERS}')
consumer = AIOKafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id=CONSUMER_GROUP
)
await consumer.start()
partitions: Set[TopicPartition] = consumer.assignment()
for tp in partitions:
await consumer.seek_to_committed(tp)
return
async def read_incoming_messages(kafka_consumer):
try:
async for msg in kafka_consumer:
log.info(f"Consumed msg: {msg}")
event = msg.value
event["consumed"] = get_pretty_time_with_milliseconds()
messages.append(event)
except errors.KafkaError:
log.exception(f'Kafka error {errors.KafkaError}')
except Exception:
log.exception(f'General error {Exception}')
finally:
log.warning('Stopping consumer')
await kafka_consumer.stop()
async def consume():
global consumer_task
consumer_task = asyncio.create_task(read_incoming_messages(consumer))
def get_pretty_time_with_milliseconds():
"""Returns the current time in a pretty format with milliseconds."""
now = datetime.now()
formatted = now.strftime("%H:%M:%S:%f")
return formatted[:-3]
# API FUNCTIONALITY
###################
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins="*",
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup_event():
log.info('Initializing API ...')
await initialize()
await consume()
@app.on_event("shutdown")
async def shutdown_event():
log.info('Shutting down API')
consumer_task.cancel()
await consumer.stop()
@app.get("/")
async def list_messages():
data = []
for m in messages:
data.append(m)
messages.clear()
return data
@app.get("/statuses")
async def list_messages():
return user_statuses
@app.get("/ping")
async def ping():
return "pong:{}".format(CONSUMER_GROUP)
# APPLICATION STARTUP
#####################
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=os.environ.get("PORT", 8000))