Description
What version of Pydra are you using?
0.25.0
Possibly relevant: we believe this issue started after upgrading torch to 2.6.0.
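Since the failure seems tied to a specific torch version, it may help to record the exact versions in the failing environment. The snippet below is a minimal sketch using only the standard library; the helper names `installed_version` and `report_versions` are hypothetical and not part of Pydra or torch.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional, Sequence


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def report_versions(
    packages: Sequence[str] = ("pydra", "torch", "numpy"),
) -> Dict[str, Optional[str]]:
    """Collect versions of the packages involved in this report (hypothetical helper)."""
    return {name: installed_version(name) for name in packages}


print(report_versions())
```

Running this before and after the torch upgrade should show whether anything other than torch changed between the passing and failing environments.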
What were you trying to do?
Use Pydra with torch.Tensor objects in tasks: split a list of tensors over a task function that takes a single tensor and returns a single tensor.
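For clarity, the behaviour we expect from the split is equivalent to mapping the task over each input in order. The sketch below shows this with plain nested lists standing in for tensors, so it runs without Pydra or torch; `add_two` and `run_split` are hypothetical names used only for illustration.

```python
from typing import Callable, List

Matrix = List[List[int]]


def add_two(m: Matrix) -> Matrix:
    """Stand-in for the task body: element-wise `+ 2`."""
    return [[value + 2 for value in row] for row in m]


def run_split(task: Callable[[Matrix], Matrix], inputs: List[Matrix]) -> List[Matrix]:
    """Apply the task once per input, preserving input order."""
    return [task(m) for m in inputs]


results = run_split(add_two, [[[3, 4], [5, 6]], [[0, 1], [1, 2]]])
```

The two entries of `results` correspond to the two tensors asserted in the test in the example code.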
What did you expect to happen?
The test passes.
What actually happened?
The test no longer passes and fails with:
AttributeError: Task 'test_task_task' has no output attribute 'out', available: '_is_param', 'all_'
Example code:
"""Tests Pydra Helping functions."""
import pydra
import torch


@pydra.mark.task
def pydra_task(test_input: torch.Tensor) -> torch.Tensor:
    """Task function for Pydra workflow to run."""
    return test_input + 2


def test_pydra() -> None:
    """Test simple tensor pydra workflow."""
    wf = pydra.Workflow(name="wf_test", input_spec=["x"])
    wf.split("x", x=[torch.tensor([[3, 4], [5, 6]]), torch.tensor([[0, 1], [1, 2]])])
    wf.add(pydra_task(name="test_task_task", test_input=wf.lzin.x))
    wf.set_output([("wf_out", wf.test_task_task.lzout.out)])
    with pydra.Submitter(plugin="serial", n_procs=1) as sub:
        sub(wf)
    results = wf.result()
    assert results[0].output.wf_out.equal(torch.tensor([[5, 6], [7, 8]]))
    assert results[1].output.wf_out.equal(torch.tensor([[2, 3], [3, 4]]))
Expected: the test passes.
Actual: the test fails with the AttributeError shown above.
Note: based on #761, the following serializer might be needed for the test to pass:
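from typing import Iterator

import numpy as np
import torch
# Import path assumed for Pydra 0.2x; adjust if your version differs.
from pydra.utils.hash import Cache, bytes_repr_sequence_contents, register_serializer


@register_serializer(torch.Tensor)
def bytes_repr_arraylike(obj: torch.Tensor, cache: Cache) -> Iterator[bytes]:
    """Register a serializer for Torch tensors that allows Pydra to properly use them."""
    # Type tag first, so different array-like classes hash differently.
    yield f"{obj.__class__.__module__}{obj.__class__.__name__}:".encode()
    array = np.asanyarray(obj)
    yield f"{array.size}:".encode()
    if array.dtype == "object":
        yield from bytes_repr_sequence_contents(iter(array.ravel()), cache)
    else:
        yield array.tobytes(order="C")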
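To make the serializer's layout easier to reason about, here is a minimal sketch of the same idea without Pydra, torch, or NumPy: yield a type tag, then an element count, then the raw buffer, and feed those chunks into a hash. It uses the standard library's `array` type as a stand-in for a tensor. `bytes_repr_sketch` and `content_hash` are hypothetical names, and the choice of `blake2b` here is only for illustration, not a statement about how Pydra computes its hashes.

```python
import hashlib
from array import array
from typing import Iterator


def bytes_repr_sketch(obj: array) -> Iterator[bytes]:
    """Yield a type tag, an element count, then the raw buffer contents."""
    yield f"{obj.__class__.__module__}{obj.__class__.__name__}:".encode()
    yield f"{len(obj)}:".encode()
    yield obj.tobytes()


def content_hash(obj: array) -> str:
    """Hash the chunks produced by the serializer sketch (illustrative only)."""
    hasher = hashlib.blake2b(digest_size=16)
    for chunk in bytes_repr_sketch(obj):
        hasher.update(chunk)
    return hasher.hexdigest()


a = array("q", [3, 4, 5, 6])
b = array("q", [3, 4, 5, 6])
c = array("q", [0, 1, 1, 2])
```

With this layout, `a` and `b` hash identically because their contents match, while `c` hashes differently. A content-based byte representation like this is what lets equal inputs map to the same hash.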