-
Notifications
You must be signed in to change notification settings - Fork 2
/
polarise_vcf_indels.py
executable file
·176 lines (156 loc) · 5.82 KB
/
polarise_vcf_indels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
from __future__ import print_function

import argparse
import os
# Header line declaring the AA INFO field that is appended to polarised records.
AA_HEADER = '##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">\n'


def _parse_args(argv=None):
    """Parse the command line (or *argv* when given) into an argparse namespace."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-vcf', help='VCF file containing insertion and deletion variants to polarise', required=True)
    parser.add_argument('-align_data',
                        help='Text file in bed type format with alignment sequences corresponding to '
                             'INDEL positions, generated using INDELsfromMAF.py',
                        required=True)
    parser.add_argument('-target_spp', help='Species of samples in VCF file, corresponding to align data file header',
                        required=True)
    parser.add_argument('-no_vcf', default=False, action='store_true', help=argparse.SUPPRESS)
    parser.add_argument('-out_unaligned', default=False, action='store_true', help=argparse.SUPPRESS)
    return parser.parse_args(argv)


def _polarised_vcf_path(vcf_file):
    """Return the output path: the input file name with '.polarised' before '.vcf'.

    Only the last '.vcf' of the file name (never of a directory) is rewritten.
    When the name holds no '.vcf' at all, '.polarised.vcf' is appended, so the
    output path can never equal the input path and truncate the input.
    """
    directory, name = os.path.split(vcf_file)
    stem, extension, rest = name.rpartition('.vcf')
    if extension:
        name = stem + '.polarised.vcf' + rest
    else:
        name += '.polarised.vcf'
    return os.path.join(directory, name)


def _load_alignment_data(align_file):
    """Read the alignment table into {'<chrom>_<pos>': {species: [fields, ...]}}.

    The line whose first column is 'ID' supplies the species names (columns 4
    onwards).  On every other line the chromosome is the part of column 1 after
    the first '.', column 2 is shifted by +1 to give the VCF position, and each
    species column is split on ','.
    """
    spp_list = []
    align_data = {}
    with open(align_file) as handle:
        for raw in handle:
            if not raw.strip():
                # tolerate blank lines (e.g. a trailing one) instead of crashing
                continue
            columns = raw.rstrip('\n').split('\t')
            if columns[0] == 'ID':
                spp_list = columns[3:]
                continue
            chromo = columns[0].split('.')[1]
            vcf_position = str(int(columns[1]) + 1)
            sequences = columns[3:]
            # indexed on purpose: a row with too few species columns raises
            align_data[chromo + '_' + vcf_position] = {
                name: sequences[i].split(',') for i, name in enumerate(spp_list)}
    return align_data


def _classify_site(ref, alt, alignment_info, spp):
    """Decide how one aligned INDEL is handled.

    Returns a ``(category, ancestral_allele)`` tuple.  ``category`` is one of
    'hotspot', 'low_coverage', 'ambiguous' or 'polarised' and names the counter
    to increment; ``ancestral_allele`` is None unless the site is polarised.
    Raises KeyError when *spp* is not a species in *alignment_info*.
    """
    # flanking columns: '-' marks a neighbouring INDEL, '.' missing coverage;
    # the first species showing either one decides which flag is raised
    flanks = [[seqs[0], seqs[2]] for seqs in alignment_info.values()]
    full_coverage = True
    neighbouring_deletions = False
    for left, right in flanks:
        if left == '-' or right == '-':
            neighbouring_deletions = True
            break
        if left == '.' or right == '.':
            full_coverage = False
            break

    # a gap remaining after the trailing padding is stripped is an internal
    # gap, i.e. more than one INDEL event at this position
    indel_sequences = [seqs[1].rstrip('-') for seqs in alignment_info.values()]
    hotspot = any('-' in indel for indel in indel_sequences)

    # ref allele differs from the alignment, ie insertion within INDEL
    if ref != alignment_info[spp][1].rstrip('-').upper():
        return 'hotspot', None
    if hotspot:
        return 'hotspot', None
    if not full_coverage:
        return 'low_coverage', None
    if neighbouring_deletions:
        return 'hotspot', None

    # an allele is ancestral when every outgroup sequence has its length
    out_group_lengths = [len(alignment_info[name][1].rstrip('-'))
                         for name in alignment_info if name != spp]
    ref_anc = all(length == len(ref) for length in out_group_lengths)
    alt_anc = all(length == len(alt) for length in out_group_lengths)
    if ref_anc == alt_anc:
        return 'ambiguous', None
    return 'polarised', ref if ref_anc else alt


def _polarise_vcf(vcf_file, out, align_data, spp, out_unaligned=False):
    """Stream *vcf_file*, writing annotated records to *out*, and return counts.

    *out* is a writable file object, or None to suppress VCF output.  Records
    that cannot be polarised are written back unchanged.  The returned dict has
    the keys 'total', 'polarised', 'hotspot', 'low_coverage', 'not_aligned'
    and 'ambiguous'.
    """
    counts = dict.fromkeys(
        ('total', 'polarised', 'hotspot', 'low_coverage', 'not_aligned', 'ambiguous'), 0)

    def emit(text):
        if out is not None:
            out.write(text)

    aa_header_written = False
    previous_line = ''
    with open(vcf_file) as vcf:
        for line in vcf:
            if line.startswith('#'):
                if line.startswith('##INFO=<ID=AA,'):
                    # input already declares AA; do not declare it twice
                    aa_header_written = True
                elif not aa_header_written and (
                        (line.startswith('##contig') and previous_line.startswith('##INFO'))
                        or line.startswith('#CHROM')):
                    # preferred spot is the end of the INFO block; the column
                    # header line is the last chance to declare the field
                    emit(AA_HEADER)
                    aa_header_written = True
                previous_line = line
                emit(line)
                continue

            counts['total'] += 1
            # split without the line ending so that INFO stays clean even when
            # it is the last column (sites-only VCF)
            body = line.rstrip('\r\n')
            ending = line[len(body):]
            fields = body.split('\t')
            chrom, pos, ref, alt, info = fields[0], fields[1], fields[3], fields[4], fields[7]

            alignment_info = align_data.get(chrom + '_' + pos)
            if alignment_info is None:
                if out_unaligned:
                    print(line.split('\t'))
                counts['not_aligned'] += 1
                emit(line)
                continue

            category, ancestral = _classify_site(ref, alt, alignment_info, spp)
            counts[category] += 1
            if ancestral is None:
                emit(line)
                continue

            # '.' is the VCF placeholder for an empty INFO column: replace it
            # rather than appending to it
            if info in ('.', ''):
                fields[7] = 'AA=' + ancestral
            else:
                fields[7] = info + ';AA=' + ancestral
            emit('\t'.join(fields) + ending)
    return counts


def main(argv=None):
    """Polarise the INDELs of a VCF and print a tab-separated category summary.

    *argv* is the argument list to parse; None means the process command line.
    Unless -no_vcf is given, the annotated VCF is written next to the input
    with '.polarised.vcf' in place of '.vcf'.
    """
    args = _parse_args(argv)
    align_data = _load_alignment_data(args.align_data)

    out = None if args.no_vcf else open(_polarised_vcf_path(args.vcf), 'w')
    try:
        counts = _polarise_vcf(args.vcf, out, align_data, args.target_spp, args.out_unaligned)
    finally:
        if out is not None:
            out.close()

    rows = [('category', 'count'),
            ('total', counts['total']),
            ('polarised', counts['polarised']),
            ('hotspot', counts['hotspot']),
            ('low_coverage', counts['low_coverage']),
            ('not_aligned', counts['not_aligned']),
            ('ambiguous', counts['ambiguous']),
            ('unpolarised', counts['total'] - counts['polarised'])]
    print('\n'.join(name + '\t' + str(value) for name, value in rows))


if __name__ == '__main__':
    main()