forked from sroberts/malwarehouse
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmalwarehouse.py
394 lines (311 loc) · 13.8 KB
/
malwarehouse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
#!/usr/bin/env python
# encoding: utf-8
"""
malwarehouse.py
Created by Scott Roberts.
Copyright (c) 2012 TogaFoamParty Studios. All rights reserved.
Yara/Exiftool/SSdeep/VT added by Chris Clark [email protected]
"""
import datetime, os, hashlib, sqlite3, shutil, re, urllib, urllib2, ConfigParser, json
import magic
from optparse import OptionParser
from collections import OrderedDict
# Get config options. "malwarehouse.cfg" is a relative path, so it is looked
# up in the current working directory. ConfigParser.read() skips files it
# cannot open, so a missing file surfaces at the first config.get() below.
config = ConfigParser.SafeConfigParser()
config.read("malwarehouse.cfg")
option_base_dir = config.get('settings','basedir')
# Import optional requirements: each third-party module is imported only when
# its switch in the [options] section is exactly 'On', so a disabled feature
# does not need its package installed.
if config.get('options', 'metadata') == 'On':
    import exiftool
if config.get('options', 'yara') == 'On':
    import yara
if config.get('options', 'ssdeep') == 'On':
    import pydeep
def check_prelim():
    """Make sure the base directory and the index database are ready.

    Creates the directory part of ``option_base_dir`` when it is missing and
    ensures the ``malwarehouse_index`` table exists in ``malwarehouse.db``.

    Returns:
        True once setup has completed.
    """
    db_path = option_base_dir + "malwarehouse.db"
    dir_path = os.path.dirname(option_base_dir)
    # dirname() is '' when basedir has no directory component; makedirs('')
    # would raise, and in that case there is nothing to create anyway.
    if dir_path and not os.path.exists(dir_path):
        print("[ERROR] Base directory %s doesn't exist. Creating it..." % (dir_path))
        os.makedirs(dir_path)
    if not os.path.exists(db_path):
        print("[ERROR] Database %s doesn't exist. Creating it..." % (db_path))
    # CREATE TABLE IF NOT EXISTS is a no-op for an existing index, so it is
    # safe to run on every start; connect() creates the file when missing.
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        # Create table
        c.execute('''CREATE TABLE IF NOT EXISTS malwarehouse_index (datetime text, name text, mimetype text, tags text, size integer, md5 text, sha256 text, source text, notes text, vtapi text, metadata text, ssdeep text, yara text)''')
        # Save (commit) the changes
        conn.commit()
        c.close()
    finally:
        # Always release the database handle, even if table creation fails.
        conn.close()
    return True
def vtapi(md5):
    """Look up a hash in the VirusTotal v2 file-report endpoint.

    Args:
        md5: hash sent as the ``resource`` parameter of the request.

    Returns:
        ``"<positives>/<total> on <scan_date>"`` when the report's
        ``response_code`` is 1, otherwise the fixed string
        ``"File Does Not Exist in VirusTotal"``.
    """
    url = "https://www.virustotal.com/vtapi/v2/file/report"
    parameters = {"resource": md5, "apikey": config.get('settings', 'vtapikey')}
    # Passing a data payload makes urllib2 issue a POST request.
    request = urllib2.Request(url, urllib.urlencode(parameters))
    response = urllib2.urlopen(request)
    try:
        report = json.loads(response.read())
    finally:
        # Release the HTTP connection even when the body is not valid JSON.
        response.close()
    if report['response_code'] == 1:
        return str(report['positives']) + '/' + str(report['total']) + ' on ' + str(report['scan_date'])
    return "File Does Not Exist in VirusTotal"
def yara_scan(malware_path):
    """Match the sample against the configured Yara rules.

    The rules source is the ``yararules`` entry of the ``settings`` section.
    Returns the ``str()`` form of whatever the compiled rules' ``match``
    call yields for ``malware_path``.
    """
    rules = yara.compile(config.get('settings', 'yararules'))
    return str(rules.match(malware_path))
