Skip to content

Commit 9d1e27f

Browse files
authored
Merge pull request #231 from djarecka/enh/bosh
i will merge this, and any other things could be added in different PRs
2 parents 020181d + 65cdd56 commit 9d1e27f

File tree

9 files changed

+447
-32
lines changed

9 files changed

+447
-32
lines changed

pydra/engine/boutiques.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import typing as ty
2+
import json
3+
import attr
4+
from urllib.request import urlretrieve
5+
from pathlib import Path
6+
from functools import reduce
7+
8+
from ..utils.messenger import AuditFlag
9+
from ..engine import ShellCommandTask
10+
from ..engine.specs import SpecInfo, ShellSpec, ShellOutSpec, File, attr_fields
11+
from .helpers_file import is_local_file
12+
13+
14+
class BoshTask(ShellCommandTask):
    """Shell Command Task based on the Boutiques descriptor."""

    def __init__(
        self,
        zenodo_id=None,
        bosh_file=None,
        audit_flags: AuditFlag = AuditFlag.NONE,
        cache_dir=None,
        input_spec_names: ty.Optional[ty.List] = None,
        messenger_args=None,
        messengers=None,
        name=None,
        output_spec_names: ty.Optional[ty.List] = None,
        rerun=False,
        strip=False,
        **kwargs,
    ):
        """
        Initialize this task.

        Exactly one of ``zenodo_id`` and ``bosh_file`` has to be provided.

        Parameters
        ----------
        zenodo_id : :obj:`str`
            Zenodo ID of the Boutiques descriptor; the descriptor is
            downloaded to ``cache_dir``.
        bosh_file : :obj:`str` or :obj:`os.pathlike`
            json file with the boutiques descriptors
        audit_flags : :obj:`pydra.utils.messenger.AuditFlag`
            Auditing configuration
        cache_dir : :obj:`os.pathlike`
            Cache directory
        input_spec_names : :obj:`list`
            Input names for input_spec. The list is not modified.
        messenger_args :
            Forwarded unchanged to the parent constructor.
        messengers :
            Forwarded unchanged to the parent constructor.
        name : :obj:`str`
            Name of this task.
        output_spec_names : :obj:`list`
            Output names for output_spec. The list is not modified.
        rerun : :obj:`bool`
            Forwarded unchanged to the parent constructor.
        strip : :obj:`bool`
            Forwarded to the parent constructor and stored on the instance.

        Raises
        ------
        Exception
            If both or neither of ``zenodo_id`` and ``bosh_file`` are given.

        """
        # cache_dir is needed before the parent constructor runs because
        # _download_spec writes the descriptor into it.
        # NOTE(review): assumes assigning self.cache_dir yields a Path-like
        # object supporting "/" (also when cache_dir is None) - confirm
        # against the parent class.
        self.cache_dir = cache_dir
        if (bosh_file and zenodo_id) or not (bosh_file or zenodo_id):
            raise Exception("either bosh or zenodo_id has to be specified")
        elif zenodo_id:
            self.bosh_file = self._download_spec(zenodo_id)
        else:  # bosh_file
            # accept plain strings as well as Path objects
            self.bosh_file = Path(bosh_file)

        with self.bosh_file.open() as f:
            self.bosh_spec = json.load(f)

        # the input spec has to be prepared first: it fills
        # self._input_spec_keys, which the output spec uses for templates
        self.input_spec = self._prepare_input_spec(names_subset=input_spec_names)
        self.output_spec = self._prepare_output_spec(names_subset=output_spec_names)
        self.bindings = ["-v", f"{self.bosh_file.parent}:{self.bosh_file.parent}:ro"]

        super().__init__(
            name=name,
            input_spec=self.input_spec,
            output_spec=self.output_spec,
            executable=["bosh", "exec", "launch"],
            args=["-s"],
            audit_flags=audit_flags,
            messengers=messengers,
            messenger_args=messenger_args,
            cache_dir=self.cache_dir,
            strip=strip,
            rerun=rerun,
            **kwargs,
        )
        self.strip = strip

    def _download_spec(self, zenodo_id):
        """
        Download the descriptor for ``zenodo_id`` to ``self.cache_dir``.

        Uses the boutiques Searcher to find the url of the zenodo file
        for a specific id.

        Returns
        -------
        pathlib.Path
            Location of the downloaded json file.

        Raises
        ------
        Exception
            If the search gives no hit or more than one hit.
        """
        # imported lazily, so boutiques is required only when downloading
        from boutiques.searcher import Searcher

        searcher = Searcher(zenodo_id, exact_match=True)
        hits = searcher.zenodo_search().json()["hits"]["hits"]
        if not hits:
            raise Exception(f"can't find zenodo spec for {zenodo_id}")
        if len(hits) > 1:
            raise Exception(f"too many hits for {zenodo_id}")

        zenodo_url = hits[0]["files"][0]["links"]["self"]
        zenodo_file = self.cache_dir / f"zenodo.{zenodo_id}.json"
        urlretrieve(zenodo_url, zenodo_file)
        return zenodo_file

    def _prepare_input_spec(self, names_subset=None):
        """
        Create the input spec from the Boutiques descriptor.

        If ``names_subset`` is provided, only names from the subset will be
        used in the spec. The list passed by the caller is left untouched.
        As a side effect ``self._input_spec_keys`` is filled with a mapping
        from the descriptor "value-key" to a ``"{name}"`` placeholder.

        Raises
        ------
        RuntimeError
            If ``names_subset`` contains names missing from the descriptor.
        """
        # work on a copy, so the caller's list is not consumed
        remaining = None if names_subset is None else list(names_subset)
        type_map = {"File": File, "String": str, "Number": float, "Flag": bool}

        self._input_spec_keys = {}
        fields = []
        for inp in self.bosh_spec["inputs"]:
            name = inp["id"]
            if remaining is not None:
                if name not in remaining:
                    continue
                remaining.remove(name)

            # unknown descriptor types are left untyped (None)
            tp = type_map.get(inp["type"])
            # adding list
            if tp and inp.get("list"):
                tp = ty.List[tp]

            mdata = {
                "help_string": inp.get("description", None) or inp["name"],
                # inputs without an "optional" key are treated as mandatory
                "mandatory": not inp.get("optional", False),
                "argstr": inp.get("command-line-flag", None),
            }
            fields.append((name, tp, mdata))
            # inputs without a "value-key" have nothing to substitute
            if "value-key" in inp:
                self._input_spec_keys[inp["value-key"]] = "{" + f"{name}" + "}"

        if remaining:
            raise RuntimeError(f"{remaining} are not in the zenodo input spec")
        return SpecInfo(name="Inputs", fields=fields, bases=(ShellSpec,))

    def _prepare_output_spec(self, names_subset=None):
        """
        Create the output spec from the Boutiques descriptor.

        If ``names_subset`` is provided, only names from the subset will be
        used in the spec. The list passed by the caller is left untouched.
        Every "value-key" known from the input spec is replaced in the
        "path-template" by the matching ``"{name}"`` placeholder.

        Raises
        ------
        RuntimeError
            If ``names_subset`` contains names missing from the descriptor.
        """
        # work on a copy, so the caller's list is not consumed
        remaining = None if names_subset is None else list(names_subset)

        fields = []
        # a descriptor without "output-files" yields an empty output spec
        for output in self.bosh_spec.get("output-files", []):
            name = output["id"]
            if remaining is not None:
                if name not in remaining:
                    continue
                remaining.remove(name)

            path_template = reduce(
                lambda s, r: s.replace(*r),
                self._input_spec_keys.items(),
                output["path-template"],
            )
            mdata = {
                "help_string": output.get("description", None) or output["name"],
                # outputs without an "optional" key are treated as mandatory
                "mandatory": not output.get("optional", False),
                "output_file_template": path_template,
            }
            fields.append((name, attr.ib(type=File, metadata=mdata)))

        if remaining:
            raise RuntimeError(f"{remaining} are not in the zenodo output spec")
        return SpecInfo(name="Outputs", fields=fields, bases=(ShellOutSpec,))

    def _command_args_single(self, state_ind, ind=None):
        """Get command line arguments for a single state."""
        # the invocation file has to be written first: writing it may add
        # bindings for the directories of the input files
        input_filepath = self._bosh_invocation_file(state_ind=state_ind, ind=ind)
        cmd_list = (
            self.inputs.executable
            + [str(self.bosh_file), input_filepath]
            + self.inputs.args
            + self.bindings
        )
        return cmd_list

    def _bosh_invocation_file(self, state_ind, ind=None):
        """
        Create the bosh invocation file - a json file with input values.

        The file is written to ``self.cache_dir``. For every local file
        input, a read-only binding of its parent directory is added to
        ``self.bindings`` (once per directory).

        Returns
        -------
        str
            Path of the written invocation file.
        """
        input_json = {}
        for fld in attr_fields(self.inputs):
            if fld.name in ["executable", "args"]:
                continue
            state_key = f"{self.name}.{fld.name}"
            if self.state and state_key in state_ind:
                value = getattr(self.inputs, fld.name)[state_ind[state_key]]
            else:
                value = getattr(self.inputs, fld.name)
            # adding to the json file if specified by the user
            if value is not attr.NOTHING and value != "NOTHING":
                if is_local_file(fld):
                    value = Path(value)
                    binding = f"{value.parent}:{value.parent}:ro"
                    # this method runs once per state; skip known bindings
                    # so the command line does not grow with duplicates
                    if binding not in self.bindings:
                        self.bindings.extend(["-v", binding])
                    value = str(value)

                input_json[fld.name] = value

        filename = self.cache_dir / f"{self.name}-{ind}.json"
        with open(filename, "w") as jsonfile:
            json.dump(input_json, jsonfile)

        return str(filename)

