This repository has been archived by the owner on Nov 17, 2021. It is now read-only.
forked from StanfordBioinformatics/Scoring
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreplicate_scoring.py
executable file
·164 lines (139 loc) · 5.23 KB
/
replicate_scoring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/bin/env python
# This script does not seem to be used in the pipeline. (It seems to
# be functionally similar to the replicate_scoring functions in the
# individual peakcaller modules.)
import math
import sys
import os
import glob
from conf import ConfigSample
from bed import PeakSeqBEDParser
# Shared parser instance; Replicate.form_hits() calls bed_parser.parse() on
# every line of a hits file.
bed_parser = PeakSeqBEDParser()
# Default value passed as percent_of_hits to calculate_overlap() by
# replicate_stats(). The optional third command-line argument overrides it
# when the module is run as a script.
TOP_OVERLAP_HITS = 0.4
class Replicate:
    """One replicate (or a pooled sample) together with its scoring data.

    Attributes:
        name: Label written as the ``[name]`` section header by output().
        mapped_reads_file: Path of the file whose line count is reported as
            the number of mapped reads.
        results_dir: Directory containing the hits file and the
            ``*_hits.bed`` / ``sgr/*.sgr`` files listed by output().
        hits: Parsed hits, set by form_hits(); None until then.
        hits_file: Name of the hits file (relative to results_dir), set by
            form_hits(); None until then.
        q_val: q-value associated with the current hits; None until
            form_hits() is called.
        num_reads: Cached line count of mapped_reads_file (-1 = not yet
            counted).
        rep_vs_reps: Objects with an output() method (pairwise comparisons)
            whose text is appended to this replicate's report section.
        eland_files: List of read files for this replicate ([] if none).
    """

    def __init__(self, name, mapped_reads_file, results_dir, eland_files=None):
        self.name = name
        self.mapped_reads_file = mapped_reads_file
        self.results_dir = results_dir
        self.hits = None
        self.hits_file = None
        self.q_val = None
        self.num_reads = -1
        self.rep_vs_reps = []
        # Any falsy value (None, empty list) is normalised to a fresh list.
        self.eland_files = eland_files if eland_files else []

    def num_of_reads(self):
        """Return the number of lines in mapped_reads_file.

        Returns 0 when the file does not exist (that result is not cached,
        so the file is looked for again on the next call). Otherwise the
        count is computed once and cached in num_reads.
        """
        if not os.path.exists(self.mapped_reads_file):
            return 0
        if self.num_reads < 0:
            # Context manager so the handle is closed as soon as the count
            # is done instead of whenever the garbage collector gets to it.
            with open(self.mapped_reads_file) as reads:
                self.num_reads = sum(1 for _ in reads)
        return self.num_reads

    def form_hits(self, hits_file, q_val):
        """Load and parse the hits from ``results_dir/hits_file``.

        Args:
            hits_file: File name of the hits file, relative to results_dir.
            q_val: q-value to record alongside the loaded hits.

        Raises:
            Exception: If the hits file contains no lines. In that case
                hits has already been set to the empty list, while
                hits_file and q_val are left unchanged.
        """
        with open(os.path.join(self.results_dir, hits_file)) as f:
            self.hits = [bed_parser.parse(line) for line in f]
        if not self.hits:
            raise Exception("Hits file %s empty" % hits_file)
        self.hits_file = hits_file
        self.q_val = q_val

    def output(self):
        """Return this replicate's report section as a string.

        The section lists the q-value (omitted when q_val is falsy, which
        includes 0), mapped read count, eland files, hit count, the
        filtered BED path (when hits_file is set), every ``*_hits.bed``
        file in results_dir, every ``sgr/*.sgr`` file, and finally the
        output of each entry in rep_vs_reps.

        hits must be set before calling (len() is taken of it).
        """
        parts = ['[%s]\n' % self.name]
        if self.q_val:
            parts.append('q_value=%f\n' % (self.q_val))
        parts.append('mapped_reads=%i\n' % self.num_of_reads())
        parts.append('eland_files=%s\n' % str(self.eland_files))
        parts.append('total_hits=%i\n' % len(self.hits))
        if self.hits_file:
            parts.append('filtered_bed=%s\n' % (os.path.join(self.results_dir, self.hits_file)))
        for chr_bed_file in glob.glob(os.path.join(self.results_dir, '*_hits.bed')):
            parts.append('chr_bed=%s\n' % (chr_bed_file))
        for chr_sgr_file in glob.glob(os.path.join(os.path.join(self.results_dir, 'sgr/'), '*.sgr')):
            parts.append('chr_sgr=%s\n' % (chr_sgr_file))
        for r in self.rep_vs_reps:
            parts.append(r.output())
        return ''.join(parts)