def get_md5(malware_path):
    """Return the hexadecimal MD5 digest of the file at ``malware_path``.

    The file is opened in binary mode so no newline or end-of-file
    translation can alter the bytes being hashed, and it is read in chunks
    so large samples are never held in memory in one piece.
    """
    digest = hashlib.md5()
    with open(malware_path, 'rb') as sample:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: sample.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
def get_ssdeep(malware_path):
    """Return the result of ``pydeep.hash_file`` for the given sample path."""
    fuzzy_hash = pydeep.hash_file(malware_path)
    return fuzzy_hash
def get_sha256(malware_path):
    """Return the hexadecimal SHA-256 digest of the file at ``malware_path``.

    The file is opened in binary mode so no newline or end-of-file
    translation can alter the bytes being hashed, and it is read in chunks
    so large samples are never held in memory in one piece.
    """
    digest = hashlib.sha256()
    with open(malware_path, 'rb') as sample:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: sample.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
def get_mime_type(malware_path):
    """Detect the MIME type of a file with the magic library.

    Any exception raised while creating the detector or inspecting the file
    is swallowed and reported as the string ``"Unknown"``.
    """
    try:
        detector = magic.Magic(mime=True)
        mime_type = detector.from_file(malware_path)
    except Exception:
        return "Unknown"
    return mime_type
def get_metadata(malware_path):
    """Extract exiftool metadata for a sample as an indented text block.

    Args:
        malware_path: path of the file handed to exiftool.

    Returns:
        A string that starts with a newline and holds one
        ``"\\t\\t<tag>: <value>\\n"`` line per remaining tag, where ``<tag>``
        is the tag name with its group prefix (text up to the first ``:``)
        removed.
    """
    with exiftool.ExifTool() as et:
        metadata = et.get_metadata(malware_path)
    # These tags are left out of the report. pop() with a default is used
    # because a tag may be absent from exiftool's output, and a missing tag
    # must not abort the whole analysis with a KeyError.
    for unwanted in (u'SourceFile',
                     u'File:FilePermissions',
                     u'File:Directory',
                     u'ExifTool:ExifToolVersion',
                     u'File:MIMEType'):
        metadata.pop(unwanted, None)
    lines = ['\n']
    for tag, value in metadata.items():
        try:
            lines.append('\t\t' + str(tag).split(':', 1)[-1] + ": " + str(value) + "\n")
        except Exception:
            # Best effort: a tag or value that cannot be rendered with str()
            # is skipped rather than failing the report.
            continue
    return ''.join(lines)
def summary(report_json):
    """Return a short, easy to read summary of one sample record.

    ``report_json`` is a mapping with at least the keys ``name``, ``md5``,
    ``source``, ``vtapi``, ``tags``, ``notes``, ``yara`` and ``sha256``. The
    sample location is the base directory followed by the SHA-256.
    """
    template = "-> %s (%s) Source: %s \n VirusTotal: %s Tags: %s \n Notes: %s \n Yara: %s \n -> Sample Location: %s \n"
    fields = (
        report_json['name'],
        report_json['md5'],
        report_json['source'],
        report_json['vtapi'],
        report_json['tags'],
        report_json['notes'],
        report_json['yara'],
        option_base_dir + report_json['sha256'],
    )
    return template % fields
def details(report_json):
    """Return the full multi-line report for one sample record.

    Emits one ``label: value`` line per field in a fixed order, a blank
    line, and finally the sample directory (base directory + SHA-256).
    """
    # (label shown in the report, key read from report_json), in output order.
    labelled_keys = (
        ("datetime", 'datetime'),
        ("name", 'name'),
        ("source", 'source'),
        ("tags", 'tags'),
        ("notes", 'notes'),
        ("mimetype", 'mimetype'),
        ("size", 'size'),
        ("md5", 'md5'),
        ("sha256", 'sha256'),
        ("ssdeep", 'ssdeep'),
        ("virustotal", 'vtapi'),
        ("yara", 'yara'),
        ("metadata", 'metadata'),
    )
    parts = [(label + ": %s\n") % (report_json[key]) for label, key in labelled_keys]
    parts.append("\n")
    parts.append(("sample directory: %s" % option_base_dir) + report_json['sha256'])
    return "".join(parts)
