-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathusb-util.py
executable file
·149 lines (107 loc) · 4.31 KB
/
usb-util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
"""
OVERVIEW
Extract USB mass storage device events from Cb Response.
"""
import argparse
import csv
import json
import os
import sys
from redcanary.responseutils.common import *
from cbapi.response import CbEnterpriseResponseAPI
from cbapi.response.models import Process
# True when running under Python 3 or newer. The CSV output code checks this
# flag to decide whether unicode values have to be encoded by hand.
_python3 = sys.version_info.major >= 3

# GUIDs that a registry path must contain to count as an event of interest.
# The disk device class is probably what you want; the volume device class,
# '{53f5630d-b6bf-11d0-94f2-00a0c91efb8b}', is another candidate.
match_guid = [
    '{53f56307-b6bf-11d0-94f2-00a0c91efb8b}',
]

# Registry path patterns placed in the `regmod:` clause of each search.
# NOTE(review): the first pattern uses the volume class GUID under
# `machine\system\...`, while the second uses the disk class GUID under
# `machine\...` with no `system` component -- confirm both are intentional.
search_terms = [
    "registry\\machine\\system\\currentcontrolset\\control\\deviceclasses\\{53f5630d-b6bf-11d0-94f2-00a0c91efb8b}\\*",
    "registry\\machine\\currentcontrolset\\control\\deviceclasses\\{53f56307-b6bf-11d0-94f2-00a0c91efb8b}\\*",
]
class USBEvent:
    """Device details parsed out of a USBSTOR registry path.

    The constructor stores the raw path and immediately calls `parse()`,
    which fills in `vendor`, `product`, `version` and `serial`.

    Raises:
        IndexError: if the path does not contain the markers `parse()`
            relies on (see `parse()`).
    """

    def __init__(self, path):
        self.path = path
        self.vendor = ''
        self.product = ''
        self.version = ''
        self.serial = ''
        # Drive letter and volume name are not extracted at the moment.
        self.parse()

    def __repr__(self):
        """Return one ``name,value`` line for every instance attribute.

        The text is returned rather than printed, so `repr()`, `str()` and
        `print()` all work on Python 2 and Python 3.
        """
        return '\n'.join('{0},{1}'.format(name, value)
                         for name, value in self.__dict__.items())

    def parse(self):
        """Split `self.path` into vendor, product, version and serial.

        Everything after the ``usbstor#disk&`` marker is split on ``&``.
        The first field supplies the vendor (after ``ven_``) and the second
        the product (after ``prod_``). Normally the third field carries
        ``rev_<version>#<serial>``.

        Raises:
            IndexError: if an expected marker or field is missing.
        """
        path = self.path.split('usbstor#disk&')[1]
        fields = path.split('&')

        self.vendor = fields[0].split('ven_')[1]
        self.product = fields[1].split('prod_')[1]

        if self.vendor == 'drobo':
            # Drobo doesn't provide a version, so the serial follows the
            # product directly, separated by '#'.
            drobo_fields = self.product.split('#')
            self.product = drobo_fields[0]
            self.serial = drobo_fields[1]
        else:
            revision_and_serial = fields[2].split('#')
            self.version = revision_and_serial[0].split('rev_')[1]
            self.serial = revision_and_serial[1]
def usbstor_search(cb_conn, query, query_base=None, timestamps=False):
    """Run a process search and collect the USB storage events it reveals.

    Every registry modification of every matching process is inspected. A
    modification counts as a USB storage event when its path contains
    ``usbstor#disk&`` and at least one GUID from `match_guid`.

    Args:
        cb_conn: API connection object; must provide `select(Process)`.
        query: Process search query string.
        query_base: Optional extra query text appended to `query`.
        timestamps: When true, each result row starts with the converted
            timestamp of the registry modification.

    Returns:
        A set of tuples `(hostname, vendor, product, version, serial)`,
        prefixed by the timestamp when `timestamps` is true.

    Raises:
        IndexError: if `USBEvent` cannot parse a matching path.
    """
    if query_base:
        # Join with a space so the extra filter does not fuse with the last
        # clause of `query` (e.g. "last_update:-10m" + "hostname:x").
        query = '{0} {1}'.format(query, query_base)

    results = set()

    for proc in cb_conn.select(Process).where(query):
        for regmod in proc.regmods:
            # TODO: Convert time boundary (minutes) and check against the
            # regmod event time to speed things up
            path = regmod.path
            if 'usbstor#disk&' not in path:
                continue
            if not any(guid in path for guid in match_guid):
                continue

            usb_result = USBEvent(path)
            output_fields = [proc.hostname,
                             usb_result.vendor,
                             usb_result.product,
                             usb_result.version,
                             usb_result.serial]

            if timestamps:
                output_fields.insert(0, convert_timestamp(regmod.timestamp))

            # A set collapses repeated sightings of the same device.
            results.add(tuple(output_fields))

    return results
def main():
    """Command-line entry point: search for USB storage events, write a CSV.

    Builds the shared CLI parser, adds the ``--timestamps`` option, runs one
    search per entry in `search_terms`, and writes the rows to
    ``usbstor.csv`` (or ``<prefix>-usbstor.csv`` when a prefix is given).
    A row found by more than one search term is written only once.

    Exits with an error message when a query file is supplied, since that
    option is not supported by this utility.
    """
    parser = build_cli_parser("USB utility")

    # Output options
    parser.add_argument("--timestamps", action="store_true",
                        help="Include timestamps in results.")

    args = parser.parse_args()

    if args.queryfile:
        sys.exit("queryfile not supported in this utility")

    if args.prefix:
        output_filename = '%s-usbstor.csv' % args.prefix
    else:
        output_filename = 'usbstor.csv'

    if args.profile:
        cb = CbEnterpriseResponseAPI(profile=args.profile)
    else:
        cb = CbEnterpriseResponseAPI()

    header_row = ['endpoint', 'vendor', 'product', 'version', 'serial']
    if args.timestamps:
        header_row.insert(0, 'timestamp')

    # The csv module does its own newline handling: Python 3 wants a text
    # file opened with newline='', Python 2 wants a binary file.
    if _python3:
        output_file = open(output_filename, 'w', newline='')
    else:
        output_file = open(output_filename, 'wb')

    # Rows already written, so overlapping search terms do not duplicate them.
    written = set()

    # The context manager closes the file even if a search raises.
    with output_file:
        writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
        writer.writerow(header_row)

        for term in search_terms:
            query = 'process_name:ntoskrnl.exe regmod:%s' % term

            # Time windows are expressed in minutes; --days takes precedence.
            if args.days:
                query += ' last_update:-%dm' % (args.days * 1440)
            elif args.minutes:
                query += ' last_update:-%dm' % args.minutes

            results = usbstor_search(cb, query,
                                     query_base=args.query,
                                     timestamps=args.timestamps)

            for row in results:
                if row in written:
                    continue
                written.add(row)

                if not _python3:
                    # Python 2's csv writer cannot handle unicode objects.
                    row = [col.encode('utf8') if isinstance(col, unicode) else col
                           for col in row]
                writer.writerow(row)


if __name__ == '__main__':
    sys.exit(main())