diff --git a/pyiron_workflow/channels.py b/pyiron_workflow/channels.py index da735497..890f37fc 100644 --- a/pyiron_workflow/channels.py +++ b/pyiron_workflow/channels.py @@ -602,6 +602,13 @@ def _node_injection(self, injection_class, *args, inject_self=True): def __getattr__(self, name): from pyiron_workflow.node_library.standard import GetAttr + if name == "to_hdf": + raise AttributeError( + "This is just a failsafe to protect us against other elements of the " + "pyiron ecosystem (pyiron_base's DataContainer) running a " + "`hasattr('to_hdf')` check on us and accidentally injecting a new " + "getattr node." + ) return self._node_injection(GetAttr, name) def __getitem__(self, item): diff --git a/pyiron_workflow/job.py b/pyiron_workflow/job.py index 212e73bf..8b8c99a4 100644 --- a/pyiron_workflow/job.py +++ b/pyiron_workflow/job.py @@ -3,11 +3,9 @@ Two approaches are provided while we work out which is more convenient and how some edge cases may be handled differently: -- A direct sub-class of :class:`TemplateJob`, created using the usual job creation. -- A helper method for using :meth:`Project.wrap_python_function`. - -The wrapper function appears to be slightly slower (presumably because of the extra -layer of serialization). +- `NodeOutputJob(PythonFunctionContainerJob)`, which gets serialized like any other + wrapped python code. +- `StoredNodeJob(TemplateJob)`, which uses the node's underlying storage capabilities. The intent of this module is to provide immediate access to pyiron's queue submission functionality, while in the long run this should be integrated more directly with the @@ -22,56 +20,138 @@ from __future__ import annotations +import inspect import os import sys from pyiron_base import TemplateJob, JOB_CLASS_DICT +from pyiron_base.jobs.flex.pythonfunctioncontainer import ( + PythonFunctionContainerJob, + get_function_parameter_dict, +) from pyiron_workflow.node import Node from h5io._h5io import _import_class -_WARNINGS_STRING = """ + +def _node_out(node): + node.run() + return node.outputs.to_value_dict() + + +class NodeOutputJob(PythonFunctionContainerJob): + """ + A `PythonFunctionContainer` node that explicitly runs a node input and returns its + output value dictionary. + + Examples: + >>> from pyiron_base import Project + >>> from pyiron_workflow import Workflow + >>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT + >>> + >>> wf = Workflow("pyiron_node", overwrite_save=True) + >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes + >>> + >>> pr = Project("test") + >>> + >>> nj = pr.create.job.NodeOutputJob("my_node") + >>> nj.input["node"] = wf + >>> nj.run() # doctest:+ELLIPSIS + The job ... + >>> print(nj.output) + DataContainer({'answer__user_input': 42}) + + >>> lj = pr.load(nj.job_name) + >>> print(nj.output) + DataContainer({'answer__user_input': 42}) + + >>> pr.remove_jobs(recursive=True, silently=True) + >>> pr.remove(enable=True) + Warnings: - Node jobs rely on storing the node to file, which means these are also only - available for python >= 3.11. - - The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes - being run are defined in an importable file location -- i.e. copying and - pasting the example above into a jupyter notebook works fine in modal mode, but - will throw an exception if you try to run it non-modally. + All submitted nodes must be importable from their module at load time, or + loading will fail. 
This means node definitions can't be nested inside another + object, and any nodes defined in `__main__` (e.g. in a jupyter notebook) must + be redefined. If node definitions get changed between saving and loading, all + bets are off. + """ - This hasn't been tested for running on a remote queue. It should work, but it's - _possible_ the same requirement from non-modal mode (importable nodes) will - apply. -""" + def __init__(self, project, job_name): + if sys.version_info < (3, 11): + raise NotImplementedError("Node jobs are only available in python 3.11+") + super().__init__(project, job_name) + self._function = _node_out + self.input.update(get_function_parameter_dict(funct=_node_out)) + self._executor_type = None + + @property + def python_function(self): + return self._function + + @python_function.setter + def python_function(self, funct): + raise NotImplementedError( + f"{self.__class__.__name__}'s python function is to run the node and get " + f"its output values, and this may not be overridden." + ) + + def validate_ready_to_run(self): + if not isinstance(self.input["node"], Node): + raise TypeError(f"'node' input must be of type {Node.__name__}") + elif not self.input["node"].ready: + nl = "\n" + raise ValueError( + f"Node not ready:{nl}{self.input['node'].readiness_report}" + ) + + def run_static(self): + # Overrides the parent method + # Copy and paste except for the output update, which makes sure the output is + # flat and not tested beneath "result" + if ( + self._executor_type is not None + and "executor" in inspect.signature(self._function).parameters.keys() + ): + input_dict = self.input.to_builtin() + del input_dict["executor"] + output = self._function( + **input_dict, executor=self._get_executor(max_workers=self.server.cores) + ) + else: + output = self._function(**self.input.to_builtin()) + self.output.update(output) # DIFFERS FROM PARENT METHOD + self.to_hdf() + self.status.finished = True + + +JOB_CLASS_DICT[NodeOutputJob.__name__] = NodeOutputJob.__module__ class StoredNodeJob(TemplateJob): - __doc__ = ( - """ + """ This job is an intermediate feature for accessing pyiron's queue submission infrastructure for nodes (function nodes, macros, or entire workflows). It leans directly on the storage capabilities of the node itself, except for the node class and name, and the storage backend mode, all of which are held in the - traditional job input. (WARNING: This might be fragile to adjusting the storage + traditional job input. (WARNING: This might be fragile to adjusting the storage backend on the node _after_ the node has been assign to the job.) The job provides direct access to its owned node (as both input and output) on the :attr:`node` attribute. The only requirement is that the node have an untouched working directory (so we can make sure its files get stored _inside_ the job's - directory tree), and that it be saveable (not all objects work with the "h5io" + directory tree), and that it be saveable (not all objects work with the "h5io" storage backend, e.g. `ase.Calculator` objects may break it). 
Examples: >>> from pyiron_base import Project >>> from pyiron_workflow import Workflow >>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT - >>> + >>> >>> wf = Workflow("pyiron_node", overwrite_save=True) >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes - >>> + >>> >>> pr = Project("test") - >>> + >>> >>> nj = pr.create.job.StoredNodeJob("my_node") >>> nj.node = wf >>> nj.run() # doctest:+ELLIPSIS @@ -86,9 +166,20 @@ class StoredNodeJob(TemplateJob): >>> pr.remove_jobs(recursive=True, silently=True) >>> pr.remove(enable=True) + + Warnings: + Node jobs rely on storing the node to file, which means these are also only + available for python >= 3.11. + + The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes + being run are defined in an importable file location -- i.e. copying and + pasting the example above into a jupyter notebook works fine in modal mode, but + will throw an exception if you try to run it non-modally. + + This hasn't been tested for running on a remote queue. It should work, but it's + _possible_ the same requirement from non-modal mode (importable nodes) will + apply. """ - + _WARNINGS_STRING - ) def __init__(self, project, job_name): if sys.version_info < (3, 11): @@ -166,57 +257,3 @@ def run_static(self): JOB_CLASS_DICT[StoredNodeJob.__name__] = StoredNodeJob.__module__ - - -def _run_node(node): - node.run() - return node - - -def create_job_with_python_wrapper(project, node): - __doc__ = ( - """ - A convenience wrapper around :meth:`pyiron_base.Project.wrap_python_function` for - running a `pyiron_workflow.Workflow`. (And _only_ workflows, `Function` and `Macro` - children will fail.) - - Args: - project (pyiron_base.Project): A pyiron project. - node (pyiron_workflow.node.Node): The node to run. - - Returns: - (pyiron_base.jobs.flex.pythonfunctioncontainer.PythonFunctionContainerJob): - A job which wraps a function for running the node, with the `"node"` input - pre-populated with the provided node. 
- - Examples: - >>> from pyiron_base import Project - >>> from pyiron_workflow import Workflow - >>> from pyiron_workflow.job import create_job_with_python_wrapper - >>> - >>> wf = Workflow("pyiron_node", overwrite_save=True) - >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes - >>> - >>> pr = Project("test") - >>> - >>> nj = create_job_with_python_wrapper(pr, wf) - >>> nj.run() - >>> print(nj.output["result"].outputs.to_value_dict()) - {'answer__user_input': 42} - - >>> lj = pr.load(nj.job_name) - >>> print(nj.output["result"].outputs.to_value_dict()) - {'answer__user_input': 42} - - >>> pr.remove_jobs(recursive=True, silently=True) - >>> pr.remove(enable=True) - - """ - + _WARNINGS_STRING - ) - if sys.version_info < (3, 11): - raise NotImplementedError("Node jobs are only available in python 3.11+") - - job = project.wrap_python_function(_run_node) - job.input["node"] = node - return job diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index b20c1f39..3ff24fae 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -6,7 +6,7 @@ from pyiron_base import Project from pyiron_workflow import Workflow from pyiron_workflow.channels import NOT_DATA -from pyiron_workflow.job import create_job_with_python_wrapper +import pyiron_workflow.job # To get the job classes registered @Workflow.wrap_as.single_value_node("t") @@ -28,10 +28,10 @@ def tearDown(self) -> None: self.pr.remove(enable=True) -class TestStoredNodeJob(_WithAJob): +class TestNodeNodeOutputJobJob(_WithAJob): def make_a_job_from_node(self, node): - job = self.pr.create.job.StoredNodeJob(node.label) - job.node = node + job = self.pr.create.job.NodeOutputJob(node.label) + job.input["node"] = node return job @unittest.skipIf(sys.version_info >= (3, 11), "Storage should only work in 3.11+") @@ -51,7 +51,7 @@ def test_node(self): nj.run() self.assertEqual( 42, - nj.node.outputs.user_input.value, + nj.output.user_input, msg="A single node should run just as well as a workflow" ) @@ -69,7 +69,7 @@ def test_modal(self): ) self.assertEqual( 0, - nj.node.outputs.out__user_input.value, + nj.output.out__user_input, msg="The node should have run, and since it's modal there's no need to " "update the instance" ) @@ -81,8 +81,8 @@ def test_modal(self): msg="The loaded job should be a new instance." ) self.assertEqual( - nj.node.outputs.out__user_input.value, - lj.node.outputs.out__user_input.value, + nj.output.out__user_input, + lj.output.out__user_input, msg="The loaded job should still have all the same values" ) @@ -104,39 +104,62 @@ def test_nonmodal(self): nj.status.finished, msg="The job status should update on completion" ) - self.assertIs( - nj.node.outputs.out__user_input.value, - NOT_DATA, - msg="As usual with remote processes, we expect to require a data read " - "before the local instance reflects its new state." 
+ self.assertEqual( + 0, + len(nj.output), + msg="Non-modal stuff needs to be reloaded" ) lj = self.pr.load(nj.job_name) self.assertEqual( 42, - lj.node.outputs.out__user_input.value, + lj.output.out__user_input, msg="The loaded job should have the finished values" ) @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") - def test_bad_workflow(self): - has_wd_wf = Workflow("not_empty") - try: - has_wd_wf.working_directory # Touch the working directory, creating it - with self.assertRaises( - ValueError, - msg="To make sure the node gets stored _inside_ the job, we only " - "accept the assignment of nodes who haven't looked at their working " - "directory yet" - ): - self.make_a_job_from_node(has_wd_wf) - finally: - has_wd_wf.working_directory.delete() + def test_bad_input(self): + with self.subTest("Not a node"): + nj = self.pr.create.job.NodeOutputJob("will_fail") + nj.input["node"] = 42 + with self.assertRaises(TypeError, msg="The input is not a node"): + nj.run() + + with self.subTest("Node not ready"): + node = Workflow.create.standard.UserInput() # No value! + self.assertFalse(node.ready, msg="Sanity check") + nj = self.make_a_job_from_node(node) + with self.assertRaises(ValueError, msg="The input is not ready"): + nj.run() -class TestWrapperFunction(_WithAJob): + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") + def test_unloadable(self): + @Workflow.wrap_as.single_value_node("y") + def not_importable_directy_from_module(x): + return x + 1 + + nj = self.make_a_job_from_node(not_importable_directy_from_module(42)) + nj.run() + self.assertEqual( + 43, + nj.output.y, + msg="Things should run fine locally" + ) + with self.assertRaises( + AttributeError, + msg="We have promised that you'll hit trouble if you try to load a job " + "whose nodes are not all importable directly from their module" + # h5io also has this limitation, so I suspect that may be the source + ): + self.pr.load(nj.job_name) + + +class TestStoredNodeJob(_WithAJob): def make_a_job_from_node(self, node): - return create_job_with_python_wrapper(self.pr, node) + job = self.pr.create.job.StoredNodeJob(node.label) + job.node = node + return job @unittest.skipIf(sys.version_info >= (3, 11), "Storage should only work in 3.11+") def test_clean_failure(self): @@ -148,6 +171,17 @@ def test_clean_failure(self): node = Workflow.create.standard.UserInput(42) self.make_a_job_from_node(node) + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") + def test_node(self): + node = Workflow.create.standard.UserInput(42) + nj = self.make_a_job_from_node(node) + nj.run() + self.assertEqual( + 42, + nj.node.outputs.user_input.value, + msg="A single node should run just as well as a workflow" + ) + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_modal(self): modal_wf = Workflow("modal_wf") @@ -162,7 +196,7 @@ def test_modal(self): ) self.assertEqual( 0, - nj.output["result"].outputs.out__user_input.value, + nj.node.outputs.out__user_input.value, msg="The node should have run, and since it's modal there's no need to " "update the instance" ) @@ -174,8 +208,8 @@ def test_modal(self): msg="The loaded job should be a new instance." 
) self.assertEqual( - nj.output["result"].outputs.out__user_input.value, - lj.output["result"].outputs.out__user_input.value, + nj.node.outputs.out__user_input.value, + lj.node.outputs.out__user_input.value, msg="The loaded job should still have all the same values" ) @@ -197,27 +231,31 @@ def test_nonmodal(self): nj.status.finished, msg="The job status should update on completion" ) - with self.assertRaises( - KeyError, + self.assertIs( + nj.node.outputs.out__user_input.value, + NOT_DATA, msg="As usual with remote processes, we expect to require a data read " "before the local instance reflects its new state." - ): - nj.output["result"] + ) lj = self.pr.load(nj.job_name) self.assertEqual( 42, - lj.output["result"].outputs.out__user_input.value, + lj.node.outputs.out__user_input.value, msg="The loaded job should have the finished values" ) @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") - def test_node(self): - node = Workflow.create.standard.UserInput(42) - nj = self.make_a_job_from_node(node) - with self.assertRaises( - AttributeError, - msg="The wrapping routine doesn't interact well with getattr overrides on " - "node state elements (output data)" - ): - nj.run() + def test_bad_workflow(self): + has_wd_wf = Workflow("not_empty") + try: + has_wd_wf.working_directory # Touch the working directory, creating it + with self.assertRaises( + ValueError, + msg="To make sure the node gets stored _inside_ the job, we only " + "accept the assignment of nodes who haven't looked at their working " + "directory yet" + ): + self.make_a_job_from_node(has_wd_wf) + finally: + has_wd_wf.working_directory.delete()
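For reference, a minimal sketch of why the `to_hdf` failsafe added to `channels.py` above is needed (illustrative only, not part of the patch; it assumes the channel below is one of the injection-capable output channels whose `__getattr__` is modified above, and reuses the `UserInput` node from the doctests):

    from pyiron_workflow import Workflow

    wf = Workflow("to_hdf_guard_demo")
    wf.answer = Workflow.create.standard.UserInput(42)
    channel = wf.answer.outputs.user_input  # injection-capable output channel

    # `__getattr__` injects a `GetAttr` node for any attribute the channel does not
    # itself define, so without the explicit AttributeError a bare probe like the
    # `hasattr('to_hdf')` check pyiron_base's DataContainer performs during
    # serialization would silently construct a node and report True. With this
    # patch applied the probe reports False and nothing gets injected.
    print(hasattr(channel, "to_hdf"))  # False

Access to any other unknown attribute still goes through the injection path exactly as before. On the job side, note that `NodeOutputJob.run_static` writes the node's output value dictionary straight into `self.output`, so results land flat (e.g. `nj.output["answer__user_input"]`) rather than nested under a `"result"` key as they would for a plain `PythonFunctionContainerJob` or the removed `create_job_with_python_wrapper` helper.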