-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathfs-peptide.py
278 lines (223 loc) · 8.44 KB
/
fs-peptide.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""Complete working example for figure 1.
Prerequisites:
This script assumes it is running in a local copy of the
https://github.com/kassonlab/gmxapi-tutorials repository. It will look for input
files relative to the location of this file.
After installing GROMACS and gmxapi, execute this script with a Python 3.7+ interpreter.
For a trajectory ensemble, use `mpiexec` and `mpi4py`. For example, for an ensemble of
size 50, activate your gmxapi Python virtual environment and run
mpiexec -n 50 `which python` -m mpi4py fs-peptide.py --cores 50
"""
import argparse
import logging
import os
import typing
from pathlib import Path
# Configure logging module before gmxapi.
logging.basicConfig(level=logging.INFO)
import gmxapi as gmx
import gmxapi.abc
from gmxapi import function_wrapper
from gmxapi.exceptions import UsageError
logging.info(f'gmxapi Python package version {gmx.__version__}')
assert gmx.version.has_feature('mdrun_runtime_args')
assert gmx.version.has_feature('container_futures')
assert gmx.version.has_feature('mdrun_checkpoint_output')
try:
from mpi4py import MPI
rank_number = MPI.COMM_WORLD.Get_rank()
ensemble_size = MPI.COMM_WORLD.Get_size()
except ImportError:
rank_number = 0
ensemble_size = 1
rank_tag = ''
MPI = None
else:
rank_tag = 'rank{}:'.format(rank_number)
# Update the logging output.
log_format = '%(levelname)s %(name)s:%(filename)s:%(lineno)s %(rank_tag)s%(message)s'
for handler in logging.getLogger().handlers:
handler.setFormatter(logging.Formatter(log_format))
parser = argparse.ArgumentParser()
parser.add_argument(
'--cores',
default=os.cpu_count(),
type=int,
help='The total number of cores allocated for the job.'
)
parser.add_argument(
'--maxh',
type=float,
default=0.1,
help='Maximum time for a single simulation in this module.'
)
args = parser.parse_args()
maxh = args.maxh
allocation_size = args.cores
try:
local_cpu_set_size = len(os.sched_getaffinity(0))
except (NotImplementedError, AttributeError):
threads_per_rank = allocation_size // ensemble_size
else:
threads_per_rank = min(local_cpu_set_size, allocation_size // ensemble_size)
#
# Define some helpers
#
def less_than(lhs: typing.SupportsFloat, rhs: typing.SupportsFloat):
"""Compare the left-hand-side to the right-hand-side.
Follows the Numpy logic for normalizing the numeric types of *lhs* and *rhs*.
"""
import numpy as np
dtype = int
if any(isinstance(operand, float) for operand in (lhs, rhs)):
dtype = float
elif all(isinstance(operand, typing.SupportsFloat) for operand in (lhs, rhs)):
if type(np.asarray([lhs, rhs])[0].item()) is float:
dtype = float
elif any(isinstance(operand, gmxapi.abc.Future) for operand in (lhs, rhs)):
for item in (lhs, rhs):
if hasattr(item, 'dtype'):
if issubclass(item.dtype, (float, np.float)):
dtype = float
elif hasattr(item, 'description'):
if issubclass(item.description.dtype, (float, np.float)):
dtype = float
else:
raise UsageError(f'No handling for [{repr(lhs)}, {repr(rhs)}].')
if dtype is int:
def _less_than(lhs: int, rhs: int) -> bool:
return lhs < rhs
elif dtype is float:
def _less_than(lhs: float, rhs: float) -> bool:
return lhs < rhs
else:
raise UsageError('Operation only supports standard numeric types.')
return function_wrapper()(_less_than)(lhs=lhs, rhs=rhs)
@function_wrapper()
def _numpy_min_float(data: gmx.NDArray) -> float:
import numpy as np
logging.info(f'Looking for minimum in {data}')
return float(np.min(data._values))
def numeric_min(array):
"""Find the minimum value in an array.
"""
return _numpy_min_float(data=array)
@gmx.function_wrapper(output={'data': gmx.NDArray})
def xvg_to_array(path: str, output):
"""Get an NxM array from a GROMACS xvg data file.
For energy output, N is the number of samples, and the first of M
columns contains simulation time values.
"""
import numpy
logging.info(f'Reading xvg file {path}.')
data = numpy.genfromtxt(path, comments='@', skip_header=14)
logging.info(f'Read array shape {data.shape} from {path}.')
if len(data.shape) == 1:
# Trajectory was too short. Only a single line was read.
assert data.shape[0] == 2
data = data.reshape((1, 2))
assert len(data.shape) == 2
output.data = data[:, 1]
######################
# Confirm inputs exist
#
_script_dir = Path(__file__).parent
input_dir = _script_dir.parent.resolve() / 'input_files' / 'fs-peptide'
if not all(p.exists() for p in (input_dir, input_dir / 'start0.pdb', input_dir / 'ref.pdb')):
raise RuntimeError(f'Did not find input files in {input_dir}.')
reference_struct = input_dir / 'ref.pdb'
######################
# Complete code from figure 1
#
def figure1a():
"""Figure 1a: gmxapi command-line operation.
Prepare a molecular model from a PDB file using the `pdb2gmx` GROMACS tool.
"""
# Figure 1a code
args = ['pdb2gmx', '-ff', 'amber99sb-ildn', '-water', 'tip3p']
input_files = {'-f': os.path.join(input_dir, 'start0.pdb')}
output_files = {
'-p': 'topol.top',
'-i': 'posre.itp',
'-o': 'conf.gro'}
make_top = gmx.commandline_operation('gmx', args, input_files, output_files)
return make_top
def figure1b(make_top):
"""Figure 1b: gmxapi command on ensemble input
Call the GROMACS MD preprocessor to create a simulation input file. Declare an
ensemble simulation workflow starting from the single input file.
Args:
make_top operation handle, as generated in `figure1a`
Returns:
Trajectory output. (list, if ensemble simulation)
"""
# Confirm inputs exist.
make_top.run()
assert os.path.exists(make_top.output.file['-o'].result())
assert os.path.exists(make_top.output.file['-p'].result())
cmd_dir = input_dir
assert os.path.exists(input_dir / 'grompp.mdp')
# Figure 1b code.
grompp_input_files = {'-f': os.path.join(cmd_dir, 'grompp.mdp'),
'-c': make_top.output.file['-o'],
'-p': make_top.output.file['-p']}
# make array of inputs
N = ensemble_size
grompp = gmx.commandline_operation(
'gmx',
['grompp'],
input_files=[grompp_input_files] * N,
output_files={'-o': 'run.tpr'})
tpr_input = grompp.output.file['-o'].result()
input_list = gmx.read_tpr(tpr_input)
md = gmx.mdrun(input_list, runtime_args={'-maxh': str(maxh)})
md.run()
return {
'input_list': input_list,
'trajectory': md.output.trajectory.result()
}
def figure1c(input_list):
"""Figure 1c: looping and custom operations"""
subgraph = gmx.subgraph(
variables={
'found_native': False,
'checkpoint': '',
'min_rms': 1e6})
with subgraph:
md = gmx.mdrun(
input_list,
runtime_args={
'-cpi': subgraph.checkpoint,
'-maxh': str(maxh),
'-noappend': None,
'-nt': str(threads_per_rank)
})
subgraph.checkpoint = md.output.checkpoint
rmsd = gmx.commandline_operation(
'gmx', ['rms'],
input_files={
'-s': reference_struct,
'-f': md.output.trajectory},
output_files={'-o': 'rmsd.xvg'},
stdin='Backbone Backbone\n'
)
subgraph.min_rms = numeric_min(
xvg_to_array(rmsd.output.file['-o']).output.data).output.data
subgraph.found_native = less_than(lhs=subgraph.min_rms, rhs=0.3).output.data
folding_loop = gmx.while_loop(
operation=subgraph,
condition=gmx.logical_not(subgraph.found_native),
# max_iteration=10
)()
logging.info('Beginning folding_loop.')
folding_loop.run()
logging.info(f'Finished folding_loop. min_rms: {folding_loop.output.min_rms.result()}')
return folding_loop
if __name__ == '__main__':
make_top = figure1a()
logging.info('Created a handle to a commandline operation.')
input_list, trajectory = figure1b(make_top).values()
assert input_list.output.ensemble_width == ensemble_size
folding_loop = figure1c(input_list)
results = {key: getattr(folding_loop.output, key).result() for key in folding_loop.output.keys()}
logging.info(f'Folding loop result: {results}')