-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathVcfRecord.py
207 lines (162 loc) · 6.21 KB
/
VcfRecord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import re
import sys
class VcfRecord(object):
    """Represent one VCF data line.

    The record stores the eight fixed site columns (CHROM, POS, ID, REF, ALT,
    QUAL, FILTER, INFO) as attributes and keeps a list of genotype objects for
    the per-sample columns.  Every site column defaults to '.', which this
    class treats as the "missing value" placeholder.

    Genotype objects are expected to provide the methods this class calls on
    them: checkFormatIds(), getAlleles(), isHet(), isCalled(),
    getFormatString() and toString().
    """

    # Placeholder used for a column that carries no value.
    _MISSING = '.'

    # For each base, the only ALT base that makes the change a transition
    # (A <-> G, C <-> T).
    _TRANSITION_PARTNER = {'A': 'G', 'G': 'A', 'C': 'T', 'T': 'C'}

    def __init__(self, chrom='.', pos='.', id='.', ref='.', alt='.', qual='.', filter='.', info='.'):
        """Initialize a VCF record from its eight site columns."""
        self.chrom = chrom
        self.pos = pos
        self.id = id
        self.ref = ref
        self.alt = alt
        self.qual = qual
        self.filter = filter
        self.info = info
        self.genotypes = []  # list of VcfGenotype objects, one per sample

    def setChrom(self, chrom):
        """Set the CHROM column."""
        self.chrom = chrom

    def setPos(self, pos):
        """Set the POS column."""
        self.pos = pos

    def setId(self, id):
        """Set the ID column."""
        self.id = id

    def setRef(self, ref):
        """Set the REF column."""
        self.ref = ref

    def setAlt(self, alt):
        """Set the ALT column."""
        self.alt = alt

    def setQual(self, qual):
        """Set the QUAL column."""
        self.qual = qual

    def setFilter(self, filter):
        """Tag the site with a filter id.

        A record can carry more than one filter tag.  If the current FILTER
        is '.' or 'PASS' it is replaced by `filter`; otherwise `filter` is
        appended, unless the site already carries that tag (a site cannot be
        tagged twice with the same filter id).
        """
        if self.filter in (self._MISSING, 'PASS'):
            self.filter = filter
            return
        # NOTE(review): tags are joined with ',' as this class always has;
        # the VCF specification separates FILTER codes with ';' -- confirm
        # what downstream consumers expect before changing the separator.
        if filter not in self.filter.split(','):
            self.filter = ",".join([self.filter, filter])

    def setInfo(self, info):
        """Replace the INFO column."""
        self.info = info

    def addInfo(self, info):
        """Add more information to the INFO string of the record.

        If INFO is still empty or the '.' placeholder, `info` replaces it
        instead of being appended, so the result never starts with '.;'.
        """
        if self.info in (self._MISSING, ''):
            self.info = info
        else:
            self.info = ";".join([self.info, info])

    def isTransition(self):
        """Tell whether REF -> ALT is a transition.

        C <-> T or A <-> G is a transition; everything else is a
        transversion.

        Returns True for a transition, False for any other ALT when REF is
        one of A, C, G, T, and None when REF is not one of those bases.
        """
        partner = self._TRANSITION_PARTNER.get(self.ref)
        if partner is None:
            return None
        return self.alt == partner

    def getChrom(self):
        """Return the CHROM column."""
        return self.chrom

    def getPos(self):
        """Return the POS column."""
        return self.pos

    def getId(self):
        """Return the ID column."""
        return self.id

    def getRef(self):
        """Return the REF column."""
        return self.ref

    def getAlt(self):
        """Return the ALT column."""
        return self.alt

    def getQual(self):
        """Return the QUAL column."""
        return self.qual

    def getFilter(self):
        """Return the FILTER column."""
        return self.filter

    def getInfo(self):
        """Return the INFO column."""
        return self.info

    def returnInfoDict(self):
        """Return a dictionary built from the INFO field.

        'KEY=VALUE' entries map KEY to the string VALUE; only the first '='
        separates key from value, so values may themselves contain '='.
        Entries without '=' (flags) map to True.  Empty entries and the '.'
        placeholder are skipped, so a record without INFO yields {}.
        """
        infodict = {}
        for item in self.info.split(";"):
            if item in ('', self._MISSING):
                continue
            key, sep, value = item.partition("=")
            infodict[key] = value if sep else True
        return infodict

    def check_genotypeFormat(self, formatlist):
        """Check the FORMAT ids against the list of known FORMAT ids.

        Delegates to checkFormatIds() of the first genotype.  When the record
        has no genotypes a message is written to stderr and nothing is
        checked.
        """
        if not self.genotypes:
            sys.stderr.write("VCF file contains no genotype columns\n")
            return
        self.genotypes[0].checkFormatIds(formatlist)

    def checkInfoIds(self, infolist):
        """Check that every id in the INFO field is contained in `infolist`.

        The id of an entry is the text before its first '=' (or the whole
        entry for flags).  The '.' placeholder is accepted.  On the first
        unknown id a message is written to stderr and the process exits with
        status 1 (SystemExit is raised).
        """
        for elem in self.info.split(';'):
            key = elem.partition('=')[0]
            if key != self._MISSING and key not in infolist:
                sys.stderr.write(key + " not in ##INFO header!\n")
                sys.exit(1)

    def appendInfoString(self, infostring):
        """Append to the INFO string of the record; same as addInfo()."""
        self.addInfo(infostring)

    def addGenotype(self, genotypeobj):
        """Append a VcfGenotype object to the genotype list."""
        self.genotypes.append(genotypeobj)

    def addGenotypeList(self, genotypelist):
        """Replace the genotype list with the given list of VcfGenotype objects."""
        self.genotypes = genotypelist

    def getGenotypes(self):
        """Return the list of VcfGenotype objects of the record."""
        return self.genotypes

    def getGenotypesAlleles(self):
        """Return a list with the getAlleles() result of every genotype."""
        return [gobj.getAlleles() for gobj in self.genotypes]

    def allHets(self):
        """Return True if no genotype of the record reports isHet() == False.

        A record without genotypes therefore returns True.
        """
        for gobj in self.genotypes:
            # Compared with '==' on purpose: keeps the long-standing result
            # for genotype objects whose isHet() returns a non-bool value.
            if gobj.isHet() == False:
                return False
        return True

    def siteCallrate(self):
        """Calculate the site callrate: #called_genotypes / #genotypes.

        Returns a float; 0.0 when the record has no genotypes.
        """
        total = len(self.genotypes)
        if total == 0:
            return 0.0
        # '== True' kept on purpose, see allHets().
        called = sum(1 for gobj in self.genotypes if gobj.isCalled() == True)
        return float(called) / total

    def sampleCallrate(self, samplist, samplecallsdict):
        """Count this record's called genotypes per sample.

        `samplist` is paired positionally with the genotype list; for every
        called genotype the matching entry of `samplecallsdict` is
        incremented in place.  The dictionary must already contain the
        sample name.  Returns None.
        """
        for sample, gobj in zip(samplist, self.genotypes):
            if gobj.isCalled() == True:
                samplecallsdict[sample] += 1

    def zipGenotypes(self, samplelist):
        """Return a list of tuples [(samp1, geno1), (samp2, geno2), ...].

        Each geno* is the genotype object at the same position as the sample
        name in `samplelist`.
        """
        # list() so the documented list is returned even where zip() is lazy.
        return list(zip(samplelist, self.genotypes))

    def toString(self):
        """Return the eight site columns joined by tabs.

        Non-string column values (e.g. an integer POS) are converted to text.
        """
        columns = [self.chrom, self.pos, self.id, self.ref,
                   self.alt, self.qual, self.filter, self.info]
        # "%s" % (value,) leaves text untouched and stringifies everything
        # else; the 1-tuple keeps a tuple value from being unpacked.
        return "\t".join("%s" % (value,) for value in columns)

    def __str__(self):
        """Return the same text as toString()."""
        return self.toString()

    def toStringwithGenotypes(self):
        """Return the site columns, the FORMAT string and all genotypes.

        The FORMAT string is taken from the first genotype.  A record without
        genotypes yields just the site columns.
        """
        outstring = self.toString()
        if not self.genotypes:
            return outstring
        formatstring = self.genotypes[0].getFormatString()
        genostring = "\t".join(gobj.toString() for gobj in self.genotypes)
        return outstring + "\t" + formatstring + "\t" + genostring