class ReplicateByReplicate:
    """Pairwise comparison statistics of one replicate against another.

    Holds the two replicate objects (only their ``name`` is read here) and
    three precomputed numbers: the mapped-reads ratio, the hits ratio and
    the overlap value.
    """

    def __init__(self, rep1, rep2, mapped_reads_ratio, hits_ratio, percent_overlap):
        self.rep1, self.rep2 = rep1, rep2
        self.mapped_reads_ratio = mapped_reads_ratio
        self.hits_ratio = hits_ratio
        self.percent_overlap = percent_overlap

    def output(self):
        """Return the comparison as report text.

        The first line is ``(name1, name2)``; it is followed by one
        ``key=value`` line per statistic, each formatted with ``%f``.
        """
        header = '(%s, %s)\n' % (self.rep1.name, self.rep2.name)
        stat_lines = (
            'mapped_reads_ratio=%f\n' % self.mapped_reads_ratio,
            'hits_ratio=%f\n' % self.hits_ratio,
            'percent_overlap=%f\n' % self.percent_overlap,
        )
        return header + ''.join(stat_lines)
def ratio(x, y):
    """Return x / y as a float, or the integer 0 when y equals zero."""
    if y != 0:
        return float(x) / float(y)
    return 0
def overlaps(hit1, hit2):
    """Return True if the two hits share a chromosome and their ranges touch.

    Both arguments need ``chr``, ``start`` and ``stop`` attributes. The
    ranges are treated as inclusive: a hit ending exactly where the other
    starts counts as overlapping.
    """
    same_chrom = hit1.chr == hit2.chr
    if not same_chrom:
        return False
    # Disjoint means one interval lies entirely before the other.
    disjoint = hit1.stop < hit2.start or hit1.start > hit2.stop
    return not disjoint
def calculate_overlap(hits1, hits2, percent_of_hits):
    """Measure how many of the leading hits in hits1 overlap a hit in hits2.

    Both lists are first cut to the length of the shorter one. The first
    ``int(shorter_length * percent_of_hits)`` entries of hits1 (at least
    one) are then each tested against the truncated hits2.

    Args:
        hits1: A list of BED annotations.
        hits2: A list of BED annotations.
        percent_of_hits: Multiplier (e.g. 0.4) applied to the shorter list
            length to decide how many leading hits of hits1 to check.

    Returns:
        A float: the number of checked hits that overlapped, divided by
        the number checked. This is a fraction between 0 and 1, not a
        value scaled to 100.
    """
    shared_len = min(len(hits1), len(hits2))
    candidates = hits2[:shared_len]
    # Always check at least one hit so the division below is well defined.
    num_checked = max(int(shared_len * percent_of_hits), 1)
    leading_hits = hits1[:shared_len][:num_checked]
    num_overlapping = sum(
        1
        for lead in leading_hits
        if any(overlaps(lead, other) for other in candidates)
    )
    return float(num_overlapping) / float(num_checked)
def replicate_stats(rep1, rep2):
    """Compare rep1 against rep2 and return a ReplicateByReplicate.

    The result carries the ratio of mapped read counts, the ratio of hit
    counts, and the overlap value computed by calculate_overlap() using
    the module-level TOP_OVERLAP_HITS setting.
    """
    return ReplicateByReplicate(
        rep1,
        rep2,
        ratio(rep1.num_of_reads(), rep2.num_of_reads()),
        ratio(len(rep1.hits), len(rep2.hits)),
        calculate_overlap(rep1.hits, rep2.hits, TOP_OVERLAP_HITS),
    )
