quarantine.py
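The worker client for a distributed AutoDock virtual-screening pipeline: it fetches ligand tranches assigned by a job server, prepares a temporary directory per docking job, runs docking on CPU and/or GPU workers, and reports results back to the server.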
#!/usr/bin/env python
import argparse
import glob
import logging
import multiprocessing  # maybe should be using Ray, maybe Celery
import os
import shutil
import signal
import sys
import tempfile
import time

from docking.autodock import runAutodock, prepDPFshell
from docking.autogrid import runAutogrid
from getjob import API, TrancheReader
from util import Receptor

parser = argparse.ArgumentParser()
parser.parse_args()  # no options defined yet; this only provides --help and rejects stray arguments

from raven import Client
client = Client('https://95200bce44ef41ae828324e243dc3240:[email protected]/6')
sentry_errors_log = logging.getLogger("sentry.errors")
sentry_errors_log.addHandler(logging.StreamHandler())
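# Note: raven is Sentry's legacy Python SDK (since superseded by sentry-sdk).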
# Currently designed for CPU- or GPU-centric processing, so you can launch one container of each type.
# TODO maybe change autodock.py to allow using all CPUs + GPUs
# An issue with multiprocessing and check_call is worked around using this approach:
# https://gist.github.com/ownport/63167dbb162f998964f309a5046bef58
def signal_handler(sig, frame):
    # Exit with a distinctive status code so a Ctrl+C shutdown is recognizable to the caller
    print('You pressed Ctrl+C!')
    sys.exit(13)

signal.signal(signal.SIGINT, signal_handler)
def isGPU():
    """Return True if an AutoDock-GPU binary is available on this host."""
    gpuBins = glob.glob('/AutoDock-GPU/bin/autodock_gpu_*')
    return len(gpuBins) > 0 and os.path.exists(gpuBins[0])
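# Note: this check assumes the AutoDock-GPU container layout, with binaries under /AutoDock-GPU/bin.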
'''
# Sentry smoke test - uncomment to verify exception reporting is wired up:
try:
    1 / 0
except ZeroDivisionError:
    client.captureException()
'''
'''
The primary loop for the client:
To minimize bandwidth requirements on the UCSF ZINC database, clients download single tranche files
and generally stick with them for lengthy periods of time. Thus, the outer loop is a request to the
server for which tranche file should be processed.
'''
devmode = os.getenv('DEBUG')  # if set, enters developer mode (contacts a local server)
USERNAME = os.getenv('ME')  # username reported to the job server
cpu_count = multiprocessing.cpu_count() // 2  # assume hyperthreading, so ignore half the logical cores
gpu_count = 2  # number of concurrent GPU jobs; use nvidia-smi to check whether utilization is <100%
jobs_to_cache = cpu_count  # keep one job per core available, or 3-5 jobs if a GPU is running
POISON_PILL = "STOP"
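# Pipeline overview (as implemented below):
#   fetchLoop (producer) -> work_new -> CpuConsumer (autogrid + DPF prep) -> work_gpu -> GpuConsumer (docking)
# On CPU-only hosts the CpuConsumer runs the docking step itself and work_gpu stays empty.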
def fetchLoop(work_new):
    print("Fetch worker started")
    client = API(USERNAME, dev=devmode)
    while True:
        trancheID, tranche = client.nextTranche()  # contact the server for a tranche assignment
        TR = TrancheReader(trancheID, tranche, mirror=client.mirror)  # then download and open this tranche for reading
        # inner loop - which ligand models from this tranche file should we execute?
        while True:
            while work_new.qsize() > jobs_to_cache:  # puts the brakes on execution
                time.sleep(1)
            ligandNum, receptors = client.nextLigand(trancheID)  # ask the server which ligand model number to execute
            print('Server told us to work on model ' + str(ligandNum))
            try:
                zincID, model = TR.getModel(ligandNum)  # parse the model out of the tranche file
            except StopIteration:
                client.trancheEOF(trancheID)
                break
            for receptorName in receptors:
                receptor = Receptor(receptorName)
                print('queueing docking algorithm on ' + receptor.name)
                # Create a temporary directory per job
                workdir = tempfile.mkdtemp()
                if not os.path.exists(workdir):
                    raise ValueError("Creating temporary directory failed")
                # Copy the receptor files into the job directory; the Receptor object handles downloads, etc.
                receptor_dir = receptor.dir
                print("Copying " + receptor_dir + " to " + workdir)
                try:
                    shutil.copy(os.path.join(receptor_dir, 'template.gpf'), workdir)
                    shutil.copy(os.path.join(receptor_dir, 'receptor.pdbqt'), workdir)
                except OSError:
                    pass
                TR.saveModel(model, outfile=os.path.join(workdir, 'ligand.pdbqt'))
                # Queue the job only after its working directory is fully populated
                work_new.put([workdir, receptor.name, trancheID, ligandNum, client])
            sys.stdout.flush()  # make sure child processes print
            time.sleep(0.5)  # slow down to ease synchronization with the server
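# Both consumers follow the poison-pill pattern: a "STOP" sentinel on the queue tells a worker to exit.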
class CpuConsumer(multiprocessing.Process):
    # Run one of these per physical core
    def __init__(self, work_new, work_gpu):
        multiprocessing.Process.__init__(self)
        self.work_new = work_new
        self.work_gpu = work_gpu

    def run(self):
        proc_name = self.name
        print("CPU worker started")
        while True:
            while self.work_new.empty() or self.work_gpu.qsize() >= jobs_to_cache:
                time.sleep(1)
            job = self.work_new.get()
            if job == POISON_PILL:  # poison pill to exit
                print('CPU {}: CPU exiting'.format(proc_name))
                self.work_new.task_done()
                break
            workdir, receptor, trancheID, ligandNum, client = job
            print('CPU {}: {}'.format(proc_name, workdir))
            start = time.time()
            # Tasks that run on the CPU
            runAutogrid(cwd=workdir)
            prepDPFshell(cwd=workdir)
            # Tasks that may run on the GPU
            if not isGPU():
                print('running docking algorithm on CPU in ' + str(workdir))
                results, logFile = runAutodock(cwd=workdir)
                end = time.time()
                results['time'] = end - start
                results['receptor'] = receptor
                results['tranche'] = trancheID
                results['ligand'] = ligandNum
                # FIXME - different autodock versions have different logfile formats - some don't export ligand name
                client.reportResults(results, logFile)
                # hack to remove a mkdtemp directory without a context manager
                # (https://stackoverflow.com/questions/6884991/how-to-delete-a-directory-created-with-tempfile-mkdtemp)
                shutil.rmtree(workdir)
            else:
                self.work_gpu.put([workdir, receptor, trancheID, ligandNum, client])  # hand the work over to the GPU
            self.work_new.task_done()
            sys.stdout.flush()
            print('CPU work_new=' + str(self.work_new.qsize()) + ' GPU work_gpu=' + str(self.work_gpu.qsize()))
# Only run these if the GPU version is installed, and probably only use one
class GpuConsumer(multiprocessing.Process):
    # Run one of these per concurrent GPU job
    def __init__(self, work_gpu):
        multiprocessing.Process.__init__(self)
        self.work_gpu = work_gpu

    def run(self):
        proc_name = self.name
        print("GPU worker started")
        while True:
            while self.work_gpu.empty():
                time.sleep(0.2)
            job = self.work_gpu.get()
            if job == POISON_PILL:  # poison pill to exit
                print('GPU {}: GPU exiting'.format(proc_name))
                self.work_gpu.task_done()
                break
            workdir, receptor, trancheID, ligandNum, client = job
            print('GPU {}: {}'.format(proc_name, workdir))
            start = time.time()
            print('running docking algorithm on GPU in ' + str(workdir))
            try:
                # has raised "list index out of range" in parsers.py:34 parseLogFile
                results, logFile = runAutodock(cwd=workdir)
                end = time.time()
                results['time'] = end - start
                results['receptor'] = receptor
                results['tranche'] = trancheID
                results['ligand'] = ligandNum
                # FIXME - different autodock versions have different logfile formats - some don't export ligand name
                client.reportResults(results, logFile)
            except Exception as e:
                print('GPU {}: docking failed: {}'.format(proc_name, e))  # don't swallow failures silently
            shutil.rmtree(workdir)
            self.work_gpu.task_done()
            sys.stdout.flush()
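# dispatchCenter wires the queues to the worker pools: CPU consumers always run,
# while GPU consumers start only when an AutoDock-GPU binary is detected.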
def dispatchCenter():
    global jobs_to_cache  # the GPU branch below shrinks the cache target for the whole module
    work_new = multiprocessing.JoinableQueue()
    work_gpu = multiprocessing.JoinableQueue()
    if not isGPU():
        worker_count = cpu_count
    else:  # is GPU
        worker_count = 1  # no need for overkill; one CPU worker is enough to unpack and prep jobs
        jobs_to_cache = 3  # probably enough buffering to keep the GPU fed, else raise toward 5
    print("CPU workers count = " + str(worker_count))
    # Loops were getting stuck; trying this approach: https://stackoverflow.com/questions/29571671/basic-multiprocessing-with-while-loop
    cpu_consumers = [
        CpuConsumer(work_new, work_gpu)
        for i in range(worker_count)
    ]
    for cw in cpu_consumers:
        cw.start()
    if isGPU():
        gpu_consumers = [
            GpuConsumer(work_gpu)
            for i in range(gpu_count)
        ]
        for gw in gpu_consumers:
            gw.start()
    print("Starting fetching process")
    fetchLoop(work_new)  # runs forever; the shutdown below is only reached if it ever returns
    for i in range(worker_count):
        work_new.put(POISON_PILL)
    work_new.join()
    if isGPU():
        for i in range(gpu_count):
            work_gpu.put(POISON_PILL)
        work_gpu.join()
if __name__ == '__main__':
    dispatchCenter()
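# Typical invocation (a sketch; assumes the job server reads the username from ME and
# developer mode from DEBUG, as the environment lookups above suggest):
#   ME=myname python quarantine.py          # normal mode: contact the production job server
#   DEBUG=1 ME=myname python quarantine.py  # developer mode: contact a local server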