10
10
# 20120629 AX added loop over all arguments, exception handling, restructured code, moved processed files to archive or error folder
11
11
# 20120708 AX skip empty ip lines instead of an error message
12
12
# 20120708 RB cleaning some names and spelling, also we don't want processed_files.log to clobber the downloaders processed_files.log. So we should use overly descriptive names
13
+ # 20120710 AX added locId lookup and added longip to insert query
13
14
#
14
15
# test:
15
16
# cd /DATA
21
22
# v move error files to the error directory
22
23
# v log process and errors
23
24
# v skip empty ip lines instead of an error message
25
+ # v added locId lookup and added longip to insert query
24
26
#
25
27
# Get the date from the filename, and look up the correct maxmind database
26
28
# then, insert the locId directly with the line in the mlab/{glasnost,ndt} database, preventing slow future updates
27
29
# on the other hand, all these updates might be extremely slow: TEST
28
30
#
29
- # todo : refactor all the utility functions in a separate file
30
- # todo : refactor all the passwords in a separate file (which is NOT in the repo, AND is in the .gitignore list
31
-
32
-
31
+ # todo : refactor all the utility functions in a separate file
32
+ # todo : refactor all the passwords in a separate file (which is NOT in the repo, AND is in the .gitignore list)
33
33
34
34
import sys
35
35
import re
39
39
import dateutil .parser as dparser
40
40
import MySQLdb
41
41
import shutil
42
+ from maxmind import MaxMind
43
+ import socket , struct
42
44
43
45
#################################################################
44
46
# #
51
53
# database credentials and the testname -> tablename mapping
db_user = "root"    # your username
db_passwd = ""      # your password
db_name = "mlab"    # name of the database
db_tables = {"glasnost": "glasnost", "ndt": "ndt_test "}  # a mapping from testname to tablename
db_filetable = 'files'

# working directories, all rooted at baseDir
baseDir = '/DATA/mlab/'
#baseDir = '/home/axel/mlab/'
scratchDir = baseDir + 'scratch/'
workDir = baseDir + 'work/'
archiveDir = baseDir + 'archive/'
67
70
# log file names (deliberately verbose so they cannot clobber the
# downloader's processed_files.log)
errorLog = "mlab_mysql_import_error.log"
processLog = "mlab_mysql_import_processed_files.log"

# default tables
maxmind_table = 'Blocks_GeoLiteCity_Last'  # overridable with the -m command line option
ndt_import = 'ndt_import'                  # temp table used while importing
70
76
#################################################################
71
77
# #
72
78
# functions #
73
79
# #
74
80
#################################################################
75
81
82
# Convert an IP string to long
def ip2long(ip):
    """Return the dotted-quad string `ip` as an unsigned 32-bit integer."""
    return struct.unpack("!L", socket.inet_aton(ip))[0]
86
+
87
+ def long2ip (l ):
88
+ return socket .inet_ntoa (struct .pack ('!L' , l ))
89
+
76
90
def usage ():
77
- print "Usage: mlab_mysql_import3.py mlab_file1.csv [mlab_files.csv ...]"
91
+ print "Usage: mlab_mysql_import.py [ -m maxmind_Blocks_Tablename ] mlab_file1.csv [mlab_files.csv ...]"
92
+ print "Default: maxmind_Blocks_Tablename = `Blocks_GeoLiteCity_Last`"
78
93
sys .exit (1 )
79
94
80
95
# This routine extracts the destination server of the mlab file.
@@ -133,9 +148,13 @@ def exists_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip
133
148
134
149
# Insert a connection to the database without testing.
def blunt_insert_dbentry(cur, file_id, db_table, test_datetime, destination, source_ip):
    """Insert one test record into `db_table` without a duplicate check.

    Converts the source ip to its long form and resolves its maxmind
    location id through the module-global `mm` lookup object, so both
    `longip` and `locId` are stored with the row.
    """
    longip = ip2long(source_ip)
    locid = mm.lookup(longip)  # lookup location id from ip number
    columns = ', '.join(['date', 'destination', 'source', 'file_id', 'longip', 'locId'])
    # Pass the data values as query parameters instead of concatenating them
    # into the SQL string: the driver handles quoting/escaping, which avoids
    # SQL injection and breakage on quotes in the input data.
    placeholders = ', '.join(['%s'] * 6)
    sql = "INSERT INTO " + db_table + " (" + columns + ") VALUES(" + placeholders + ") "
    cur.execute(sql, (test_datetime.isoformat(), destination, source_ip,
                      str(file_id), str(longip), str(locid)))
