
Commit

Merge pull request #222 from pyiron/node_output_job
[breaking] Replace the wrapper job with a more robust job subclass
liamhuber authored Feb 20, 2024
2 parents a6efda7 + 8a84f60 commit ab25d67
Showing 3 changed files with 208 additions and 126 deletions.
7 changes: 7 additions & 0 deletions pyiron_workflow/channels.py
@@ -602,6 +602,13 @@ def _node_injection(self, injection_class, *args, inject_self=True):
def __getattr__(self, name):
from pyiron_workflow.node_library.standard import GetAttr

if name == "to_hdf":
raise AttributeError(
"This is just a failsafe to protect us against other elements of the "
"pyiron ecosystem (pyiron_base's DataContainer) running a "
"`hasattr('to_hdf')` check on us and accidentally injecting a new "
"getattr node."
)
return self._node_injection(GetAttr, name)

def __getitem__(self, item):
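A note on the guard above: with an unconditional `__getattr__`, every attribute lookup "succeeds", so a duck-typing probe like `hasattr(obj, "to_hdf")` would return True and trick pyiron_base into treating the channel as HDF-serializable. A minimal, self-contained sketch of that pitfall (the `Permissive` class here is hypothetical, not pyiron code):

class Permissive:
    def __getattr__(self, name):
        # Stands in for `self._node_injection(GetAttr, name)`: the lookup
        # always "succeeds" by side effect, whatever the name.
        return f"injected-{name}"

print(hasattr(Permissive(), "to_hdf"))  # True -- fools the duck-type check
print(Permissive().anything_at_all)     # "injected-anything_at_all"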
197 changes: 117 additions & 80 deletions pyiron_workflow/job.py
Expand Up @@ -3,11 +3,9 @@
Two approaches are provided while we work out which is more convenient and how some
edge cases may be handled differently:
- `NodeOutputJob(PythonFunctionContainerJob)`, which gets serialized like any other
wrapped python code.
- `StoredNodeJob(TemplateJob)`, which uses the node's underlying storage capabilities.
(These replace the previous approaches: a direct sub-class of :class:`TemplateJob`
created using the usual job creation, and a helper method for
:meth:`Project.wrap_python_function`; the wrapper appeared to be slightly slower,
presumably because of the extra layer of serialization.)
The intent of this module is to provide immediate access to pyiron's queue submission
functionality, while in the long run this should be integrated more directly with the
@@ -22,56 +20,138 @@

from __future__ import annotations

import inspect
import os
import sys

from pyiron_base import TemplateJob, JOB_CLASS_DICT
from pyiron_base.jobs.flex.pythonfunctioncontainer import (
PythonFunctionContainerJob,
get_function_parameter_dict,
)
from pyiron_workflow.node import Node
from h5io._h5io import _import_class


def _node_out(node):
node.run()
return node.outputs.to_value_dict()

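For reference, the helper above returns the flat channel-label-to-value mapping that becomes the job's output. A sketch of the expected shape, using the same `UserInput(42)` workflow as the doctests below:

from pyiron_workflow import Workflow

wf = Workflow("pyiron_node")
wf.answer = Workflow.create.standard.UserInput(42)
print(_node_out(wf))  # expect {'answer__user_input': 42}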

class NodeOutputJob(PythonFunctionContainerJob):
"""
A `PythonFunctionContainerJob` that explicitly runs the node provided as input and
returns the node's output value dictionary.
Examples:
>>> from pyiron_base import Project
>>> from pyiron_workflow import Workflow
>>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT
>>>
>>> wf = Workflow("pyiron_node", overwrite_save=True)
>>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes
>>>
>>> pr = Project("test")
>>>
>>> nj = pr.create.job.NodeOutputJob("my_node")
>>> nj.input["node"] = wf
>>> nj.run() # doctest:+ELLIPSIS
The job ...
>>> print(nj.output)
DataContainer({'answer__user_input': 42})
>>> lj = pr.load(nj.job_name)
>>> print(lj.output)
DataContainer({'answer__user_input': 42})
>>> pr.remove_jobs(recursive=True, silently=True)
>>> pr.remove(enable=True)
Warnings:
Node jobs rely on storing the node to file, which means these are also only
available for python >= 3.11.
The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes
being run are defined in an importable file location -- i.e. copying and
pasting the example above into a jupyter notebook works fine in modal mode, but
will throw an exception if you try to run it non-modally.
All submitted nodes must be importable from their module at load time, or
loading will fail. This means node definitions can't be nested inside another
object, and any nodes defined in `__main__` (e.g. in a jupyter notebook) must
be redefined. If node definitions get changed between saving and loading, all
bets are off.
"""

This hasn't been tested for running on a remote queue. It should work, but it's
_possible_ the same requirement from non-modal mode (importable nodes) will
apply.
"""
def __init__(self, project, job_name):
if sys.version_info < (3, 11):
raise NotImplementedError("Node jobs are only available in python 3.11+")
super().__init__(project, job_name)
self._function = _node_out
self.input.update(get_function_parameter_dict(funct=_node_out))
self._executor_type = None

@property
def python_function(self):
return self._function

@python_function.setter
def python_function(self, funct):
raise NotImplementedError(
f"{self.__class__.__name__}'s python function is to run the node and get "
f"its output values, and this may not be overridden."
)

def validate_ready_to_run(self):
if not isinstance(self.input["node"], Node):
raise TypeError(f"'node' input must be of type {Node.__name__}")
elif not self.input["node"].ready:
nl = "\n"
raise ValueError(
f"Node not ready:{nl}{self.input['node'].readiness_report}"
)

def run_static(self):
# Overrides the parent method.
# Copy and paste except for the output update, which makes sure the output is
# flat and not nested beneath "result".
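# Note (editorial): `_node_out`'s signature is just `(node)`, so the executor
# branch below never fires for this class; it is retained from the parent method.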
if (
self._executor_type is not None
and "executor" in inspect.signature(self._function).parameters.keys()
):
input_dict = self.input.to_builtin()
del input_dict["executor"]
output = self._function(
**input_dict, executor=self._get_executor(max_workers=self.server.cores)
)
else:
output = self._function(**self.input.to_builtin())
self.output.update(output) # DIFFERS FROM PARENT METHOD
self.to_hdf()
self.status.finished = True


JOB_CLASS_DICT[NodeOutputJob.__name__] = NodeOutputJob.__module__

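The registration above maps the class name to its module path rather than to the class object itself, which lets pyiron_base look the class up and import it on demand. A rough sketch of how such a name-to-module registry can be resolved (the `resolve_job_class` helper is hypothetical, not pyiron_base's actual implementation):

from importlib import import_module

def resolve_job_class(name, registry):
    # Import the registered module, then pull the class of the same name off it.
    module = import_module(registry[name])  # e.g. "pyiron_workflow.job"
    return getattr(module, name)            # e.g. the NodeOutputJob class

# e.g. resolve_job_class("NodeOutputJob", JOB_CLASS_DICT)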

class StoredNodeJob(TemplateJob):
"""
This job is an intermediate feature for accessing pyiron's queue submission
infrastructure for nodes (function nodes, macros, or entire workflows).
It leans directly on the storage capabilities of the node itself, except for
the node class and name, and the storage backend mode, all of which are held in the
traditional job input. (WARNING: This might be fragile to adjusting the storage
backend on the node _after_ the node has been assigned to the job.)
The job provides direct access to its owned node (as both input and output) on the
:attr:`node` attribute. The only requirement is that the node have an untouched
working directory (so we can make sure its files get stored _inside_ the job's
directory tree), and that it be saveable (not all objects work with the "h5io"
storage backend, e.g. `ase.Calculator` objects may break it).
Examples:
>>> from pyiron_base import Project
>>> from pyiron_workflow import Workflow
>>> import pyiron_workflow.job # To get the job registered in JOB_CLASS_DICT
>>>
>>> wf = Workflow("pyiron_node", overwrite_save=True)
>>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes
>>>
>>> pr = Project("test")
>>>
>>> nj = pr.create.job.StoredNodeJob("my_node")
>>> nj.node = wf
>>> nj.run() # doctest:+ELLIPSIS
@@ -86,9 +166,20 @@ class StoredNodeJob(TemplateJob):
>>> pr.remove_jobs(recursive=True, silently=True)
>>> pr.remove(enable=True)
Warnings:
Node jobs rely on storing the node to file, which means these are also only
available for python >= 3.11.
The job can be run with `run_mode="non_modal"`, but _only_ if all the nodes
being run are defined in an importable file location -- i.e. copying and
pasting the example above into a jupyter notebook works fine in modal mode, but
will throw an exception if you try to run it non-modally.
This hasn't been tested for running on a remote queue. It should work, but it's
_possible_ the same requirement from non-modal mode (importable nodes) will
apply.
"""
"""

def __init__(self, project, job_name):
if sys.version_info < (3, 11):
@@ -166,57 +257,3 @@ def run_static(self):


JOB_CLASS_DICT[StoredNodeJob.__name__] = StoredNodeJob.__module__


def _run_node(node):
node.run()
return node


def create_job_with_python_wrapper(project, node):
"""
A convenience wrapper around :meth:`pyiron_base.Project.wrap_python_function` for
running a `pyiron_workflow.Workflow`. (And _only_ workflows, `Function` and `Macro`
children will fail.)
Args:
project (pyiron_base.Project): A pyiron project.
node (pyiron_workflow.node.Node): The node to run.
Returns:
(pyiron_base.jobs.flex.pythonfunctioncontainer.PythonFunctionContainerJob):
A job which wraps a function for running the node, with the `"node"` input
pre-populated with the provided node.
Examples:
>>> from pyiron_base import Project
>>> from pyiron_workflow import Workflow
>>> from pyiron_workflow.job import create_job_with_python_wrapper
>>>
>>> wf = Workflow("pyiron_node", overwrite_save=True)
>>> wf.answer = Workflow.create.standard.UserInput(42) # Or your nodes
>>>
>>> pr = Project("test")
>>>
>>> nj = create_job_with_python_wrapper(pr, wf)
>>> nj.run()
>>> print(nj.output["result"].outputs.to_value_dict())
{'answer__user_input': 42}
>>> lj = pr.load(nj.job_name)
>>> print(lj.output["result"].outputs.to_value_dict())
{'answer__user_input': 42}
>>> pr.remove_jobs(recursive=True, silently=True)
>>> pr.remove(enable=True)
"""
+ _WARNINGS_STRING
)
if sys.version_info < (3, 11):
raise NotImplementedError("Node jobs are only available in python 3.11+")

job = project.wrap_python_function(_run_node)
job.input["node"] = node
return job