def load_db(report_json):
    """Load information about the sample into the index DB.

    Args:
        report_json: mapping with the keys ``datetime``, ``name``,
            ``mimetype``, ``tags``, ``size``, ``md5``, ``sha256``,
            ``source``, ``notes``, ``vtapi``, ``metadata``, ``ssdeep`` and
            ``yara``; the values are stored in that column order.

    Returns:
        True after the row has been committed.
    """
    row = (
        report_json['datetime'],
        report_json['name'],
        report_json['mimetype'],
        report_json['tags'],
        report_json['size'],
        report_json['md5'],
        report_json['sha256'],
        report_json['source'],
        report_json['notes'],
        report_json['vtapi'],
        report_json['metadata'],
        report_json['ssdeep'],
        report_json['yara'],
    )
    conn = sqlite3.connect(option_base_dir + "malwarehouse.db")
    try:
        c = conn.cursor()
        # Bound parameters instead of string interpolation: names, notes,
        # tags and extracted metadata can contain quotes, which would
        # otherwise break the statement or inject SQL. A value of None
        # (e.g. no source given) is stored as NULL.
        c.execute("INSERT INTO malwarehouse_index VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", row)
        # Save (commit) the changes
        conn.commit()
        c.close()
    finally:
        # Always release the database handle, even if the insert fails.
        conn.close()
    print("Sample %s loaded..." % report_json['name'])
    return True
def load_directory(report_json, malware_path):
    """Store the sample and its report in a per-sample directory.

    The directory is the base directory followed by the sample's SHA-256;
    it is created when missing. The sample file is moved into it and the
    text produced by ``details`` is written next to it as ``summary.txt``.
    Returns True.
    """
    print("Loading Malware %s" % malware_path)
    sample_dir = "%s%s" % (option_base_dir, report_json['sha256'])
    directory_missing = not os.path.exists(sample_dir)
    if directory_missing:
        print("Creating %s" % (sample_dir))
        os.makedirs(sample_dir)
    shutil.move(malware_path, sample_dir)
    summary_path = sample_dir + '/summary.txt'
    report_text = details(report_json)
    with open(summary_path, 'w') as text_file:
        text_file.write(report_text)
    return True
def malware_loader(report_json, malware_path):
    """Register an analysed sample: index it, then file it on disk.

    The database row is written first; the sample is moved into its
    directory afterwards.
    """
    # Step 1: add the record to the index database.
    load_db(report_json)
    # Step 2: move the file and write its summary.
    load_directory(report_json, malware_path)
def parse_sqlite_result(unparsed):
    """Turn one ``malwarehouse_index`` row into a dictionary.

    ``unparsed`` is indexed by position; the first thirteen entries are
    mapped onto the column names below, in table order.
    """
    columns = (
        'datetime', 'name', 'mimetype', 'tags', 'size', 'md5', 'sha256',
        'source', 'notes', 'vtapi', 'metadata', 'ssdeep', 'yara',
    )
    # Index explicitly (rather than zip) so a short row still raises.
    return dict((column, unparsed[position]) for position, column in enumerate(columns))
def find_sample(find_string):
    """Search the index by hash, or by name / tags / source, and print hits.

    A 64-character hex string is looked up as a SHA-256 and a 32-character
    hex string as an MD5 (exact match); anything else is matched as a
    substring of the name, tags or source columns.

    Returns:
        True when at least one sample matched (a summary of each is
        printed), False otherwise.
    """
    conn = sqlite3.connect(option_base_dir + "malwarehouse.db")
    try:
        c = conn.cursor()
        if re.match(r"^([a-fA-F\d]{64})$", find_string):
            c.execute("SELECT * FROM malwarehouse_index WHERE sha256 = ?", (find_string,))
        elif re.match(r"^([a-fA-F\d]{32})$", find_string):
            c.execute("SELECT * FROM malwarehouse_index WHERE md5 = ?", (find_string,))
        else:
            c.execute("SELECT * FROM malwarehouse_index WHERE name LIKE :query OR tags LIKE :query OR source LIKE :query ", {"query" : '%' + find_string + '%'})
        data = c.fetchall()
    finally:
        # Release the database handle whether or not the query succeeded.
        conn.close()
    if data:
        print("\nResults for \"" + find_string + "\":\n")
        for result in data:
            print(summary(parse_sqlite_result(result)))
        return True
    print('No Samples Named, Hashed, Sourced or Tagged: %s'%find_string)
    return False
