forked from RobWelbourn/Twilio-Tools-2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcountcps.py
executable file
·454 lines (366 loc) · 17.8 KB
/
countcps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
#!/usr/bin/env python
"""Program that takes a CSV file of CDRs and produces a list of one-second intervals
with call counts, again as a CSV file. Optionally, the program will display the
spread of CPS values.
usage: countcps.py [-h] [-s START] [-e END] [--tz TZ]
[-t {auto,header,positional}] [-c COLUMN] [--spread]
[--queue] [--version] [--log {debug,info,warning}]
cdr_file cps_file
Create a calls-per-second CSV file from a CDR file.
positional arguments:
cdr_file input CSV file containing call detail records
cps_file output CSV file containing CPS counts
optional arguments:
-h, --help show this help message and exit
-s START, --start START ignore records before this date/time
(YYYY-MM-DD [[HH:MM:SS]±HH:MM])
-e END, --end END ignore records after this date/time
(YYYY-MM-DD [[HH:MM:SS]±HH:MM])
--tz TZ timezone as ±HHMM offset from UTC (default: timezone
of local machine)
-t {auto,header,positional}, --type {auto,header,positional}
specify format of CDR file (auto: autodetect; header:
has a header row; positional: no header row)
-c COLUMN, --column COLUMN column name or number containing call start date/time
--spread display CPS spread
--queue display queue time estimates from CDRs
--version show program's version number and exit
--log {debug,info,warning} set logging level
The program will by default attempt to auto-detect the format of the CDR file. Twilio
Console, Looker and Monkey download formats are recognized. Otherwise, it looks for the
first column that is formatted as an ISO 8601 date. If the above conditions are not true,
then you should specify the name (if there is a header row) or number (if no header) of
the column that contains the date/time the call was made.
Note that the program will automatically filter out non-Outgoing API calls for Console,
Looker and Monkey CDRs; for other sources, you should make sure that the only calls
included in the CDR file are outbound calls.
"""
import sys
import argparse
from datetime import datetime, timedelta
import csv
import logging
from decimal import Decimal
__version__ = "1.0"
DEFAULT_FIELDNAMES = \
['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'] # Used to select a CDR field by position
DATETIME_FORMATS = {
'Monkey': "%a, %d %b %Y %H:%M:%S %z", # e.g. "Sat, 12 Sep 2020 10:30:05 -0700"
'Console': "%H:%M:%S %Z %Y-%m-%d", # e.g. "14:52:06 EDT 2020-09-10"
'ISO': None # e.g. "2020-09-10 14:52:06.000"
}
logger = logging.getLogger(__name__)
# Set up logging for the module.
def configure_logging(level=logging.INFO):
    """Attach a timestamped stream handler to the module logger at *level*."""
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s.%(msecs)03d: %(message)s', datefmt='%H:%M:%S'))
    logger.setLevel(level)
    logger.addHandler(stream_handler)
