-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.py
executable file
·137 lines (99 loc) · 4.07 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
import os
import sys
import json
import signal
import logging
import optparse
from helpers import amqp_connect
# --- global variables ----------------------------------------------------------------
message_count = 0
amqp_conn = None
amqp_channel = None
instances = {}
prog_base = os.path.split(sys.argv[0])[1] # Name of this program
logger = logging.getLogger("my_logger")
# --- functions ----------------------------------------------------------------------
def setup_logger(debug_flag):
# log to the console
console = logging.StreamHandler()
# default log level - make logger/console match
logger.setLevel(logging.INFO)
console.setLevel(logging.INFO)
# debug - from command line
if debug_flag:
logger.setLevel(logging.DEBUG)
console.setLevel(logging.DEBUG)
# formatter
formatter = logging.Formatter("%(asctime)s %(levelname)7s: %(message)s")
console.setFormatter(formatter)
logger.addHandler(console)
logger.debug("Logger has been configured")
def prog_sigint_handler(signum, frame):
logger.warn("Exiting due to signal %d" % (signum))
for i in instances:
if instances[i]["counter"] > 0:
write_stats_to_file(i)
if not amqp_conn is None:
amqp_conn.close()
def write_stats_to_file(container_id):
with open("logs/"+container_id[:12]+'.log', "a+") as g:
for m in instances[container_id]["measurements"]:
output = "%d %f %f %d %d %d %d\n" % (m["end_timestamp"], m["cpu_usage"], m["memory_usage_percent"], m["blkio"]["bytes_read"], m["blkio"]["bytes_write"], m["network"]["rx_bytes"],m["network"]["tx_bytes"])
g.write(output)
instances[container_id]["counter"] = 0
instances[container_id]["measurements"] = []
def on_message(method_frame, header_frame, body):
parsed = json.loads(body)
container_id = parsed["container_id"]
if container_id in instances:
instances[container_id]["counter"] += 1
instances[container_id]["measurements"].append(parsed)
if instances[container_id]["counter"] >= 10:
write_stats_to_file(container_id)
else:
instances[container_id] = {}
instances[container_id]["counter"] = 1
instances[container_id]["measurements"] = [parsed]
#print json.dumps(parsed, indent=2)
#amqp_channel.basic_ack(delivery_tag=method_frame.delivery_tag)
def setup_amqp(amqp_details):
global amqp_conn
global amqp_channel
global amqp_queue
amqp_conn = amqp_connect(amqp_details)
amqp_channel = amqp_conn.channel()
#amqp_channel.basic_consume(on_message, amqp_details["queue"])
return
def main():
# Configure command line option parser
prog_usage = "usage: %s [options]" % (prog_base)
parser = optparse.OptionParser(usage=prog_usage)
parser.add_option("-f", "--file", action = "store", dest = "file", help = "Config file")
parser.add_option("-d", "--debug", action = "store_true", dest = "debug", help = "Enables debugging output")
# Parse command line options
(options, args) = parser.parse_args()
setup_logger(options.debug)
if not options.file:
logger.critical("An input file has to be given with --file")
sys.exit(1)
# Die nicely when asked to (Ctrl+C, system shutdown)
signal.signal(signal.SIGINT, prog_sigint_handler)
signal.signal(signal.SIGTERM, prog_sigint_handler)
with open(options.file, "r") as f:
config = json.load(f)
setup_amqp(config["amqp_endpoint"])
try:
for measurement in amqp_channel.consume(config["amqp_endpoint"]["queue"], no_ack=True, inactivity_timeout=60.0):
if measurement:
method_frame, header_frame, body = measurement
on_message(method_frame, header_frame, body)
except Exception as e:
logging.critical(e)
for i in instances:
if instances[i]["counter"] > 0:
write_stats_to_file(i)
if not amqp_conn is None:
amqp_conn.close()
if __name__ == "__main__":
main()