-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path07_correspondence.py
executable file
·142 lines (106 loc) · 3.91 KB
/
07_correspondence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
"""Use marker scores and mapping positions to create corresponding positions
between the VCFs
Usage:
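<program> input_scores window_size output_correspondence num_cpus
Where window_size is the number of neighbor SNPs to consider on each side
and num_cpus is the number of input chunks; the chunk numbered num_cpus - 1
is handled as the last one.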
"""
# Modules
from scipy.stats import pearsonr
import sys
# Functions
def keep_snp(past, now, future):
    """Decide whether the SNP described by *now* should be kept.

    Every record is a sequence of string fields. This function reads
    field 0 (score), field 2 (query scaffold/chromosome), field 4 (query
    position) and field 6 (target position).

    A SNP whose own score is above 0.5 is always kept. Otherwise it is kept
    only when its same-scaffold neighbourhood is large enough, dense enough,
    scores well on average, has few non-positive scores, and shows an almost
    perfectly linear relation between query and target positions.

    Relies on the module-level ``window_size`` (number of neighbours
    considered on each side).

    Args:
        past: records preceding *now* (list of field lists).
        now: the record being evaluated.
        future: records following *now* (list of field lists).

    Returns:
        True if the SNP should be kept, False otherwise.
    """
    # A confidently scored SNP is kept without looking at its neighbours
    if float(now[0]) > 0.5:
        return True

    # Gather the neighbourhood, keeping only SNPs on the same scaffold as now
    chromosome = now[2]
    infos = [x for x in past + [now] + future if x[2] == chromosome]

    if len(infos) < 2 * window_size:
        return False

    # Skip if the neighbourhood is too sparse: the window may span at most
    # 5 Kbp per SNP. The span is measured between the first and the LAST
    # record of the window; comparing the first two records against a
    # whole-window threshold would make this check nearly always pass.
    left_pos = int(infos[0][4])
    right_pos = int(infos[-1][4])
    if (right_pos - left_pos) > (2 * window_size * 5000):
        return False

    # Compute useful neighbourhood metrics
    scores = [float(x[0]) for x in infos]
    average = round(sum(scores) / len(scores), 2)
    if average < 0.4:
        return False

    num_negative = sum(1 for score in scores if score <= 0.0)
    if num_negative > window_size / 2:
        return False

    # Is the Pearson coefficient between query and target positions close
    # to 1 (in absolute value, so reversed alignments qualify too)?
    try:
        pos1 = [int(x[4]) for x in infos]
        pos2 = [int(x[6]) for x in infos]
        pearson = abs(pearsonr(pos1, pos2)[0])
    except (IndexError, ValueError):
        # Malformed record or too few points: treat as uncorrelated
        pearson = 0.0

    # bool() so callers get a plain Python boolean, never a numpy scalar
    return bool(pearson >= 0.99)
# Parsing user input
try:
    input_scores = sys.argv[1]
    window_size = int(sys.argv[2])
    output_correspondence = sys.argv[3]
    num_cpus = int(sys.argv[4])
except (IndexError, ValueError):
    # Missing argument or non-integer window_size/num_cpus: show usage
    print(__doc__)
    sys.exit(1)


def _read_record(handle):
    """Return the next line of *handle* split on tabs, or None at end of file."""
    line = handle.readline()
    if not line:
        return None
    return line.strip().split("\t")


def _fill_window(handle, window, size):
    """Append records from *handle* to *window* until it holds *size* or the file ends."""
    while len(window) < size:
        record = _read_record(handle)
        if record is None:
            # Short file: stop instead of appending empty pseudo-records
            break
        window.append(record)


# Read input_scores into 3 compartments past, now, future
past = []
now = None
future = []

# Chunk number, parsed from the path: second "/"-separated component, then
# its second "."-separated field (e.g. "dir/name.3.ext" -> 3)
file_num = int(input_scores.strip().split("/")[1].split(".")[1])

with open(input_scores, "rt") as infile:
    with open(output_correspondence, "wt") as outfile:
        for line in infile:
            # Header
            if line.startswith("Score"):
                outfile.write(line)
                continue

            #Score, Penalties, QueryScaffold, QueryName, QueryPos, TargetChrom, TargetPos, Reversed
            fields = line.strip().split("\t")

            # Get first info line
            if not now:
                if file_num == 0:
                    # First chunk: evaluation starts at the very first SNP,
                    # with an empty past
                    now = fields
                else:
                    # Other chunks: the first window_size SNPs only serve as
                    # context and are not evaluated here
                    # NOTE(review): presumably chunks overlap so that these
                    # SNPs are evaluated in the previous chunk - confirm
                    # against the step that splits the scores file
                    past.append(fields)
                    _fill_window(infile, past, window_size)
                    now = _read_record(infile)
                    if now is None:
                        # Chunk too short to hold a SNP to evaluate
                        break

                # Fill future list
                _fill_window(infile, future, window_size)

            # Slide past, now, and future one step forward
            else:
                past.append(now)
                now = future.pop(0)
                future.append(fields)

                if len(past) > window_size:
                    past.pop(0)

            # Evaluate SNPs
            if keep_snp(past, now, future):
                outfile.write("\t".join(now) + "\n")

        # Treat last SNPs of the file: only the last chunk evaluates the
        # SNPs still waiting in future once the input is exhausted
        if file_num == num_cpus - 1:
            while future:
                past.append(now)
                now = future.pop(0)

                if len(past) > window_size:
                    past.pop(0)

                # Evaluate SNPs
                if keep_snp(past, now, future):
                    outfile.write("\t".join(now) + "\n")