# Return parsed command line arguments.
def get_args():
    """Parse and validate command-line arguments.

    Returns:
        argparse.Namespace with open ``cdr_file``/``cps_file`` handles and
        all options. Exits via ``parser.error()`` when --type and --column
        are inconsistent.
    """
    col_num = None

    # Accept either a column name or a 1-based column number; remember the
    # number (if any) so --type=positional can be validated after parsing.
    # (Fix: parameter renamed from 'str', which shadowed the builtin.)
    def column_id(value):
        nonlocal col_num
        try:
            col_num = int(value)
        except ValueError:
            return value  # Not numeric: treat it as a column name.
        if col_num < 1 or col_num > len(DEFAULT_FIELDNAMES):
            raise argparse.ArgumentTypeError("Column number is out of range")
        return value

    # Parse a ±HHMM timezone offset and return a tzinfo object.
    # (Fix: renamed from 'tzinfo'/'str' to avoid shadowing common names.)
    def tz_offset(value):
        try:
            return datetime.strptime(value, '%z').tzinfo
        except ValueError:
            raise argparse.ArgumentTypeError(
                "Timezone offset should be a signed value in the form ±HHMM")

    # Default timezone is whatever the local machine is configured with.
    now = datetime.now()
    local_timezone = now.astimezone().tzinfo

    parser = argparse.ArgumentParser(
        description="Create a calls-per-second CSV file from a CDR file.",
        epilog=(
            "We recommend defaulting the CDR file type to 'auto', unless the start "
            "date/time is not the first date/time column in the file, in which "
            "case you should specify 'column', which is the name (type='header') "
            "or number (type='positional') of the start date/time column. "
            "Add a filename to the command line prefixed by '@' if you wish to place "
            "parameters in a file, one parameter per line."),
        fromfile_prefix_chars='@')
    parser.add_argument(
        'cdr_file', type=argparse.FileType('r'),
        help="input CSV file containing call detail records")
    parser.add_argument(
        'cps_file', type=argparse.FileType('w'),
        help="output CSV file containing CPS counts")
    parser.add_argument(
        '-s', '--start', type=datetime.fromisoformat,
        help="ignore records before this date/time (YYYY-MM-DD [[HH:MM:SS]±HHMM])")
    parser.add_argument(
        '-e', '--end', type=datetime.fromisoformat,
        help="ignore records after this date/time (YYYY-MM-DD [[HH:MM:SS]±HHMM])")
    parser.add_argument(
        '--tz', default=local_timezone, type=tz_offset,
        help="timezone as ±HHMM offset from UTC (default: timezone of local machine)")
    parser.add_argument(
        '-t', '--type', choices=['auto', 'header', 'positional'], default='auto',
        help=("specify format of CDR file (auto: autodetect; "
              "header: has a header row; positional: no header row)"))
    parser.add_argument(
        '-c', '--column', type=column_id,
        help="column name or number containing call start date/time")
    parser.add_argument(
        '--spread', action='store_true',
        help="display CPS spread")
    parser.add_argument(
        '--queue', action='store_true',
        help="display queue time estimates from CDRs")
    parser.add_argument(
        '--version', action='version', version=__version__)
    parser.add_argument(
        '--log', choices=['debug', 'info', 'warning'], default='info',
        help="set logging level")
    args = parser.parse_args()

    # Cross-field validation: positional needs a numeric column, header a name.
    if args.type == 'positional' and not col_num:
        parser.error("Start date/time field specified by position, but no column number specified")
    if args.type == 'header' and not args.column:
        parser.error("Start date/time field specified by column name, but none specified")
    return args
# Take a row of CSV values and find all those that are formatted as a datetime.
# We'll try all the known datetime formats in turn, until we find one that works.
# Returns a tuple containing a list of the column numbers, indexed from 1,
# the datetime format, and timezone info.
def look_for_datetime(columns):
    """Scan a row of CSV values for columns that parse as date/times.

    Tries each entry of DATETIME_FORMATS in turn and stops at the first
    format that matches at least one column.

    Returns:
        (dt_cols, fmt_string, tzinfo): list of 1-indexed matching column
        numbers, the strptime format that matched (None means ISO 8601 or
        no match at all), and the tzinfo of the last matching value
        (None when the values are naive or nothing matched).
    """
    dt_cols = []
    tzinfo = None
    for fmt_name, fmt_string in DATETIME_FORMATS.items():
        logger.debug('Trying %s datetime format', fmt_name)
        for i, column in enumerate(columns, start=1):
            try:
                if fmt_string:
                    dt = datetime.strptime(column, fmt_string)
                else:
                    dt = datetime.fromisoformat(column)
            except ValueError:
                continue
            dt_cols.append(i)
            tzinfo = dt.tzinfo
        if dt_cols:
            logger.debug("Columns formatted as date/time values: %s", dt_cols)
            logger.debug("Datetime format is %s", fmt_name)
            logger.debug("Timezone in CDR file is %s", tzinfo)
            return (dt_cols, fmt_string, tzinfo)
    # Fix: return an explicit None format on the not-found path instead of
    # relying on the loop variable happening to end on the ISO entry (None).
    logger.debug("No datetime items found in row")
    return ([], None, None)
