Skip to content

[WIP][ENH] Created BoutiquesTask for simiplified execution of Boutiques tools in Pydra #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,153 @@
self.output_[output_names[0]] = output


class BoutiquesTask(FunctionTask):
"""Wrap a Boutiques callable as a task element."""

global boutiques_func

def boutiques_func(descriptor, args, **kwargs):
from boutiques.descriptor2func import function

Check warning on line 218 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L218

Added line #L218 was not covered by tests

tool = function(descriptor)
ret = tool(*args, **kwargs)

Check warning on line 221 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L220-L221

Added lines #L220 - L221 were not covered by tests

# print formatted output
print(ret)

Check warning on line 224 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L224

Added line #L224 was not covered by tests

if ret.exit_code:
raise RuntimeError(ret.stderr)

Check warning on line 227 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L227

Added line #L227 was not covered by tests

return ret.output_files

Check warning on line 229 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L229

Added line #L229 was not covered by tests

def __init__(
self,
descriptor: ty.Text,
audit_flags: AuditFlag = AuditFlag.NONE,
cache_dir=None,
cache_locations=None,
input_spec: ty.Optional[SpecInfo] = None,
messenger_args=None,
messengers=None,
name=None,
output_spec: ty.Optional[BaseSpec] = None,
rerun=False,
bosh_args=[],
**kwargs,
):
"""
Initialize this task.

Parameters
----------
descriptor : :obj:`str`
The filename or zenodo ID of the boutiques descriptor
audit_flags: :obj:`pydra.utils.messenger.AuditFlag`
Auditing configurations
cache_dir : :obj:`os.pathlike`
Cache directory
cache_locations : :obj:`list` of :obj:`os.pathlike`
List of alternative cache locations.
input_spec: :obj:`pydra.engine.specs.SpecInfo`
Specification of inputs.
messenger_args :
TODO
messengers :
TODO
name : :obj:`str`
Name of this task.
output_spec : :obj:`pydra.engine.specs.BaseSpec`
Specification of inputs.
bosh_args : :object:`list` of :obj:`str`
List of arguments to pass to Boutiques

"""

func = boutiques_func
self.func = func

Check warning on line 275 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L274-L275

Added lines #L274 - L275 were not covered by tests

default_fields = [
(
val.name,
attr.ib(
default=val.default,
type=val.annotation,
metadata={
"help_string": f"{val.name} parameter from {func.__name__}"
},
),
)
if val.default is not inspect.Signature.empty
else (
val.name,
attr.ib(type=val.annotation, metadata={"help_string": val.name}),
)
for val in inspect.signature(func).parameters.values()
if val.name != "kwargs"
]

# Adding kwargs here because Boutiques also validates inputs
if input_spec is None:
input_spec = SpecInfo(
name="Inputs",
fields=[
(k, attr.ib(type=type(v), metadata={"help_string": k}),)
for k, v in kwargs.items()
],
bases=(BaseSpec,),
)

# users shouldn't have to add "descriptor" and "args" in their input_spec
input_spec.fields.extend(default_fields)

Check warning on line 309 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L309

Added line #L309 was not covered by tests

fmt_kwargs = {"descriptor": descriptor, "args": bosh_args}
fmt_kwargs.update(kwargs)

Check warning on line 312 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L311-L312

Added lines #L311 - L312 were not covered by tests

super(BoutiquesTask, self).__init__(

Check warning on line 314 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L314

Added line #L314 was not covered by tests
func,
name=name,
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
cache_dir=cache_dir,
cache_locations=cache_locations,
input_spec=input_spec,
rerun=rerun,
**fmt_kwargs,
)

if output_spec is None:
output_spec = SpecInfo(

Check warning on line 328 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L328

Added line #L328 was not covered by tests
name="Output", fields=[("out", ty.Any)], bases=(BaseSpec,)
)
self.output_spec = output_spec

Check warning on line 331 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L331

Added line #L331 was not covered by tests

def _run_task(self):

inputs = attr.asdict(self.inputs)
del inputs["_func"]
self.output_ = None

Check warning on line 337 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L335-L337

Added lines #L335 - L337 were not covered by tests

output = cp.loads(self.inputs._func)(**inputs)

Check warning on line 339 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L339

Added line #L339 was not covered by tests
if output is not None:
output_names = [el[0] for el in self.output_spec.fields]
self.output_ = {}

Check warning on line 342 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L342

Added line #L342 was not covered by tests
if len(output_names) > 1 or output_names[0] != "out":
self.output_ = {
f.boutiques_name: f.file_name
for f in output
if f.boutiques_name in output_names
}
if len(output_names) != len(self.output_.keys()):
raise Exception(

Check warning on line 350 in pydra/engine/task.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/task.py#L350

Added line #L350 was not covered by tests
f"expected {len(self.output_spec.fields)} elements, "
f"but {len(output)} were returned"
)
else:
# Return all filenames if not specified by the user what to return
self.output_[output_names[0]] = [out.file_name for out in output]


class ShellCommandTask(TaskBase):
"""Wrap a shell command as a task element."""

Expand Down