From 5c465cc5ca235c38ae6436d782c5002fb2fc329a Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 20 Feb 2024 11:40:40 -0800 Subject: [PATCH 1/5] Introduce a new node job --- pyiron_workflow/job.py | 95 ++++++++++++++++++++++++++++++++++ tests/unit/test_job.py | 112 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) diff --git a/pyiron_workflow/job.py b/pyiron_workflow/job.py index 212e73bf..0db02791 100644 --- a/pyiron_workflow/job.py +++ b/pyiron_workflow/job.py @@ -22,13 +22,108 @@ from __future__ import annotations +import inspect import os import sys from pyiron_base import TemplateJob, JOB_CLASS_DICT +from pyiron_base.jobs.flex.pythonfunctioncontainer import ( + PythonFunctionContainerJob, get_function_parameter_dict +) from pyiron_workflow.node import Node from h5io._h5io import _import_class + +def _node_out(node): + node.run() + return node.outputs.to_value_dict() + + +class NodeOutputJob(PythonFunctionContainerJob): + """ + A `PythonFunctionContainer` node that explicitly runs a node input and returns its + output value dictionary. + + Examples: + >>> from pyiron_base import Project + >>> from pyiron_workflow import Workflow + >>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT + >>> + >>> wf = Workflow("pyiron_node", overwrite_save=True) + >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes + >>> + >>> pr = Project("test") + >>> + >>> nj = pr.create.job.NodeOutputJob("my_node") + >>> nj.input["node"] = wf + >>> nj.run() # doctest:+ELLIPSIS + The job ... + >>> print(nj.output) + DataContainer({'answer__user_input': 42}) + + >>> lj = pr.load(nj.job_name) + >>> print(nj.output) + DataContainer({'answer__user_input': 42}) + + >>> pr.remove_jobs(recursive=True, silently=True) + >>> pr.remove(enable=True) + + Warnings: + All submitted nodes must be importable from their module at load time, or + loading will fail. This means node definitions can't be nested inside another + object, and any nodes defined in `__main__` (e.g. in a jupyter notebook) must + be redefined. If node definitions get changed between saving and loading, all + bets are off. + """ + def __init__(self, project, job_name): + super().__init__(project, job_name) + self._function = _node_out + self.input.update(get_function_parameter_dict(funct=_node_out)) + self._executor_type = None + + @property + def python_function(self): + return self._function + + @python_function.setter + def python_function(self, funct): + raise NotImplementedError( + f"{self.__class__.__name__}'s python function is to run the node and get " + f"its output values, and this may not be overridden." + ) + + def validate_ready_to_run(self): + if not isinstance(self.input["node"], Node): + raise TypeError(f"'node' input must be of type {Node.__name__}") + elif not self.input["node"].ready: + nl = "\n" + raise ValueError( + f"Node not ready:{nl}{self.input['node'].readiness_report}" + ) + + def run_static(self): + # Overrides the parent method + # Copy and paste except for the output update, which makes sure the output is + # flat and not tested beneath "result" + if ( + self._executor_type is not None + and "executor" in inspect.signature(self._function).parameters.keys() + ): + input_dict = self.input.to_builtin() + del input_dict["executor"] + output = self._function( + **input_dict, executor=self._get_executor(max_workers=self.server.cores) + ) + else: + output = self._function(**self.input.to_builtin()) + self.output.update(output) # DIFFERS FROM PARENT METHOD + self.to_hdf() + self.status.finished = True + + +JOB_CLASS_DICT[NodeOutputJob.__name__] = NodeOutputJob.__module__ + + _WARNINGS_STRING = """ Warnings: Node jobs rely on storing the node to file, which means these are also only diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index b20c1f39..7ce22b77 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -28,6 +28,118 @@ def tearDown(self) -> None: self.pr.remove(enable=True) +class TestNodeNodeOutputJobJob(_WithAJob): + def make_a_job_from_node(self, node): + job = self.pr.create.job.NodeOutputJob(node.label) + job.input["node"] = node + return job + + def test_node(self): + node = Workflow.create.standard.UserInput(42) + nj = self.make_a_job_from_node(node) + nj.run() + self.assertEqual( + 42, + nj.output.user_input, + msg="A single node should run just as well as a workflow" + ) + + def test_modal(self): + modal_wf = Workflow("modal_wf") + modal_wf.sleep = Sleep(0) + modal_wf.out = modal_wf.create.standard.UserInput(modal_wf.sleep) + nj = self.make_a_job_from_node(modal_wf) + + nj.run() + self.assertTrue( + nj.status.finished, + msg="The interpreter should not release until the job is done" + ) + self.assertEqual( + 0, + nj.output.out__user_input, + msg="The node should have run, and since it's modal there's no need to " + "update the instance" + ) + + lj = self.pr.load(nj.job_name) + self.assertIsNot( + lj, + nj, + msg="The loaded job should be a new instance." + ) + self.assertEqual( + nj.output.out__user_input, + lj.output.out__user_input, + msg="The loaded job should still have all the same values" + ) + + def test_nonmodal(self): + nonmodal_node = Workflow("non_modal") + nonmodal_node.out = Workflow.create.standard.UserInput(42) + + nj = self.make_a_job_from_node(nonmodal_node) + nj.run(run_mode="non_modal") + self.assertFalse( + nj.status.finished, + msg=f"The local process should released immediately per non-modal " + f"style, but got status {nj.status}" + ) + while not nj.status.finished: + sleep(0.1) + self.assertTrue( + nj.status.finished, + msg="The job status should update on completion" + ) + self.assertEqual( + 0, + len(nj.output), + msg="Non-modal stuff needs to be reloaded" + ) + + lj = self.pr.load(nj.job_name) + self.assertEqual( + 42, + lj.output.out__user_input, + msg="The loaded job should have the finished values" + ) + + def test_bad_input(self): + with self.subTest("Not a node"): + nj = self.pr.create.job.NodeOutputJob("will_fail") + nj.input["node"] = 42 + with self.assertRaises(TypeError, msg="The input is not a node"): + nj.run() + + with self.subTest("Node not ready"): + node = Workflow.create.standard.UserInput() # No value! + self.assertFalse(node.ready, msg="Sanity check") + + nj = self.make_a_job_from_node(node) + with self.assertRaises(ValueError, msg="The input is not ready"): + nj.run() + + def test_unloadable(self): + @Workflow.wrap_as.single_value_node("y") + def not_importable_directy_from_module(x): + return x + 1 + + nj = self.make_a_job_from_node(not_importable_directy_from_module(42)) + nj.run() + self.assertEqual( + 43, + nj.output.y, + msg="Things should run fine locally" + ) + with self.assertRaises( + AttributeError, + msg="We have promised that you'll hit trouble if you try to load a job " + "whose nodes are not all importable directly from their module" + # h5io also has this limitation, so I suspect that may be the source + ): + self.pr.load(nj.job_name) + + class TestStoredNodeJob(_WithAJob): def make_a_job_from_node(self, node): job = self.pr.create.job.StoredNodeJob(node.label) From ef2dee6063908fc8e1795e090e62886a0119dd49 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 20 Feb 2024 11:41:02 -0800 Subject: [PATCH 2/5] Monkey patch the OutputDataChannel so DataContainer can work with it --- pyiron_workflow/channels.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyiron_workflow/channels.py b/pyiron_workflow/channels.py index da735497..611c546e 100644 --- a/pyiron_workflow/channels.py +++ b/pyiron_workflow/channels.py @@ -601,7 +601,13 @@ def _node_injection(self, injection_class, *args, inject_self=True): def __getattr__(self, name): from pyiron_workflow.node_library.standard import GetAttr - + if name == "to_hdf": + raise AttributeError( + "This is just a failsafe to protect us against other elements of the " + "pyiron ecosystem (pyiron_base's DataContainer) running a " + "`hasattr('to_hdf')` check on us and accidentally injecting a new " + "getattr node." + ) return self._node_injection(GetAttr, name) def __getitem__(self, item): From 5258452fd828767b20ea7c380d89217adbff4a9c Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 20 Feb 2024 11:46:17 -0800 Subject: [PATCH 3/5] Remove the wrapper interface It's just a less-good version of the `NodeOutputJob` --- pyiron_workflow/job.py | 96 ++++++++---------------------------------- tests/unit/test_job.py | 91 +-------------------------------------- 2 files changed, 18 insertions(+), 169 deletions(-) diff --git a/pyiron_workflow/job.py b/pyiron_workflow/job.py index 0db02791..bdfd6760 100644 --- a/pyiron_workflow/job.py +++ b/pyiron_workflow/job.py @@ -3,11 +3,9 @@ Two approaches are provided while we work out which is more convenient and how some edge cases may be handled differently: -- A direct sub-class of :class:`TemplateJob`, created using the usual job creation. -- A helper method for using :meth:`Project.wrap_python_function`. - -The wrapper function appears to be slightly slower (presumably because of the extra -layer of serialization). +- `NodeOutputJob(PythonFunctionContainerJob)`, which gets serialized like any other + wrapped python code. +- `StoredNodeJob(TemplateJob)`, which uses the node's underlying storage capabilities. The intent of this module is to provide immediate access to pyiron's queue submission functionality, while in the long run this should be integrated more directly with the @@ -124,25 +122,8 @@ def run_static(self): JOB_CLASS_DICT[NodeOutputJob.__name__] = NodeOutputJob.__module__ -_WARNINGS_STRING = """ - Warnings: - Node jobs rely on storing the node to file, which means these are also only - available for python >= 3.11. - - The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes - being run are defined in an importable file location -- i.e. copying and - pasting the example above into a jupyter notebook works fine in modal mode, but - will throw an exception if you try to run it non-modally. - - This hasn't been tested for running on a remote queue. It should work, but it's - _possible_ the same requirement from non-modal mode (importable nodes) will - apply. -""" - - class StoredNodeJob(TemplateJob): - __doc__ = ( - """ + """ This job is an intermediate feature for accessing pyiron's queue submission infrastructure for nodes (function nodes, macros, or entire workflows). @@ -181,9 +162,20 @@ class StoredNodeJob(TemplateJob): >>> pr.remove_jobs(recursive=True, silently=True) >>> pr.remove(enable=True) + + Warnings: + Node jobs rely on storing the node to file, which means these are also only + available for python >= 3.11. + + The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes + being run are defined in an importable file location -- i.e. copying and + pasting the example above into a jupyter notebook works fine in modal mode, but + will throw an exception if you try to run it non-modally. + + This hasn't been tested for running on a remote queue. It should work, but it's + _possible_ the same requirement from non-modal mode (importable nodes) will + apply. """ - + _WARNINGS_STRING - ) def __init__(self, project, job_name): if sys.version_info < (3, 11): @@ -261,57 +253,3 @@ def run_static(self): JOB_CLASS_DICT[StoredNodeJob.__name__] = StoredNodeJob.__module__ - - -def _run_node(node): - node.run() - return node - - -def create_job_with_python_wrapper(project, node): - __doc__ = ( - """ - A convenience wrapper around :meth:`pyiron_base.Project.wrap_python_function` for - running a `pyiron_workflow.Workflow`. (And _only_ workflows, `Function` and `Macro` - children will fail.) - - Args: - project (pyiron_base.Project): A pyiron project. - node (pyiron_workflow.node.Node): The node to run. - - Returns: - (pyiron_base.jobs.flex.pythonfunctioncontainer.PythonFunctionContainerJob): - A job which wraps a function for running the node, with the `"node"` input - pre-populated with the provided node. - - Examples: - >>> from pyiron_base import Project - >>> from pyiron_workflow import Workflow - >>> from pyiron_workflow.job import create_job_with_python_wrapper - >>> - >>> wf = Workflow("pyiron_node", overwrite_save=True) - >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes - >>> - >>> pr = Project("test") - >>> - >>> nj = create_job_with_python_wrapper(pr, wf) - >>> nj.run() - >>> print(nj.output["result"].outputs.to_value_dict()) - {'answer__user_input': 42} - - >>> lj = pr.load(nj.job_name) - >>> print(nj.output["result"].outputs.to_value_dict()) - {'answer__user_input': 42} - - >>> pr.remove_jobs(recursive=True, silently=True) - >>> pr.remove(enable=True) - - """ - + _WARNINGS_STRING - ) - if sys.version_info < (3, 11): - raise NotImplementedError("Node jobs are only available in python 3.11+") - - job = project.wrap_python_function(_run_node) - job.input["node"] = node - return job diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 7ce22b77..982d8d40 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -6,7 +6,7 @@ from pyiron_base import Project from pyiron_workflow import Workflow from pyiron_workflow.channels import NOT_DATA -from pyiron_workflow.job import create_job_with_python_wrapper +import pyiron_workflow.job # To get the job classes registered @Workflow.wrap_as.single_value_node("t") @@ -244,92 +244,3 @@ def test_bad_workflow(self): self.make_a_job_from_node(has_wd_wf) finally: has_wd_wf.working_directory.delete() - - -class TestWrapperFunction(_WithAJob): - def make_a_job_from_node(self, node): - return create_job_with_python_wrapper(self.pr, node) - - @unittest.skipIf(sys.version_info >= (3, 11), "Storage should only work in 3.11+") - def test_clean_failure(self): - with self.assertRaises( - NotImplementedError, - msg="Storage, and therefore node jobs, are only available in python 3.11+, " - "so we should fail hard and clean here" - ): - node = Workflow.create.standard.UserInput(42) - self.make_a_job_from_node(node) - - @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") - def test_modal(self): - modal_wf = Workflow("modal_wf") - modal_wf.sleep = Sleep(0) - modal_wf.out = modal_wf.create.standard.UserInput(modal_wf.sleep) - nj = self.make_a_job_from_node(modal_wf) - - nj.run() - self.assertTrue( - nj.status.finished, - msg="The interpreter should not release until the job is done" - ) - self.assertEqual( - 0, - nj.output["result"].outputs.out__user_input.value, - msg="The node should have run, and since it's modal there's no need to " - "update the instance" - ) - - lj = self.pr.load(nj.job_name) - self.assertIsNot( - lj, - nj, - msg="The loaded job should be a new instance." - ) - self.assertEqual( - nj.output["result"].outputs.out__user_input.value, - lj.output["result"].outputs.out__user_input.value, - msg="The loaded job should still have all the same values" - ) - - @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") - def test_nonmodal(self): - nonmodal_node = Workflow("non_modal") - nonmodal_node.out = Workflow.create.standard.UserInput(42) - - nj = self.make_a_job_from_node(nonmodal_node) - nj.run(run_mode="non_modal") - self.assertFalse( - nj.status.finished, - msg=f"The local process should released immediately per non-modal " - f"style, but got status {nj.status}" - ) - while not nj.status.finished: - sleep(0.1) - self.assertTrue( - nj.status.finished, - msg="The job status should update on completion" - ) - with self.assertRaises( - KeyError, - msg="As usual with remote processes, we expect to require a data read " - "before the local instance reflects its new state." - ): - nj.output["result"] - - lj = self.pr.load(nj.job_name) - self.assertEqual( - 42, - lj.output["result"].outputs.out__user_input.value, - msg="The loaded job should have the finished values" - ) - - @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") - def test_node(self): - node = Workflow.create.standard.UserInput(42) - nj = self.make_a_job_from_node(node) - with self.assertRaises( - AttributeError, - msg="The wrapping routine doesn't interact well with getattr overrides on " - "node state elements (output data)" - ): - nj.run() From 17e3d162add6e03904118f87720afc55bebb54f0 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 20 Feb 2024 11:56:25 -0800 Subject: [PATCH 4/5] Add version controls I suspect what's happening is `DataContainer` is leaning on `h5io_browser`'s `use_state` defaults, but I won't dig into it now. --- pyiron_workflow/job.py | 2 ++ tests/unit/test_job.py | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/pyiron_workflow/job.py b/pyiron_workflow/job.py index bdfd6760..6a5814b6 100644 --- a/pyiron_workflow/job.py +++ b/pyiron_workflow/job.py @@ -74,6 +74,8 @@ class NodeOutputJob(PythonFunctionContainerJob): bets are off. """ def __init__(self, project, job_name): + if sys.version_info < (3, 11): + raise NotImplementedError("Node jobs are only available in python 3.11+") super().__init__(project, job_name) self._function = _node_out self.input.update(get_function_parameter_dict(funct=_node_out)) diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 982d8d40..3ff24fae 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -34,6 +34,17 @@ def make_a_job_from_node(self, node): job.input["node"] = node return job + @unittest.skipIf(sys.version_info >= (3, 11), "Storage should only work in 3.11+") + def test_clean_failure(self): + with self.assertRaises( + NotImplementedError, + msg="Storage, and therefore node jobs, are only available in python 3.11+, " + "so we should fail hard and clean here" + ): + node = Workflow.create.standard.UserInput(42) + self.make_a_job_from_node(node) + + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_node(self): node = Workflow.create.standard.UserInput(42) nj = self.make_a_job_from_node(node) @@ -44,6 +55,7 @@ def test_node(self): msg="A single node should run just as well as a workflow" ) + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_modal(self): modal_wf = Workflow("modal_wf") modal_wf.sleep = Sleep(0) @@ -74,6 +86,7 @@ def test_modal(self): msg="The loaded job should still have all the same values" ) + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_nonmodal(self): nonmodal_node = Workflow("non_modal") nonmodal_node.out = Workflow.create.standard.UserInput(42) @@ -104,6 +117,7 @@ def test_nonmodal(self): msg="The loaded job should have the finished values" ) + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_bad_input(self): with self.subTest("Not a node"): nj = self.pr.create.job.NodeOutputJob("will_fail") @@ -119,6 +133,7 @@ def test_bad_input(self): with self.assertRaises(ValueError, msg="The input is not ready"): nj.run() + @unittest.skipIf(sys.version_info < (3, 11), "Storage will only work in 3.11+") def test_unloadable(self): @Workflow.wrap_as.single_value_node("y") def not_importable_directy_from_module(x): From 8a84f6050d5ce3852114a420e0af7f7f809a2f3b Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Tue, 20 Feb 2024 20:03:33 +0000 Subject: [PATCH 5/5] Format black --- pyiron_workflow/channels.py | 1 + pyiron_workflow/job.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyiron_workflow/channels.py b/pyiron_workflow/channels.py index 611c546e..890f37fc 100644 --- a/pyiron_workflow/channels.py +++ b/pyiron_workflow/channels.py @@ -601,6 +601,7 @@ def _node_injection(self, injection_class, *args, inject_self=True): def __getattr__(self, name): from pyiron_workflow.node_library.standard import GetAttr + if name == "to_hdf": raise AttributeError( "This is just a failsafe to protect us against other elements of the " diff --git a/pyiron_workflow/job.py b/pyiron_workflow/job.py index 6a5814b6..8b8c99a4 100644 --- a/pyiron_workflow/job.py +++ b/pyiron_workflow/job.py @@ -26,7 +26,8 @@ from pyiron_base import TemplateJob, JOB_CLASS_DICT from pyiron_base.jobs.flex.pythonfunctioncontainer import ( - PythonFunctionContainerJob, get_function_parameter_dict + PythonFunctionContainerJob, + get_function_parameter_dict, ) from pyiron_workflow.node import Node from h5io._h5io import _import_class @@ -73,6 +74,7 @@ class NodeOutputJob(PythonFunctionContainerJob): be redefined. If node definitions get changed between saving and loading, all bets are off. """ + def __init__(self, project, job_name): if sys.version_info < (3, 11): raise NotImplementedError("Node jobs are only available in python 3.11+") @@ -131,25 +133,25 @@ class StoredNodeJob(TemplateJob): It leans directly on the storage capabilities of the node itself, except for the node class and name, and the storage backend mode, all of which are held in the - traditional job input. (WARNING: This might be fragile to adjusting the storage + traditional job input. (WARNING: This might be fragile to adjusting the storage backend on the node _after_ the node has been assign to the job.) The job provides direct access to its owned node (as both input and output) on the :attr:`node` attribute. The only requirement is that the node have an untouched working directory (so we can make sure its files get stored _inside_ the job's - directory tree), and that it be saveable (not all objects work with the "h5io" + directory tree), and that it be saveable (not all objects work with the "h5io" storage backend, e.g. `ase.Calculator` objects may break it). Examples: >>> from pyiron_base import Project >>> from pyiron_workflow import Workflow >>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT - >>> + >>> >>> wf = Workflow("pyiron_node", overwrite_save=True) >>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes - >>> + >>> >>> pr = Project("test") - >>> + >>> >>> nj = pr.create.job.StoredNodeJob("my_node") >>> nj.node = wf >>> nj.run() # doctest:+ELLIPSIS