# Look for a candidate header field, choosing the first found in the given list.
def look_for_header(columns, candidates):
    """Return the first name in *candidates* present in *columns*, else None."""
    return next((name for name in candidates if name in columns), None)
# Structure containing header row and date/time format information.
class CDRinfo:
    """Detected layout of a CDR file.

    Tracks whether the file has a header row, which columns (by header name
    or 1-based position) carry the start time, flags, direction and queue
    time, plus the strptime format (None means ISO 8601) and source timezone.
    """
    def __init__(self):
        self.has_header = False
        # All column identifiers and format details start out unknown.
        for attr in ('start_col_id', 'flags_col_id', 'direction_col_id',
                     'queuetime_col_id', 'datetime_format', 'tzinfo'):
            setattr(self, attr, None)
# Returns a CDRinfo containing details of the name or position of Flags
# and DateCreated/StartTime columns, and the date/time format.
def detect_cdr_type(args):
    """Inspect the CDR file and determine its layout.

    Works out whether the file has a header row, which column holds the call
    start date/time (validating any user-specified column against the data),
    and — for recognized Monkey/Looker/Console exports — the Flags, Direction
    and QueueTime columns. Exits with an error message on any inconsistency.

    Side effect: rewinds args.cdr_file to the beginning of the file.
    """
    # Let's initially assume the CDR file has a header, and get the field names.
    cdr_info = CDRinfo()
    reader = csv.DictReader(args.cdr_file)
    fieldnames = reader.fieldnames
    if fieldnames is None:
        sys.exit("Error: CDR file is empty!")
    logger.debug("Header fieldnames: %s", fieldnames)
    # See whether this is a real header by determining whether any of the
    # field names are actually datetimes (if they are, there is no header row).
    dt_cols, cdr_info.datetime_format, cdr_info.tzinfo = look_for_datetime(fieldnames)
    cdr_info.has_header = not dt_cols
    # Next, do a little more validation.
    if args.type == 'positional' and cdr_info.has_header:
        sys.exit("Error: CDR file has header row, but start date/time was specified by position")
    if args.type == 'header' and not cdr_info.has_header:
        sys.exit("Error: CDR file has no header row, but start date/time was specified by column name")
    # If there's a header, get the next row to use as a sample.
    if cdr_info.has_header:
        try:
            sample_row = next(reader).values()
            logger.debug("Sample row: %s", sample_row)
        except StopIteration:
            sys.exit("Error: CDR file contains no call records!")
        dt_cols, cdr_info.datetime_format, cdr_info.tzinfo = look_for_datetime(sample_row)
        if not dt_cols:
            sys.exit("Error: CDR file contains no recognizable call records!")
    # If the start date/time column is positional, check against the header row.
    if args.type == 'positional':
        if int(args.column) in dt_cols:
            cdr_info.start_col_id = args.column
            logger.info("CDR file confirmed as type 'positional'")
        else:
            sys.exit(f"Column {args.column} does not contain date/time values")
    # If the start date/time column was specified by name, check against the sample row.
    elif args.type == 'header':
        try:
            column_num = fieldnames.index(args.column) + 1  # Remember, indexed from 1
        except ValueError:
            sys.exit(f"No such column name '{args.column}' in header row")
        if column_num in dt_cols:
            cdr_info.start_col_id = args.column
            logger.info("CDR file confirmed as type 'header'")
        else:
            sys.exit(f"Column {args.column} does not contain date/time values")
    # Autodetect: look for Monkey/Looker/Console headers. If we can't find a recognized
    # start date/time header, we'll pick the first column with a datetime.
    elif args.type == 'auto':
        if cdr_info.has_header:
            # Determine whether any of the standard headers are present.
            cdr_info.flags_col_id = look_for_header(fieldnames, ['Flags', 'flags'])
            cdr_info.direction_col_id = look_for_header(fieldnames, ['Direction', 'direction'])
            cdr_info.queuetime_col_id = look_for_header(fieldnames, ['QueueTime', 'queue_time'])
            cdr_info.start_col_id = look_for_header(
                fieldnames, ['DateCreated', 'date_created', 'StartTime', 'start_time'])
            if cdr_info.flags_col_id:
                logger.info("CDR file autodetected as likely from Monkey or Looker")
            elif cdr_info.direction_col_id:
                logger.info("CDR file autodetected as likely from Console or getcdrs.py")
            # If there's a defined start date/time header, make sure the column is a datetime.
            if cdr_info.start_col_id:
                col_num = fieldnames.index(cdr_info.start_col_id) + 1  # Indexed from 1
                if col_num not in dt_cols:
                    # Bug fix: report the detected column, not args.column,
                    # which is normally None in auto mode.
                    sys.exit(f"Column {cdr_info.start_col_id} does not contain date/time values")
            # Otherwise pick the first column with a datetime.
            else:
                cdr_info.start_col_id = fieldnames[dt_cols[0] - 1]
            logger.info("CDR file autodetected as type 'header'")
        else:
            # No headers, so pick the first datetime column.
            cdr_info.start_col_id = str(dt_cols[0])
            logger.info("CDR file autodetected as type 'positional'")
    logger.debug("Start column is '%s'", cdr_info.start_col_id)
    logger.debug("Flags column is '%s'", cdr_info.flags_col_id)
    logger.debug("Direction column is '%s'", cdr_info.direction_col_id)
    args.cdr_file.seek(0)  # Reset reader to beginning of file again.
    return cdr_info