140
159
141
160
# Insert a test connection to the database, if it not already exists
@@ -170,6 +189,26 @@ def dedup(file_id, table, test_datetime, destination, source_ip):
170
189
deduplookup [key ] = True
171
190
return True
172
191
192
# for the temp table, look up all the locations with the locId
def lookup_locations(cur, destination):
    """Fill the location columns of the import temp table from maxmind.

    Joins the temp table against the maxmind Location table that pairs
    with the configured Blocks table ("Blocks" -> "Location" in the name)
    on locId. Returns the number of rows updated.

    NOTE(review): `destination` is currently unused (the commented-out
    query once targeted it); kept for interface compatibility.
    """
    location_table_name = maxmind_table.replace("Blocks", "Location")
    # Use the module-level temp-table constant instead of a hardcoded
    # 'ndt_import' literal, consistent with clear_temp_table()/move_temp_table().
    sql = ('UPDATE mlab.`' + ndt_import + '` L, maxmind.`' + location_table_name +
           '` M SET L.country_code = M.country, L.region=M.region, L.city=M.city, '
           'L.postalCode=M.postalCode, L.latitude=M.latitude, L.longitude=M.longitude, '
           'L.metroCode=M.metroCode, L.areaCode=M.areaCode WHERE L.`locId` = M.`locId`')
    updated = cur.execute(sql)
    # update country from country_code later?
    return updated
200
+
201
# clear the temp table
def clear_temp_table(cur):
    """Empty the import temp table so a new file can be loaded into it."""
    cur.execute('truncate table `' + ndt_import + '`')
205
+
206
# move the temp table to the real one (either ndt_test or ndt)
def move_temp_table(cur, destination):
    """Copy all rows from the import temp table into `destination`.

    Returns the row count reported by the INSERT ... SELECT.
    """
    sql = ('INSERT INTO `' + destination + '` (`created_at`, `date`, `destination`, '
           '`source`, `file_id`, `country_code`, `longip`, `locId`, `country`, `region`, '
           '`city`, `postalCode`, `latitude`, `longitude`, `metroCode`, `areaCode`) '
           'SELECT * FROM `' + ndt_import + '`')
    return cur.execute(sql)