pydra/engine/specs.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,8 @@ def collect_additional_outputs(self, input_spec, inputs, output_dir):
311311
if fld.type is File:
312312
# assuming that field should have either default or metadata, but not both
313313
if (
314-
not (fld.default is None or fld.default == attr.NOTHING)
315-
and fld.metadata
316-
):
314+
fld.default is None or fld.default == attr.NOTHING
315+
) and not fld.metadata: # TODO: is it right?
317316
raise Exception("File has to have default value or metadata")
318317
elif not fld.default == attr.NOTHING:
319318
additional_out[fld.name] = self._field_defaultvalue(
@@ -360,9 +359,23 @@ def _field_metadata(self, fld, inputs, output_dir):
360359
if "value" in fld.metadata:
361360
return output_dir / fld.metadata["value"]
362361
elif "output_file_template" in fld.metadata:
363-
return output_dir / fld.metadata["output_file_template"].format(
364-
**inputs.__dict__
362+
sfx_tmpl = (output_dir / fld.metadata["output_file_template"]).suffixes
363+
if sfx_tmpl:
364+
# removing suffix from input field if template has its own suffix
365+
inputs_templ = {
366+
k: v.split(".")[0]
367+
for k, v in inputs.__dict__.items()
368+
if isinstance(v, str)
369+
}
370+
else:
371+
inputs_templ = {
372+
k: v for k, v in inputs.__dict__.items() if isinstance(v, str)
373+
}
374+
out_path = output_dir / fld.metadata["output_file_template"].format(
375+
**inputs_templ
365376
)
377+
return out_path
378+
366379
elif "callable" in fld.metadata:
367380
return fld.metadata["callable"](fld.name, output_dir)
368381
else:

pydra/engine/task.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,13 +399,16 @@ def _run_task(self):
399399
else:
400400
args = self.command_args
401401
if args:
402-
# removing emty strings
402+
# removing empty strings
403403
args = [str(el) for el in args if el not in ["", " "]]
404404
keys = ["return_code", "stdout", "stderr"]
405405
values = execute(args, strip=self.strip)
406406
self.output_ = dict(zip(keys, values))
407407
if self.output_["return_code"]:
408-
raise RuntimeError(self.output_["stderr"])
408+
if self.output_["stderr"]:
409+
raise RuntimeError(self.output_["stderr"])
410+
else:
411+
raise RuntimeError(self.output_["stdout"])
409412

410413

411414
class ContainerTask(ShellCommandTask):
9.9 MB
Binary file not shown.

0 commit comments

Comments
 (0)