# We will need to make sure that start and end times have proper timezone info.
# If the CDRs contain TZ info, then the start and end times must also contain
# TZ info; the reverse is also true.
def adjust_start_and_end_times(start, end, cdr_tz, given_tz):
    """Normalize the filter window so it is comparable with CDR timestamps.

    When the CDR timestamps are timezone-aware, naive start/end values are
    given *given_tz*; when the CDRs are naive, any tzinfo on start/end is
    stripped so datetime comparisons cannot raise TypeError.
    """
    if start:
        logger.debug("Start date/time parsed as %r", start)
    if end:
        logger.debug("End date/time parsed as %r", end)
    logger.debug("Timezone adjustment if needed: %r", given_tz)

    # Bring one boundary value in line with the CDR timestamps' awareness.
    def _normalize(moment):
        if moment is None:
            return moment
        if cdr_tz:
            return moment.replace(tzinfo=given_tz) if moment.tzinfo is None else moment
        return moment.replace(tzinfo=None) if moment.tzinfo else moment

    start = _normalize(start)
    end = _normalize(end)
    if start:
        logger.debug("Adjusted start date/time: %r", start)
    if end:
        logger.debug("Adjusted end date/time: %r", end)
    return start, end
def calculate_spread(intervals):
    """Build a histogram of CPS values.

    Args:
        intervals: dict mapping interval start time -> calls in that second.

    Returns:
        dict mapping each CPS count to the number of intervals that had it.
    """
    logger.debug("Calculating spread...")
    spread = {}
    # Fix: single-lookup dict.get instead of the 'in spread.keys()' double-lookup.
    for value in intervals.values():
        spread[value] = spread.get(value, 0) + 1
    return spread
def print_spread(spread):
    """Display the CPS spread histogram, one line per distinct CPS value."""
    print()
    print("Spread")
    print("------")
    for cps in sorted(spread):
        print(f'{cps:4d} CPS: x {spread[cps]}')
    print()
