-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathoffset_finder.py
280 lines (236 loc) · 10.9 KB
/
offset_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# @file offset_finder.py
# @author Esko Kautto ([email protected])
# @updated 2016-06-16
from copy import deepcopy
from structures import CIGAR
import time
class OffsetFinder:
max_iterations = 10
@staticmethod
def message(msg):
if type(msg) is str:
msg = msg.split('\n')
for line in msg:
timestamp = time.strftime('%m/%d/%y %H:%M:%S')
print('[{0}] OffsetFinder: {1}'.format(timestamp, line))
# end .message()
"""
Determine the offset for starting to read the read's
sequence, based on the read's start position, the
CIGAR string, and the msi locus start. Since there
could be potential insertions or deletions before
the MSI locus, the offset has to take those into account
from the CIGAR.
"""
@staticmethod
def find_offset(read, locus):
# First, get the best-effort estimate for the offset,
# based on the read's expected genomic start position,
# the locus start, and the read's CIGAR string.
offset = OffsetFinder.estimate_offset(read, locus)
# Check the boundary to make sure the entire read isn't
# slightly offset; this can happen if the aligner shifts
# the read.pos when an indel is very close to the edge of
# the read sequence.
offset_change = OffsetFinder.adjust_for_shifts(
read,
locus,
offset)
offset += offset_change
return offset
# end .find_offset()
"""
Estimate the offset of the read sequence in relation to the
target locus start. The returned offset will be an integer in
the range of [0, |sequence|]. An offset of 0 will indicate the
read starts right at the locus boundary; an offset of |sequence|
indicates the read doesn't cover the locus.
"""
@staticmethod
def estimate_offset(read, locus):
offset = 0
genomic_pos = read.pos
cigar = CIGAR.to_array(read.cigar)
if cigar[0][0] == 'S':
# Soft clipped. Pop the element off the list so that the soft
# clipped region isn't double-processed later in the method.
# Ideally the SAMRead preprocessing will have removed any
# preceding soft clipping so this won't be needed.
offset += cigar[0][1]
cigar.pop(0)
if genomic_pos == locus.start:
# Genomic position of the start of the read is the start
# of the MSI locus. Alignment might have resulted in position
# being slightly off; make sure the sequence starts with the
# target kmmer. If it doesn't, shift to the right (increment
# offset) until the right kmer is encountered.
# No (further) offset required.
if not OffsetFinder.at_target_kmer(read, locus, offset):
offset += OffsetFinder.adjust_reading_frame(read, locus, offset)
return offset
# Iterate through the cigar string until we find the
# start of the locus
while cigar:
token = cigar[0][0]
size = cigar[0][1]
cigar.pop(0) # Remove this CIGAR segment.
if token == 'S':
# Soft clipped segment. This should not happen in the middle
# of a CIGAR string; since the SAMRead preprocessor gets rid
# of prepended soft clippings, and there is a previous check
# for it within this method, the only time this should be
# encountered is at the end of a sequence, at which point the
# read likely did not cover the MSI locus.
genomic_pos += size
offset += size
elif token == 'D':
# A deletion in the sequence. The genomic position will be
# incremented by the size of the deletion, but the offset
# will not be affected since the deleted bases are not in
# the read sequence.
genomic_pos += size
elif token == 'I':
# An insertion of one or more bases into the sequence.
# Since the insertion is not part of the original (reference)
# sequence, the genomic position will not be incremented, but
# the offset will be to essentially skip over the insertion.
offset += size
elif token == 'M':
if genomic_pos + size > locus.start:
# Matched sequence crosses locus boundary; increment
# the offset only be the difference so that bases are
# not accidentally skipped.
offset += locus.start - genomic_pos
genomic_pos = locus.start
elif genomic_pos + size == locus.start:
# Matched sequence reaches the exact boundary of the
# MSI locus. If the next CIGAR string segment is an
# indel, run an additional calculation to check if
# further offset is necessary.
offset += size
genomic_pos = locus.start
if len(cigar) and cigar[0][0] in ['D', 'I']:
offset += OffsetFinder.boundary_indel_offset(
read, locus, cigar, offset, genomic_pos)
elif genomic_pos + size == locus.start - 1:
# Right before the locus boundary. As with the previous
# case, make sure an indel in the next CIGAR segment
# is accounted for if necessary.
offset += size
genomic_pos = locus.start - 1
if len(cigar) and cigar[0][0] in ['D', 'I']:
offset += OffsetFinder.boundary_indel_offset(
read, locus, cigar, offset, genomic_pos)
else:
# Not at the locus boundary yet; increment offset and
# genomic position normally.
offset += size
genomic_pos += size
if genomic_pos >= locus.start:
# MSI locus reached
break
# Return the estimate offset.
return offset
# end .estimate_offset()
"""
Handles cases where an indel exists right at the locus boundary.
This is relatively likely if the aligner accounted for any repeat
count changes by placing the added or removed k-mers at the start
of the MSI locus.
"""
@staticmethod
def boundary_indel_offset(read, locus, cigar, offset, genomic_pos):
offset_change = 0
if genomic_pos == locus.start and False:
# Only checks insertions at the locus boundary.
insertion_seq = read.seq[seq_pos:seq_pos+insert_size]
if insertion_seq[0:locus.kmer_length] != locus.kmer:
# The insertion appers to be a random insertion, not
# part of the repeat count change for the MSI locus.
# Shift the offset accordingly.
offset += insert_size
return offset_change
# end .boundary_indel_offset()
"""
Checks the boundary position to make sure the entire read before the MSI locus
isn't slightly shifted; this could happen if the aligner shifts the 'start'
position of the entire read when the locus is close to the edge of the read
sequence (e.g. the start position for caTTTTT.. might get decremented by N
if there are N extra T's in the locus, despite the read starting with 'ca'.)
"""
@staticmethod
def adjust_for_shifts(read, locus, offset):
# First try to get sequence into the proper reading frame (if it
# isn't already) by left- or right-shifting it.
offset_change = OffsetFinder.adjust_reading_frame(read, locus, offset)
if offset_change < 0:
# Encountered problem adjusting reading frame.
return 0
# Check if we can adjust the offset to an earlier position to
# capture an earlier (continuous) start for the repeat sequence.
offset_change += OffsetFinder.shift_start_left(read, locus, offset + offset_change)
return offset_change
# end .adjust_for_shifts()
"""
Tries to adjust the read sequence into the proper reading frame.
"""
@staticmethod
def adjust_reading_frame(read, locus, offset):
if OffsetFinder.at_target_kmer(read, locus, offset):
# Already in proper reading frame, no adjusting required.
return 0
# Try to adjust the reading frame until the read is within
# the correct frame.
offset_change = 0
in_frame = False
for i in range(1, OffsetFinder.max_iterations + 1):
if OffsetFinder.at_target_kmer(read, locus, offset - i):
# Left-shifting read moved it into the proper reading frame.
in_frame = True
offset_change = -1 * i
elif OffsetFinder.at_target_kmer(read, locus, offset + i):
# Right-shifting read moved it into the proper reading frame.
in_frame = True
offset_change = i
if in_frame:
# Read is now in the proper reading frame.
break
if not in_frame:
# Couldn't get the read into the right reading frame.
offset_change = -1
return offset_change
# end of .adjust_reading_frame()
"""
Tries to find an 'earlier' start for the repeat sequence
by checking left (before) of the current offset. Assumes
the read is already in the proper reading frame.
"""
@staticmethod
def shift_start_left(read, locus, offset = 0):
# Start by assuming no change will occur.
offset_change = 0
# Kmer length acts as the shift unit (-/neg since we're shifting left).
shift_unit = -1 * locus.kmer_length
for i in range(1, OffsetFinder.max_iterations + 1):
start = offset + (i * shift_unit)
stop = offset + (i * shift_unit) + locus.kmer_length
if start < 0 or stop > read.length:
# Going out of boundaries on the read; stop.
break
if read.seq[start:stop] == locus.kmer:
# Found the kmer at the earlier offset position
offset_change = (i * shift_unit)
else:
# No longer within repeat sequence
break
return offset_change
# end of .shift_start_left()
"""
Basic helper method that checks if the current offset in the read
sequence matches the targeted k-mer sequence.
"""
@staticmethod
def at_target_kmer(read, locus, offset = 0):
return (read.seq[offset:offset+locus.kmer_length] == locus.kmer)
# end .at_target_kmer()
# end OffsetFinder class definition.