def find_sample_meta(find_string):
    """Print every sample whose stored metadata contains ``find_string``.

    Returns:
        True when at least one sample matched (a summary of each is
        printed), False otherwise.
    """
    conn = sqlite3.connect(option_base_dir + "malwarehouse.db")
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM malwarehouse_index WHERE metadata LIKE :query", {"query" : '%' + find_string + '%'})
        data = c.fetchall()
    finally:
        # Release the database handle whether or not the query succeeded.
        conn.close()
    if data:
        print("\nResults for \"" + find_string + "\" in Metadata:\n")
        for result in data:
            print(summary(parse_sqlite_result(result)))
        return True
    print('No Samples have Metadata matches for: %s'%find_string)
    return False
def find_sample_yara(find_string):
    """Print every sample whose stored Yara matches contain ``find_string``.

    Returns:
        True when at least one sample matched (a summary of each is
        printed), False otherwise.
    """
    conn = sqlite3.connect(option_base_dir + "malwarehouse.db")
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM malwarehouse_index WHERE yara LIKE :query", {"query" : '%' + find_string + '%'})
        data = c.fetchall()
    finally:
        # Release the database handle whether or not the query succeeded.
        conn.close()
    if data:
        print("\nResults for \"" + find_string + "\" in Yara Matches:\n")
        for result in data:
            print(summary(parse_sqlite_result(result)))
        return True
    print('No Samples have yara matches for: %s'%find_string)
    return False
def recent(quanity='5'):
    """Print a summary of the last n (default: n = 5) samples by datetime.

    Args:
        quanity: number of samples to show, as a string or number. It must
            denote a whole number ("5", "5.0" and "1e1" are accepted).

    Returns:
        True after the summaries have been printed.

    Raises:
        ValueError: if ``quanity`` is not a whole number.
    """
    # Validate before opening the database so bad input costs nothing and
    # can never reach the SQL statement.
    try:
        as_float = float(quanity)
        limit = int(as_float)  # rejects nan (ValueError) and inf (OverflowError)
    except (TypeError, ValueError, OverflowError):
        raise ValueError("quantity must be a whole number, got %r" % (quanity,))
    if limit != as_float:
        raise ValueError("quantity must be a whole number, got %r" % (quanity,))
    conn = sqlite3.connect(option_base_dir + "malwarehouse.db")
    try:
        c = conn.cursor()
        c.execute("SELECT DISTINCT * FROM malwarehouse_index ORDER BY datetime DESC LIMIT ?;", (limit,))
        data = c.fetchall()
    finally:
        # Release the database handle whether or not the query succeeded.
        conn.close()
    print("\n%s Most Recent Samples:" % (quanity,))
    for result in data:
        print(summary(parse_sqlite_result(result)))
    return True
