loader.py
#!flask-venv/bin/python
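"""Daily loader for comparability data.

Connects to a Vertica database (chosen per cid), runs the aggregate query in
utils.redshift_query for a customer and date range, reduces the rows into
comparable acc/byp groups, applies sample-size and size-precision significance
checks, and writes the results to the Flask app's ReducedRow table.
"""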
from app import db, models, utils
from collections import defaultdict
from sqlalchemy import create_engine, text
import customers
import datetime
import json
import pyodbc
import sqlalchemy as sa
class VerticaLoader():

    def __init__(self, dbname, user, password):
        self.cid = None
        self.connectedDb = None
        self.cursor = None
        self.dbname = None  # resolved per cid in dbCidMap(); the dbname argument is not used directly
        self.password = password
        self.user = user
        self.SIGNIFICANT_RECORD_COUNT = 200
        self.SIGNIFICANT_PERCENTILE_ERROR = .15

    def dbCidMap(self):
        ''' map the current cid to the Vertica database used in the connection string '''
        bigs = [91, 90, 30, 3524]
        alternates = [3795, 4035, 4094]
        if self.cid in bigs:
            self.dbname = "SQLEngBig"
        elif self.cid in alternates:
            self.dbname = "SQLEng3"
        else:
            self.dbname = "SQLEng"

    def Connect(self):
        self.dbCidMap()
        self.engine_string = "vertica+pyodbc://{user}:{password}@{vertica}".format(user=self.user, password=self.password, vertica=self.dbname)
        vertica = sa.create_engine(self.engine_string)
        self.cursor = vertica.connect()
        self.connectedDb = self.dbname
        print "connected db", self.dbname

    def Query(self, cid, start_date='', end_date='', limit=500):
        self.cid = cid
        self.test()
        formatter = self.queryScrub({
            'start_date': start_date,
            'end_date': end_date,
            'limit': limit,
            'cid': self.cid,
        })
        self.currentStart = formatter['start_date']
        vertica_query = utils.redshift_query.format(**formatter)
        self.rows = self.cursor.execute(text(vertica_query)).fetchall()
        print 'query complete', "%i rows returned" % len(self.rows)

    def queryScrub(self, details):
        ''' some day this would check for whether a user had access to this cid '''
        # default to grabbing yesterday's data when no dates are supplied
        start_date = details.get('start_date') or datetime.date.today() - datetime.timedelta(days=1)
        end_date = details.get('end_date') or datetime.date.today()
        scrubbed = {
            'cid': details['cid'],
            'start_date': start_date.isoformat(),
            'end_date': end_date.isoformat(),
            'limit': details.get('limit')
        }
        return scrubbed

    def ReduceResults(self):
        hash_grouped = {}
        id_keys = ['network', 'geo', 'url_domain', 'size', 'content_type', 'url_schema', 'sdk_version', 'app_guid_int']
        for row in self.rows:
            hash_string = "&".join(["{key}={this_key}".format(key=key, this_key=str(row[key])) for key in id_keys])
            tpclass = row['class']
            exists = hash_grouped.get(hash_string, False)
            if not exists:
                hash_info = {'id': {}, 'metrics': {}, 'count': 0}
                for key in id_keys:
                    hash_info['id'].update({key: row[key]})
                hash_grouped[hash_string] = hash_info
            hash_grouped[hash_string]['count'] += row['bin_count']
            class_exists = hash_grouped[hash_string]['metrics'].get(tpclass, False)
            if class_exists:
                hash_grouped[hash_string]['metrics'][tpclass]['count'] += row['bin_count']
                if tpclass in ['acc', 'byp']:
                    # merge bins into existing descriptor
                    hash_grouped[hash_string]['metrics'][tpclass]['bins'].update({row['bin']: row['bin_count']})
            else:
                # create new class entries
                class_info = {'count': row['bin_count']}
                if tpclass in ['acc', 'byp']:
                    sql_percentiles = ['perc25', 'perc50', 'perc75']
                    sql_measures = ['fbu', 'dcu']
                    percentiles = {}
                    for measure in sql_measures:
                        percentiles[measure] = {}
                        for perc in sql_percentiles:
                            key = '_'.join([perc, measure])
                            if row[key]:
                                percentiles[measure][key] = float(row[key])
                            else:
                                percentiles[measure][key] = None
                    class_details = {
                        'percentiles': percentiles,
                        'sizes': {
                            'size_25': row['size_25'],
                            'size_50': row['size_50'],
                            'size_75': row['size_75']
                        },
                        'bins': {
                            row['bin']: int(row['bin_count'])  # cast to int so it can be json serialized later
                        }
                    }
                    class_info.update(class_details)
                hash_grouped[hash_string]['metrics'][row['class']] = class_info
        print 'sum total of hashes', sum([hash_grouped[subset]['count'] for subset in hash_grouped])
        self.comparables = self.finalForm(self.significantChecks(hash_grouped))
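
    # Illustrative shape of one hash_grouped entry built above (not real data):
    #   {'id': {'network': ..., 'geo': ..., 'url_domain': ..., 'size': ..., ...},
    #    'count': <sum of bin_count across all classes>,
    #    'metrics': {'acc': {'count': ..., 'percentiles': {...}, 'sizes': {...}, 'bins': {...}},
    #                'byp': {...},
    #                'ace': {'count': ...},
    #                'bye': {'count': ...}}}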

    def checkClasses(self, comparable):
        # do medians exist for both acc and byp?
        return all([comparable['metrics'].get('byp', False), comparable['metrics'].get('acc', False)])

    def checkSampleSize(self, comparable_count):
        return comparable_count >= self.SIGNIFICANT_RECORD_COUNT

    def checkSizePrecision(self, comparable_metrics):
        byp_sizes = comparable_metrics.get('byp', {}).get('sizes', {})
        acc_sizes = comparable_metrics.get('acc', {}).get('sizes', {})
        if not (acc_sizes and byp_sizes):
            # either class is missing size percentiles, so the groups cannot be compared
            checks = [False]
        else:
            try:
                checks = [((byp_sizes[perc] - acc_sizes[perc]) / byp_sizes[perc]) <
                          self.SIGNIFICANT_PERCENTILE_ERROR for perc in ['size_25', 'size_50', 'size_75']]
            except:
                print "line 133", comparable_metrics
                checks = [False]
        return all(checks)

    def significantChecks(self, hash_grouped):
        comparables = []
        for hashid, comparable in hash_grouped.iteritems():
            num_comp = sum([comparable['metrics'].get(tpclass, {}).get('count', 0)
                            for tpclass in ['acc', 'byp']])
            num_except = sum([comparable['metrics'].get(tpclass, {}).get('count', 0) for tpclass in ['ace', 'bye']])
            comparable['num_comparable_records'] = num_comp
            comparable['num_exception_records'] = num_except
            comparable['num_total_records'] = sum([num_comp, num_except])
            if self.checkClasses(comparable):
                if self.checkSampleSize(comparable['num_comparable_records']):
                    if self.checkSizePrecision(comparable['metrics']):
                        comparable['comparability'] = True
                    else:
                        comparable['comparability'] = False
                        comparable['fail_reason'] = 'size comparison'
                else:
                    comparable['comparability'] = False
                    comparable['fail_reason'] = 'sample size'
            else:
                comparable['comparability'] = False
                comparable['fail_reason'] = 'acc/byp samples'
            comparables.append(comparable)
        return comparables

    def finalForm(self, comparable_list):
        reduced_comparables = []
        for comparable in comparable_list:
            reduced = {
                'cid': self.cid,
                'network': comparable['id']['network'],
                'geo': comparable['id']['geo'],
                'url_domain': comparable['id']['url_domain'],
                'content_type': comparable['id']['content_type'],
                'size': comparable['id']['size'],
                'schema': comparable['id']['url_schema'],
                'sdk_version': comparable['id']['sdk_version'],
                'num_total_records': comparable['num_total_records'],
                'num_comparable_records': comparable['num_comparable_records'],
                'num_exception_records': comparable['num_exception_records'],
                'comparability': comparable['comparability'],
                'fail_reason': comparable.get('fail_reason', None),
                'reduced_date': self.currentStart,
                'app_guid': comparable['id']['app_guid_int']
            }
            byp_metrics, acc_metrics = comparable['metrics'].get('byp', {}), comparable['metrics'].get('acc', {})
            bins = {
                'acc': acc_metrics.get('bins', {}),
                'byp': byp_metrics.get('bins', {})
            }
            percentiles = {
                'acc': acc_metrics.get('percentiles', {}),
                'byp': byp_metrics.get('percentiles', {})
            }
            reduced.update({'percentiles': json.dumps(percentiles), 'bins': json.dumps(bins)})
            if comparable['comparability']:
                gain = float(percentiles['byp']['dcu']['perc50_dcu']) / float(percentiles['acc']['dcu']['perc50_dcu']) - 1
            else:
                gain = None
            reduced.update({'gain': gain})
            reduced_comparables.append(reduced)
        return reduced_comparables

    def LoadToProduction(self):
        print "Adding {} comparables to reduced_row".format(len(self.comparables))
        for entry in self.comparables:
            rr = models.ReducedRow(**entry)
            db.session.add(rr)
        db.session.commit()

    def RemoveOldEntries(self, cutoff_date):
        print "removing entries prior to {}".format(cutoff_date)
        num_rows_deleted = models.ReducedRow.query.filter_by(cid=self.cid).filter(models.ReducedRow.reduced_date <= cutoff_date).delete()
        db.session.commit()
        print "{} rows deleted".format(num_rows_deleted)

    def DailyJobs(self, cid, num_days, end=datetime.datetime.now()):
        ''' create a set of jobs to process for daily aggregation '''
        # note: 'end' is accepted but not currently used; its default is evaluated once at import time
        print "Grabbing the last {} days for all guids in cid {}".format(num_days, cid)
        today = datetime.date.today()
        dates = [(today - datetime.timedelta(days=n), today - datetime.timedelta(days=n + 1)) for n in range(0, num_days)]
        for job in dates:
            self.Query(cid, job[1], job[0], 500000)
            print "vertica query for {} - {} to {} with {}".format(job[1], job[0], self.dbname, self.cursor)
            self.ReduceResults()
            self.LoadToProduction()
            self.rows, self.comparables = None, None
        self.RemoveOldEntries(today - datetime.timedelta(days=5 + 1))  # remove entries prior to the last 5 days

    def test(self):
        ''' reconnect if the current cid maps to a different database than the open connection '''
        self.dbCidMap()
        if self.connectedDb != self.dbname:
            print "mismatch db"
            self.cursor = None
        if not self.cursor:
            print "grabbing new connection"
            self.Connect()


if __name__ == "__main__":
    import credentials
    import sys

    cid = sys.argv[1]
    num_days = int(sys.argv[2])
    if cid != "auto":
        try:
            cids = [int(cid)]
        except ValueError:
            print "invalid cid specified"
            exit()
    else:
        cids = list(customers.cidToName.keys())
    tplog = VerticaLoader(credentials.vertica_dbname, credentials.vertica_user, credentials.vertica_pass)
    for cid in cids:
        tplog.DailyJobs(cid, num_days)
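
# Example invocations (illustrative; they assume a credentials module exposing
# vertica_dbname, vertica_user and vertica_pass, as imported above):
#   python loader.py 91 7      # reprocess the last 7 days for cid 91
#   python loader.py auto 1    # reprocess yesterday for every cid in customers.cidToName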