Skip to content

Commit

Permalink
Merge pull request #778 from pyiron/return_on_run
Browse files Browse the repository at this point in the history
Return output when calling `run`
  • Loading branch information
liamhuber authored Aug 2, 2023
2 parents 5b3835d + 52028eb commit 98542ce
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 61 deletions.
141 changes: 101 additions & 40 deletions notebooks/workflow_example.ipynb

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion pyiron_contrib/workflow/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Composite(Node, ABC):
By default, `run()` will be called on all owned nodes that have output connections but no
input connections (i.e. the upstream-most nodes), but this can be overridden to
specify particular nodes to use instead.
The `run()` method (and `update()`, and calling the workflow, when these result in
a run) returns a new dot-accessible dictionary of keys and values created from the
composite output IO panel.
Does not specify `input` and `output` as demanded by the parent class; this
requirement is still passed on to children.
Expand Down Expand Up @@ -92,10 +95,17 @@ def __init__(
label: str,
*args,
parent: Optional[Composite] = None,
run_on_updates: bool = True,
strict_naming: bool = True,
**kwargs,
):
super().__init__(*args, label=label, parent=parent, **kwargs)
super().__init__(
*args,
label=label,
parent=parent,
run_on_updates=run_on_updates,
**kwargs
)
self.strict_naming: bool = strict_naming
self.nodes: DotDict[str:Node] = DotDict()
self.add: NodeAdder = NodeAdder(self)
Expand Down
17 changes: 11 additions & 6 deletions pyiron_contrib/workflow/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Function(Node):
on call.
This invokes an `update()` call, which can in turn invoke `run()` if
`run_on_updates` is set to `True`.
`run()` returns the output of the executed function, or a futures object if the
node is set to use an executor.
Calling the node or executing an `update()` returns the same thing as running, if
the node is run, or `None` if it is not set to run on updates or not ready to run.
Args:
node_function (callable): The function determining the behaviour of the node.
Expand Down Expand Up @@ -163,10 +167,12 @@ class Function(Node):
{'p1': 2, 'm1': 1}
Input data can be provided to both initialization and on call as ordered args
or keyword kwargs, e.g.:
or keyword kwargs.
When running, updating, or calling the node, the output of the wrapped function
(if it winds up getting run in the conditional cases of updating and calling) is
returned:
>>> plus_minus_1(2, y=3)
>>> plus_minus_1.outputs.to_value_dict()
{'p1': 3, 'm1': 2}
(3, 2)
Finally, we might stop these updates from happening automatically, even when
all the input data is present and available:
Expand All @@ -180,8 +186,7 @@ class Function(Node):
With these flags set, the node requires us to manually call a run:
>>> plus_minus_1.run()
>>> plus_minus_1.outputs.to_value_dict()
{'p1': 1, 'm1': -1}
(-1, 1)
So function nodes have the most basic level of protection that they won't run
if they haven't seen any input data.
Expand Down Expand Up @@ -360,6 +365,7 @@ def __init__(
super().__init__(
label=label if label is not None else node_function.__name__,
parent=parent,
run_on_updates=run_on_updates,
# **kwargs,
)

Expand All @@ -379,7 +385,6 @@ def __init__(
)
self._verify_that_channels_requiring_update_all_exist()

self.run_on_updates = run_on_updates
self._batch_update_input(*args, **kwargs)

if update_on_instantiation:
Expand Down
30 changes: 21 additions & 9 deletions pyiron_contrib/workflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import warnings
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Optional, TYPE_CHECKING
from typing import Any, Optional, TYPE_CHECKING

from pyiron_contrib.executors import CloudpickleProcessPoolExecutor
from pyiron_contrib.workflow.files import DirectoryObject
Expand Down Expand Up @@ -45,6 +45,16 @@ class Node(HasToDict, ABC):
By default, nodes' signals input comes with `run` and `ran` IO ports which force
the `run()` method and which emit after `finish_run()` is completed, respectively.
The `run()` method returns a representation of the node output (possibly a futures
object, if the node is running on an executor), and consequently `update()` also
returns this output if the node is `ready` and has `run_on_updates = True`.
Calling an already instantiated node allows its input channels to be updated using
keyword arguments corresponding to the channel labels, performing a batch-update of
all supplied input and then calling `update()`.
As such, calling the node _also_ returns a representation of the output (or `None`
if the node is not set to run on updates, or is otherwise unready to run).
Nodes have a status, which is currently represented by the `running` and `failed`
boolean flags.
Their value is controlled automatically in the defined `run` and `finish_run`
Expand Down Expand Up @@ -154,7 +164,7 @@ def outputs(self) -> Outputs:

@property
@abstractmethod
def on_run(self) -> callable[..., tuple]:
def on_run(self) -> callable[..., Any | tuple]:
"""
What the node actually does!
"""
Expand All @@ -167,7 +177,7 @@ def run_args(self) -> dict:
"""
return {}

def process_run_result(self, run_output: tuple) -> None:
def process_run_result(self, run_output: Any | tuple) -> None:
"""
What to _do_ with the results of `on_run` once you have them.
Expand All @@ -176,7 +186,7 @@ def process_run_result(self, run_output: tuple) -> None:
"""
pass

def run(self) -> None:
def run(self) -> Any | tuple | Future:
"""
Executes the functionality of the node defined in `on_run`.
Handles the status of the node, and communicating with any remote
Expand All @@ -195,18 +205,19 @@ def run(self) -> None:
self.running = False
self.failed = True
raise e
self.finish_run(run_output)
return self.finish_run(run_output)
elif isinstance(self.executor, CloudpickleProcessPoolExecutor):
self.future = self.executor.submit(self.on_run, **self.run_args)
self.future.add_done_callback(self.finish_run)
return self.future
else:
raise NotImplementedError(
"We currently only support executing the node functionality right on "
"the main python process or with a "
"pyiron_contrib.workflow.util.CloudpickleProcessPoolExecutor."
)

def finish_run(self, run_output: tuple | Future):
def finish_run(self, run_output: tuple | Future) -> Any | tuple:
"""
Switch the node status, process the run result, then fire the ran signal.
Expand All @@ -224,6 +235,7 @@ def finish_run(self, run_output: tuple | Future):
try:
self.process_run_result(run_output)
self.signals.output.ran()
return run_output
except Exception as e:
self.failed = True
raise e
Expand All @@ -234,9 +246,9 @@ def _build_signal_channels(self) -> Signals:
signals.output.ran = OutputSignal("ran", self)
return signals

def update(self) -> None:
def update(self) -> Any | tuple | Future | None:
if self.run_on_updates and self.ready:
self.run()
return self.run()

@property
def working_directory(self):
Expand Down Expand Up @@ -300,4 +312,4 @@ def _batch_update_input(self, **kwargs):

def __call__(self, **kwargs) -> None:
self._batch_update_input(**kwargs)
self.update()
return self.update()
24 changes: 20 additions & 4 deletions pyiron_contrib/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@ class Workflow(Composite):
These input keys can be used when calling the workflow to update the input. In
our example, the nodes update automatically when their input gets updated, so
all we need to do to see updated workflow output is update the input:
>>> wf(first_x=10)
>>> wf.outputs.second_y.value
>>> out = wf(first_x=10)
>>> out
{'second_y': 12}
Note: this _looks_ like a dictionary, but has the extra convenience that its data
can be dot-accessed:
>>> out.second_y
12
Workflows also give access to packages of pre-built nodes under different
Expand Down Expand Up @@ -125,8 +130,19 @@ class Workflow(Composite):
integrity of workflows when they're used somewhere else?
"""

def __init__(self, label: str, *nodes: Node, strict_naming=True):
super().__init__(label=label, parent=None, strict_naming=strict_naming)
def __init__(
self,
label: str,
*nodes: Node,
run_on_updates: bool = True,
strict_naming=True
):
super().__init__(
label=label,
parent=None,
run_on_updates=run_on_updates,
strict_naming=strict_naming,
)

for node in nodes:
self.add_node(node)
Expand Down
72 changes: 71 additions & 1 deletion tests/unit/workflow/test_function.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest
from concurrent.futures import Future
from sys import version_info
from typing import Optional, Union
import unittest
import warnings

from pyiron_contrib.executors import CloudpickleProcessPoolExecutor
Expand Down Expand Up @@ -342,6 +343,75 @@ def test_call(self):
# there should just be a warning that the data didn't get updated
node(some_randome_kwaaaaarg="foo")

def test_return_value(self):
node = Function(plus_one)

with self.subTest("Run on main process"):
return_on_call = node(1)
self.assertEqual(
return_on_call,
plus_one(1),
msg="Run output should be returned on call"
)

return_on_update = node.update()
self.assertEqual(
return_on_update,
plus_one(1),
msg="Run output should be returned on update"
)

node.run_on_updates = False
return_on_update_without_run = node.update()
self.assertIsNone(
return_on_update_without_run,
msg="When not running on updates, the update should not return anything"
)
return_on_call_without_run = node(2)
self.assertIsNone(
return_on_call_without_run,
msg="When not running on updates, the call should not return anything"
)
return_on_explicit_run = node.run()
self.assertEqual(
return_on_explicit_run,
plus_one(2),
msg="On explicit run, the most recent input data should be used and the "
"result should be returned"
)

with self.subTest("Run on executor"):
node.executor = CloudpickleProcessPoolExecutor()
node.run_on_updates = False

return_on_update_without_run = node.update()
self.assertIsNone(
return_on_update_without_run,
msg="When not running on updates, the update should not return "
"anything whether there is an executor or not"
)
return_on_explicit_run = node.run()
self.assertIsInstance(
return_on_explicit_run,
Future,
msg="Running with an executor should return the future"
)
with self.assertRaises(RuntimeError):
# The executor run should take a second
# So we can double check that attempting to run while already running
# raises an error
node.run()
node.future.result() # Wait for the remote execution to finish

node.run_on_updates = True
return_on_update_with_run = node.update()
self.assertIsInstance(
return_on_update_with_run,
Future,
msg="Updating should return the same as run when we get a run from the "
"update, obviously..."
)
node.future.result() # Wait for the remote execution to finish

@unittest.skipUnless(version_info[0] == 3 and version_info[1] >= 10, "Only supported for 3.10+")
class TestSlow(unittest.TestCase):
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/workflow/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pyiron_contrib.workflow.channels import NotData
from pyiron_contrib.workflow.files import DirectoryObject
from pyiron_contrib.workflow.function import Function
from pyiron_contrib.workflow.util import DotDict
from pyiron_contrib.workflow.workflow import Workflow


Expand Down Expand Up @@ -229,6 +230,50 @@ def sum_(a, b):
# We _must_ use kwargs
wf(42, 42)

def test_return_value(self):
wf = Workflow("wf")
wf.run_on_updates = True
wf.a = wf.add.SingleValue(plus_one)
wf.b = wf.add.SingleValue(plus_one, x=wf.a)

with self.subTest("Run on main process"):
return_on_call = wf(a_x=1)
self.assertEqual(
return_on_call,
DotDict({"b_y": 1 + 2}),
msg="Run output should be returned on call. Expecting a DotDict of "
"output values"
)

return_on_update = wf.update()
self.assertEqual(
return_on_update.b_y,
1 + 2,
msg="Run output should be returned on update"
)

wf.run_on_updates = False
return_on_update_without_run = wf.update()
self.assertIsNone(
return_on_update_without_run,
msg="When not running on updates, the update should not return anything"
)
return_on_call_without_run = wf(a_x=2)
self.assertIsNone(
return_on_call_without_run,
msg="When not running on updates, the call should not return anything"
)
return_on_explicit_run = wf.run()
self.assertEqual(
return_on_explicit_run["b_y"],
2 + 2,
msg="On explicit run, the most recent input data should be used and the "
"result should be returned"
)

# Note: We don't need to test running on an executor, because Workflows can't
# do that yet


if __name__ == '__main__':
unittest.main()

0 comments on commit 98542ce

Please sign in to comment.