def common_thresholds(threshold_lists):
    """Return the thresholds that appear in every list of threshold_lists.

    Args:
        threshold_lists: A sequence of lists of thresholds.

    Returns:
        A new list holding the entries of the first list that are also
        present in all other lists, in the first list's order. Returns an
        empty list when threshold_lists itself is empty.

    The input lists are left unmodified. Filtering builds a new list on
    each pass rather than removing from the list being iterated, because
    removing during iteration skips the element after each removal and
    leaves non-common thresholds in the result.
    """
    if not threshold_lists:
        return []
    common = list(threshold_lists[0])
    for tl in threshold_lists[1:]:
        common = [threshold for threshold in common if threshold in tl]
    return common
def build_report(combined, replicates, output_file):
    """Write the scoring report for a combined sample and its replicates.

    The file starts with ``combined.output()``. Then, for each replicate,
    a comparison against every other replicate is computed with
    replicate_stats() and appended to that replicate's rep_vs_reps before
    the replicate's own output() is written.

    Args:
        combined: Object whose output() text opens the report.
        replicates: Replicates to compare pairwise and report on.
        output_file: Path of the report file; overwritten if it exists.

    Every replicate's rep_vs_reps is reset to an empty list before
    returning, and the file is closed, even if writing fails part-way.
    """
    with open(output_file, 'w') as report:
        try:
            report.write(combined.output())
            for r1 in replicates:
                for r2 in replicates:
                    if r1 == r2:
                        continue
                    r1.rep_vs_reps.append(replicate_stats(r1, r2))
                report.write(r1.output())
        finally:
            # The comparisons only exist for the duration of one report;
            # clear them so a later call does not emit them a second time.
            for r in replicates:
                r.rep_vs_reps = []
# Command-line entry point:
#   replicate_scoring.py output_file sample_config_file [top_overlap_hits]
# One report is written per q-value threshold in the sample config, named
# "<output_file>.<q_val formatted with %f>".
if __name__ == '__main__':
    if len(sys.argv) < 3:
        # Parenthesised single-argument form prints the same text under
        # Python 2 and Python 3; the bare print statement is a syntax
        # error on Python 3.
        print("Usage: replicate_scoring.py output_file sample_config_file [top_overlap_hits]")
        raise SystemExit(1)
    if len(sys.argv) == 4:
        # Optional override of the module-level default.
        TOP_OVERLAP_HITS = float(sys.argv[3])
    config = ConfigSample(sys.argv[2])

    # One Replicate per entry in config.REPLICATES, numbered from 1.
    replicates = []
    for i, reads in enumerate(config.REPLICATES):
        rep_name = 'Rep%i' % (i + 1)
        run_name = config.RUN_NAME + '_%i' % (i + 1)
        replicates.append(Replicate(
            rep_name,
            os.path.join(config.TEMP_DIR, run_name, '%s_merged_eland.txt' % run_name),
            os.path.join(config.RESULTS_DIR, rep_name),
            eland_files=reads))

    # The pooled sample uses the "_All" run directory and "RepAll" results.
    all_run_name = config.RUN_NAME + '_All'
    all_replicates = Replicate(
        'RepAll',
        os.path.join(config.TEMP_DIR, all_run_name, '%s_merged_eland.txt' % all_run_name),
        os.path.join(config.RESULTS_DIR, 'RepAll'))

    for q_val in config.Q_VALUE_THRESHOLDS:
        # Load the filtered hits for this threshold into every sample
        # before building the report for it.
        for r in replicates + [all_replicates]:
            r.form_hits('%s_%f_hits_filtered.bed' % (r.name, q_val), q_val)
        build_report(all_replicates, replicates, sys.argv[1] + '.%f' % q_val)