#!/usr/bin/env python3
"""zip_preprocess.py."""
import argparse
import csv
import json
import os
import sys
import time
from zipfile import BadZipFile

from libs.deduper.deduper import LinkFilter
from libs.fuzzyhasher.fuzzyhasher import FuzzyHasher
from libs.preprocess.preprocess import Preprocessor, content_field_standardize
from libs.preprocess.preprocess import source_field_from_filename
from libs.zipeditor.zipeditor import ZipEditor, zip_scanner_excludedirs


def zip_batch_process(zip_dir_root='', source_field='content',
                      preprocessing_log='_preprocessing_log.csv',
                      wikifier_output_dir='wikifier', skip_rerun=False,
                      tmpdir=None):
    """Batch preprocess every zip archive found under zip_dir_root."""
    # Start the timer
    startBatch = time.time()
    timings = []
    # get list of all zips
    zip_files = zip_scanner_excludedirs(zip_dir_root)
    print(len(zip_files), 'zip files found')
    # create a FuzzyHasher for making and comparing hashes
    fhr = FuzzyHasher(source_field=source_field, prefilter='baggify,lower_alnum')
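    # the prefilter names (assumed behavior of libs.fuzzyhasher) suggest content
    # is reduced to a lowercase alphanumeric bag of tokens before hashing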
    # preprocess configuration
    options = {
        'merge_noun_chunks': False,
        'merge_subtokens': False,
        'skip_ents': ['CARDINAL', 'DATE (except months)', 'QUANTITY', 'TIME'],
        'collect_readability_scores': True
    }
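    # these options are passed below to pp.preprocess_dir as kwargs=options;
    # skip_ents presumably lists entity labels the preprocessor should ignore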
    skip_files = []
    if skip_rerun and preprocessing_log:
        try:
            with open(preprocessing_log, 'r') as plogfile:
                reader = csv.reader(plogfile)
                for row in reader:
                    skip_files.append(row[1])
        except FileNotFoundError:
            print("Preprocessing log file not found; it will be created while logging.")
    # create a Preprocessor
    pp = Preprocessor()
    # create a LinkFilter
    lf = LinkFilter()
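    # the LinkFilter is used below to turn pairs of duplicate files into a
    # list of files to delete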
    # loop over zips and unpack for editing
    for zip_file in zip_files:
        # skip zips already recorded in the log
        if skip_rerun and zip_file in skip_files:
            print("\n---\nSkipping:", zip_file)
            continue
        print("\n---\nOpening:", zip_file)
        startZip = time.time()
        with ZipEditor(zip_file, tmpdir) as zed:
            changed = False
            try:
                zed.open()
            except (BadZipFile, PermissionError, RuntimeError) as err:
                print(err.__class__.__name__, ": ", zip_file, err)
                with open(preprocessing_log, 'a') as plogfile:
                    plogfile.write('zip_fail,' + zip_file + ',' + str(err.__class__.__name__) + ': ' + str(err) + '\n')
                continue
            manifest_dir = zed.getdir()
            # get file list
            json_files = [os.path.join(r, file) for r, d, f in os.walk(manifest_dir)
                          for file in f if file.endswith('.json') and not file.startswith('._')]
            print(len(json_files), 'json files found')
            # loop through json files for fixes and fuzzy hash
            for json_file in json_files:
                with open(json_file, 'r+') as f:
                    try:
                        data = json.load(f)
                    except (json.decoder.JSONDecodeError, KeyError, PermissionError, ValueError) as err:
                        with open(preprocessing_log, 'a') as plogfile:
                            # the '@' in the entry keeps it from matching a zip path, so it will not cause zip skipping
                            plogfile.write('json_fail,' + zip_file + '@' + json_file + ',' + str(err.__class__.__name__) + ': ' + str(err) + '\n')
                        continue
                    # fix for non-standard content fields
                    changed_scrub = content_field_standardize(data)
                    # add source fields, update metapath
                    namestr = os.path.splitext(os.path.basename(json_file))[0]
                    changed_source = source_field_from_filename(namestr, data)
                    # request a hash add, record if it changed the file
                    changed_hash = fhr.add_hash_to_json(data, update_old=not skip_rerun)
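                    # update_old=not skip_rerun is assumed to let FuzzyHasher refresh an
                    # existing hash on full runs and leave it untouched when reruns are skipped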
                    # modify file only if something changed
                    changed_file = changed_scrub or changed_hash or changed_source
                    if changed_file:
                        f.seek(0)
                        json.dump(data, f, indent=2)
                        f.truncate()
                        # mark zip for saving if any file changed
                        changed = True
            try:
                # deduplicate
                results = fhr.compare_files_in_dir(zed.getdir())
                result_list = [[str(item).replace(zed.getdir() + '/', '') for item in row] for row in results]
                if result_list:
                    print('\n...duplicates found:', str(len(result_list)), '\n')
                    changed = True
                    with open(os.path.join(zed.getdir(), '_duplicates.txt'), "w") as dupefile:
                        writer = csv.writer(dupefile, dialect='excel-tab')
                        for result in result_list:
                            writer.writerow(result)
                    # create delete list
                    lf.links = result_list
                    deletes_list = lf.filter_nodes(source='components', filter='remove')
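                    # filter_nodes(source='components', filter='remove') is assumed to return
                    # the duplicate files to drop, keeping one member of each duplicate cluster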
                    # print('dl', deletes_list)
                    with open(os.path.join(zed.getdir(), '_deletes.txt'), "w") as delfile:
                        for item in deletes_list:
                            delfile.write("%s\n" % item)
                else:
                    print('\n...no duplicates found.')
            except (json.decoder.JSONDecodeError, KeyError, PermissionError, ValueError) as err:
                with open(preprocessing_log, 'a') as plogfile:
                    # the '@' in the entry keeps it from matching a zip path, so it will not cause zip skipping
                    plogfile.write('deduplicate_fail,' + '@' + zip_file + ',' + str(err.__class__.__name__) + ': ' + str(err) + '\n')
                continue
            # create the wikifier output directory; setting the preprocessor's
            # wikifier_output_dir property activates wikifier output during preprocessing
            pp.wikifier_output_dir = os.path.join(wikifier_output_dir, os.path.basename(zed.file).rsplit('.zip')[0])
            os.makedirs(pp.wikifier_output_dir, exist_ok=True)
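            # one wikifier subdirectory per archive (zip name without '.zip');
            # exist_ok=True lets reruns reuse an existing directory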
            with open(preprocessing_log, 'a') as plogfile:
                try:
                    pp.preprocess_dir(manifest_dir=manifest_dir, content_property='content', kwargs=options)
                    plogfile.write('done,' + zip_file + '\n')
                    changed = True
                    if changed:
                        print('\n ...saving:', zip_file)
                        zed.save()
                except (json.decoder.JSONDecodeError, KeyError, PermissionError, ValueError) as err:
                    print(err)
                    plogfile.write('preprocess_fail,' + zip_file + ',' + str(err.__class__.__name__) + ': ' + str(err) + '\n')
        print('\n...closing:', zip_file, '\n\n')
        endZip = time.time()
        t = endZip - startZip
        timings.append([t, zip_file])
        print('Processed zip in ' + str(t) + ' seconds.\n\n----------\n\n')
    endBatch = time.time()
    t = endBatch - startBatch
    timings.append([t, "TOTAL"])
    print('\n\n==========\nProcessed all zip files in ' + str(t) + ' seconds.')
    with open(os.path.join(zip_dir_root, '_timings.txt'), "w") as timefile:
        writer = csv.writer(timefile, dialect='excel-tab')
        for timing in timings:
            writer.writerow(timing)
    print(timings)


def main(args):
    """Collection of actions to execute on run."""
    zip_batch_process(zip_dir_root=args.inpath, source_field=args.content,
                      preprocessing_log=args.log, wikifier_output_dir=args.wiki,
                      skip_rerun=args.skip, tmpdir=args.tmpdir)


if __name__ == '__main__':
    PARSER = argparse.ArgumentParser(description=__doc__,
                                     usage='use "%(prog)s --help" for more information',
                                     formatter_class=argparse.RawTextHelpFormatter)
    PARSER.add_argument('-i', '--inpath', default='.',
                        help='input path for directory of zips, e.g. "../data"')
    PARSER.add_argument('-l', '--log', default='_preprocessing_log.csv',
                        help='output file path for log file, e.g. "_preprocessing_log.csv"')
    PARSER.add_argument('-c', '--content', default='content',
                        help='json file field for source, e.g. "content"')
    PARSER.add_argument('-w', '--wiki', default='wikifier',
                        help='output directory path for wikifier data, e.g. "wikifier"')
    PARSER.add_argument('-s', '--skip', action='store_true',
                        help='skip rerun of any files already listed in log, whether done or fail; false by default')
    PARSER.add_argument('-t', '--tmpdir', default='',
                        help='root path for unpacking tmp files, system defined by default')
    # PARSER.add_argument('-d', '--dedupe', action='store_true', help='generate deduplicate analysis, false by default')
    # PARSER.add_argument('-h', '--hash', action='store_true', help='add fuzzy hashes to articles, false by default')
    # PARSER.add_argument('-m', '--meta', action='store_true', help='add spacy metadata, false by default')
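    # example invocation: python zip_preprocess.py -i ../data -l _preprocessing_log.csv -w wikifier -s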
    if not sys.argv[1:]:
        PARSER.print_help()
        PARSER.exit()
    ARGS = PARSER.parse_args()
    main(ARGS)