def print_queue_times(queue_times):
    """Display the distribution of queue-time estimates, if any were recorded."""
    print()
    if not queue_times:
        print("No queue times were recorded")
    else:
        print("Queue Time Estimates")
        print("--------------------")
        for seconds in sorted(queue_times):
            print(f'{seconds:6.2f} secs: x {queue_times[seconds]}')
    print()
def main(args):
    """Read the CDR file, tally calls per one-second interval, and write the CPS CSV.

    Optionally prints a CPS spread histogram (--spread) and queue-time
    estimates (--queue) to stdout.
    """
    configure_logging(level=getattr(logging, args.log.upper()))
    cdr_info = detect_cdr_type(args)
    # Make the start/end filter boundaries comparable with the CDR timestamps.
    start, end = adjust_start_and_end_times(args.start, args.end, cdr_info.tzinfo, args.tz)
    logger.debug("Reading CSV file...")
    intervals = {}    # call start datetime -> number of calls in that second
    queue_times = {}  # queue time in seconds (Decimal) -> occurrence count
    num_read = 0
    num_counted = 0
    num_written = 0
    with args.cdr_file as cdr_file:
        # Header files use their own fieldnames; positional files get synthetic
        # names '1'..'10' so columns can be addressed by number.
        cdrs = csv.DictReader(cdr_file, fieldnames=None if cdr_info.has_header else DEFAULT_FIELDNAMES)
        for cdr in cdrs:
            try:
                num_read += 1
                # Filter all but Outgoing API calls, if the CDRs were exported from Monkey, Looker or
                # Twilio Console. If not from these sources, the CDR file should be pre-filtered.
                # Flags definition can be found here: https://wiki.hq.twilio.com/display/RT/Call (Twilions only).
                if cdr_info.flags_col_id and (int(cdr[cdr_info.flags_col_id]) & 0x0002 != 2):
                    continue
                if cdr_info.direction_col_id and (cdr[cdr_info.direction_col_id] not in ['Outgoing API', 'outbound-api']):
                    continue
                # Get the call start date/time, according to the format of the source.
                if cdr_info.datetime_format is None:
                    call_start = datetime.fromisoformat(cdr[cdr_info.start_col_id])
                else:
                    call_start = datetime.strptime(
                        cdr[cdr_info.start_col_id],
                        cdr_info.datetime_format)
                # If the call was queued, add it to a tally for the queue length, and adjust the start time.
                if cdr_info.queuetime_col_id:
                    queue_time = Decimal(cdr[cdr_info.queuetime_col_id]) / 1000  # Result in seconds
                    if queue_time in queue_times.keys():
                        queue_times[queue_time] += 1
                    else:
                        queue_times[queue_time] = 1
                    if queue_time > 0:
                        # NOTE(review): queue time is truncated to whole seconds,
                        # presumably because intervals have 1s resolution — confirm.
                        call_start -= timedelta(seconds=int(queue_time))
                # Filter records outside of the chosen period.
                if start and call_start < start: continue
                if end and call_start >= end: continue
                # Count the call against its CPS interval.
                num_counted += 1
                if call_start in intervals.keys():
                    intervals[call_start] += 1
                else:
                    intervals[call_start] = 1
            except Exception as err:
                # Any malformed record aborts the run with the offending line logged.
                logger.error("Line: %s", cdr)
                sys.exit(f"Problem parsing CDR file: {str(err)}")
    logger.debug("%s records read, %s records counted", num_read, num_counted)
    logger.debug("Writing CPS file...")
    # Emit one "datetime,count" line per interval, in insertion order.
    with args.cps_file as cps_file:
        for key, value in intervals.items():
            num_written += 1
            print(f'{key},{value}', file=cps_file)
    logger.debug("%s records written", num_written)
    if args.spread:
        print_spread(calculate_spread(intervals))
    if args.queue:
        print_queue_times(queue_times)
# Script entry point: parse the command line and run the conversion.
if __name__ == "__main__":
    main(get_args())