def basic_analyzer(malware_path, source, tags, notes):
    """Run the initial triage analysis of a sample.

    Collects size and hashes together with the user supplied data; the
    ssdeep, Yara, VirusTotal and metadata steps only run when their switch
    in the ``options`` config section is ``'On'`` and are otherwise
    recorded as ``'Disabled'``.

    Args:
        malware_path: path of the sample to analyse.
        source: user supplied origin of the sample.
        tags: user supplied tags.
        notes: user supplied notes.

    Returns:
        A dictionary with the keys ``source``, ``tags``, ``notes``,
        ``name``, ``datetime``, ``size``, ``md5``, ``ssdeep``, ``yara``,
        ``sha256``, ``mimetype``, ``vtapi`` and ``metadata``.

    Raises:
        SystemExit: with status 1 when ``malware_path`` cannot be opened.
    """
    try:
        with open(malware_path):
            pass
    except IOError:
        print("You specified an invalid malware target path.")
        # Non-zero status: an unreadable target is a failure, not a success.
        raise SystemExit(1)

    def enabled(feature):
        """Tell whether an optional analysis step is switched on."""
        return config.get('options', feature) == 'On'

    malware_definition = {}
    # Parsing Custom Options
    malware_definition["source"] = source
    malware_definition["tags"] = tags
    malware_definition["notes"] = notes
    # Parsing Automatically Generated Options
    malware_definition["name"] = os.path.basename(malware_path)
    malware_definition["datetime"] = str(datetime.datetime.now())
    malware_definition["size"] = os.stat(malware_path).st_size
    malware_definition["md5"] = get_md5(malware_path)
    malware_definition["ssdeep"] = get_ssdeep(malware_path) if enabled('ssdeep') else 'Disabled'
    malware_definition["yara"] = yara_scan(malware_path) if enabled('yara') else 'Disabled'
    malware_definition["sha256"] = get_sha256(malware_path)
    malware_definition["mimetype"] = get_mime_type(malware_path)
    # The VirusTotal lookup is keyed on the MD5 computed above.
    malware_definition["vtapi"] = vtapi(malware_definition["md5"]) if enabled('vtcheck') else 'Disabled'
    malware_definition["metadata"] = get_metadata(malware_path) if enabled('metadata') else 'Disabled'
    return malware_definition
def main():
    """Command-line entry point: search the index or load a new sample.

    The first non-empty query option wins, checked in the order
    ``--recent``, ``--find``, ``--metadata``, ``--yara``. Without any query
    option exactly one positional file path is required; that file is
    analysed and loaded. Returns True on success and False when setup fails
    or a search finds nothing.
    """
    if not check_prelim():
        print("[ERROR] Initial setup unable to complete. Exiting...")
        return False

    parser = OptionParser(usage="usage: %prog [options] filepath", version="%prog 0.1")
    # (short flag, long flag, destination, default, help text); every option
    # stores a string. Order here is the order shown in --help.
    option_specs = (
        ("-s", "--source", "SOURCE", None, "Source of file"),
        ("-t", "--tags", "TAGS", "No tags", "Any characteristics of the malware"),
        ("-n", "--notes", "NOTES", "", "Notes about file"),
        ("-f", "--find", "FIND", "", "Find a sample by name, tags, source, md5, or sha256"),
        ("-m", "--metadata", "MFIND", "", "Find a sample by searching Extracted Metadata"),
        ("-y", "--yara", "YFIND", "", "Find a sample by searching Yara Matches"),
        ("-r", "--recent", "QUANTITY", "", "Find the most recent # samples"),
    )
    for short_flag, long_flag, dest, default, help_text in option_specs:
        parser.add_option(short_flag, long_flag,
                          action="store",
                          type="string",
                          dest=dest,
                          default=default,
                          help=help_text)
    (options, args) = parser.parse_args()

    # Query modes, in priority order; the first one supplied is executed.
    query_modes = (
        (options.QUANTITY, recent),
        (options.FIND, find_sample),
        (options.MFIND, find_sample_meta),
        (options.YFIND, find_sample_yara),
    )
    for query, handler in query_modes:
        if query:
            return bool(handler(query))

    # No query option was given, so a single target path is mandatory.
    if len(args) != 1:
        parser.error("You didn't specify a malware target path.")
        return False

    # Parse malware file path and name
    malware_path = args[0]
    malware_definition = basic_analyzer(malware_path, options.SOURCE, options.TAGS, options.NOTES)
    print("Analysis complete. Loading.")
    malware_loader(malware_definition, malware_path)
    return True
if __name__ == "__main__":
    # SystemExit is deliberately not caught here: swallowing it forced the
    # process to exit with status 0 even for usage errors reported through
    # parser.error() or an invalid target path, hiding failures from any
    # calling shell or script.
    try:
        main()
    except KeyboardInterrupt:
        print("User aborted.")