211
+
173
212
# returns True on error, False on correct processing
174
213
def process_file (f , filename ):
175
214
start_time = datetime .now ()
@@ -181,6 +220,7 @@ def process_file(f, filename):
181
220
passwd = db_passwd ,
182
221
db = db_name )
183
222
cur = db .cursor ()
223
+ clear_temp_table (cur )
184
224
185
225
# Find the destination server by investigating the filename
186
226
destination = extract_destination (filename )
@@ -190,15 +230,15 @@ def process_file(f, filename):
190
230
file_id = get_file_id (cur , filename )
191
231
db .commit ()
192
232
193
- # Find the testsuite by investigating the filename
233
+ # Find the testsuite (glasnost or ndt) by investigating the filename
194
234
try :
195
235
test = [test for test in db_tables .keys () if test in filename ][0 ]
196
236
except IndexError :
197
237
sys .stderr .write ('The filename ' + filename + ' does not contain a valid testname.' )
198
238
return 1
199
239
# print "Found test suite " + test
200
240
201
- # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes
241
+ # The filetest ALONE, takes 3 seconds with a 9 million records database, without indexes
202
242
# But falls back to less than half a second when indexing is turned on on the db
203
243
filetest = True
204
244
# Read the file line by line and import it into the database
@@ -215,9 +255,12 @@ def process_file(f, filename):
215
255
filetest = False
216
256
# test if we have already done it in this or last filetest
217
257
if (dedup (file_id , db_tables [test ], test_datetime , destination , source_ip )):
218
- blunt_insert_dbentry (cur , file_id , db_tables [test ], test_datetime , destination , source_ip )
258
+ # blunt_insert_dbentry(cur, file_id, db_tables[test], test_datetime, destination, source_ip)
259
+ blunt_insert_dbentry (cur , file_id , ndt_import , test_datetime , destination , source_ip )
219
260
end_time = datetime .now ()
220
261
print 'File done in ' + str (end_time - start_time )
262
+ lookup_locations (cur , destination )
263
+ move_temp_table (cur , db_tables [test ])
221
264
failure = False
222
265
except Exception as inst :
223
266
sys .stderr .write ('Exception: ' + str (inst .args ) + '\n ' )
@@ -231,9 +274,6 @@ def process_file(f, filename):
231
274
f .write (pathname + '\n ' )
232
275
f .write ('Error handling file ' + filename + ' (' + str (e .args ) + ')\n ' )
233
276
print
234
- # This bit should probably be cleaned up.
235
- # except:
236
- # sys.stderr.write('Process error ' + '\n')
237
277
finally :
238
278
# Commit and finish up
239
279
sys .stderr .flush ()
@@ -273,7 +313,11 @@ def move_archive(pathname):
273
313
274
314
parser = OptionParser()
# BUGFIX: a "store_false" flag needs default=True, otherwise `verbose` is
# False whether or not -q is given and the option is a no-op.
parser.add_option("-q", "--quiet", action="store_false", dest="verbose", default=True, help="don't print status messages to stdout")
parser.add_option("-m", "--maxmind", dest="maxmind_table", default='', help="optional maxmind_table, if omitted we use 'Last'")
(options, args) = parser.parse_args()
# an explicit -m overrides the module-level default maxmind Blocks table
if options.maxmind_table != '':
    maxmind_table = options.maxmind_table

if len(args) == 0:
    usage()
279
323
@@ -295,17 +339,26 @@ def move_archive(pathname):
295
339
#################################################################
296
340
global_start_time = datetime .now ()
297
341
342
+ # get instance of maxmind table
343
+ print "using " + maxmind_table
344
+
345
+ mm = MaxMind (db_host , db_user , db_passwd , "maxmind" ,maxmind_table )
346
+
347
+ if not mm :
348
+ sys .stderr .write ('maxmind table does not exist: ' + maxmind_table + ' (' + str (e .args ) + ')\n ' )
349
+ exit (1 )
350
+
298
351
# Iterate over ALL filenames
299
352
for pathname in args :
300
- try :
301
- with open (pathname , 'r' ) as f :
302
- # Extract the basename of the filename, as the path is not of interest after this point
303
- filename = os .path .basename (pathname )
353
+ try :
354
+ with open (pathname , 'r' ) as f :
355
+ # Extract the basename of the filename, as the path is not of interest after this point
356
+ filename = os .path .basename (pathname )
304
357
print "processing file " + filename ,
305
- if (process_file (f , filename )):
306
- shutil .move (pathname ,errorDir )
307
- else :
308
- move_archive (pathname )
358
+ if (process_file (f , filename )):
359
+ shutil .move (pathname ,errorDir )
360
+ else :
361
+ move_archive (pathname )
309
362
# file is automatically closed if needed
310
363
except IOError as e :
311
364
print 'Could not open file ' + pathname + '\n Error: ' + str (e .args )
0 commit comments