forked from raymanfx/android-cve-checker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stats_engine.py
236 lines (193 loc) · 7.91 KB
/
stats_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import os
import sys
"""
Get changed lines from a git patchfile.
Args:
patch_file: The git patchfile to analyse
pattern: Added or removed lines ('+' or '-')
Returns:
A dict with the following key-value mapping:
[key] -> [value]
filename -> triplet
where each triplet contains three lists:
triplet[0] -> 2 lines of code above the changed block
triplet[1] -> x lines of changed code
triplet[2] -> 2 lines of code below the changed block
"""
def find_changed_lines(patch_file, pattern):
# changed lines for the whole patch
patch_changed_lines = {}
# the file which is changed
changed_file = None
# filter out the unwanted pattern
antipattern = '+' if pattern is '-' else '-'
# store patch contents in memory
patch_contents = [line.rstrip('\n') for line in
open(patch_file, 'r', encoding="utf-8") if line.strip()]
# indicate block scanning in progress
block_scan = False
changed_block = []
for i in range(0, len(patch_contents)):
line = patch_contents[i]
# first, we need to find out which file was changed
if "+++" in line[0:3]:
changed_file = line[6:].strip()
patch_changed_lines[changed_file] = []
# skip until we found which file was changed
elif not changed_file:
continue
# skip the antipattern
elif line[0] is antipattern:
continue
# skip added newlines
elif len(line) < 2:
continue
# collect the changed lines (including whitespace)
# AND the surrounding lines (one above, one below)
elif line[0] is pattern and line[1] is not pattern:
block_scan = True
changed_block.append(line[1:])
elif block_scan and line[0] is not pattern:
above_block = []
above_index = i - 1
while len(above_block) < 2:
above = patch_contents[above_index]
if above[0] is not pattern and above[0] is not antipattern:
above_block.append(above[1:])
above_index -= 1
# reverse block for later search
above_block = list(reversed(above_block))
below_block = []
below_index = i
while len(below_block) < 2:
below = patch_contents[below_index]
if below[0] is not pattern and below[0] is not antipattern:
below_block.append(below[1:])
below_index += 1
if below_index == len(patch_contents):
break
# collect the triplet
triplet = (above_block, changed_block, below_block)
# save triplet to patch changes
patch_changed_lines[changed_file].append(triplet)
# reset variables
block_scan = False
changed_block = []
return patch_changed_lines
"""
Collect ROUGH and possibly WRONG statistics about a certain CVE patch
against a given kernel repo.
Args:
kernel_repo: The kernel git repository.
cve_patch: The CVE git patch to check.
Returns:
Tuple containing stats which show the parts of a CVE that we
think is applied to the kernel.
((actually_added, total_added), (actually_removed, total_removed))
"""
def collect_stats(kernel_repo, cve_patch):
# added/removed lines for the whole patch
patch_added_lines = {}
patch_removed_lines = {}
# we use this data to calculate the fractions later
actually_added_lines = 0
actually_removed_lines = 0
patch_added_lines = find_changed_lines(cve_patch, '+')
total_added_lines = 0
for file in patch_added_lines:
triplets = patch_added_lines[file]
for (above, changed, below) in triplets:
total_added_lines += len(changed)
patch_removed_lines = find_changed_lines(cve_patch, '-')
total_removed_lines = 0
for file in patch_removed_lines:
triplets = patch_removed_lines[file]
for (above, changed, below) in triplets:
total_removed_lines += len(changed)
# check which lines of the patch have been added already
for file in patch_added_lines:
file_path = kernel_repo + '/' + file
# just skip nonexisting files
if not os.path.exists(file_path):
continue
try:
fp_contents = [line.rstrip('\n') for line in
open(file_path, 'r') if line.strip()]
except UnicodeDecodeError:
print("[E] Failed to read file: " + file_path + ", skipping!")
continue
# we need this array so we can check the above block
all_added = []
for (above, added, below) in patch_added_lines[file]:
# keep track of all added lines
all_added.extend(added)
for i in range(0, len(fp_contents)):
line = fp_contents[i]
for (above, added, below) in patch_added_lines[file]:
# check if the added block is present
block_end_index = i
if added[0] in line:
added_lines = added[1:]
fp_lines = fp_contents[i+1:i+len(added_lines)+1]
if set(added_lines) != set(fp_lines):
continue
# check the surrounding lines (two above, two below)
fp_above = []
idx = i - 1
while len(fp_above) < 2:
above_candidate = fp_contents[idx]
if above_candidate not in all_added:
fp_above.append(above_candidate)
idx -= 1
# reverse for later search
fp_above = list(reversed(fp_above))
fp_below = []
idx = block_end_index + 1
while len(fp_below) < 2:
if idx == len(fp_contents):
break
below_candidate = fp_contents[idx]
if below_candidate not in all_added:
fp_below.append(below_candidate)
idx += 1
if (set(above) == set(fp_above)
and set(below) == set(fp_below)):
actually_added_lines += len(added)
for file in patch_removed_lines:
file_path = kernel_repo + '/' + file
# just skip nonexisting files
if not os.path.exists(file_path):
continue
try:
fp_contents = [line.rstrip('\n') for line in
open(file_path, 'r') if line.strip()]
except UnicodeDecodeError:
print("[E] Failed to read file: " + file_path + ", skipping!")
continue
for i in range(0, len(fp_contents)):
lines = fp_contents[i:i+2]
for (above, removed, below) in patch_removed_lines[file]:
# make sure the removed block is missing
if set(above) != set(lines):
continue
fp_below = []
idx = i + 2
while len(fp_below) < 2:
if idx == len(fp_contents):
break
below_candidate = fp_contents[idx]
if below_candidate not in all_added:
fp_below.append(below_candidate)
idx += 1
if set(below) == set(fp_below):
actually_removed_lines += len(removed)
# if the patch does not add/remove lines, we return None
if total_added_lines == 0:
added_stats = None
else:
added_stats = (actually_added_lines, total_added_lines)
if total_removed_lines == 0:
removed_stats = None
else:
removed_stats = (actually_removed_lines, total_removed_lines)
return (added_stats, removed_stats)