Skip to content

Commit

Permalink
Save and load output with file cache
Browse files Browse the repository at this point in the history
Signed-off-by: liamhuber <[email protected]>
  • Loading branch information
liamhuber committed Feb 17, 2025
1 parent 7841bff commit 025dd5a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
72 changes: 71 additions & 1 deletion pyiron_workflow/nodes/static_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import pathlib
from abc import ABC
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Any, Literal

import bagofholding as boh
from pandas import DataFrame
from pyiron_snippets.colors import SeabornColors

Expand All @@ -17,6 +18,8 @@
from pyiron_workflow.node import Node

if TYPE_CHECKING:
from concurrent import futures

from pyiron_workflow.nodes.composite import Composite
from pyiron_workflow.storage import StorageInterface

Expand Down Expand Up @@ -88,6 +91,73 @@ def inputs(self) -> Inputs:
def outputs(self) -> OutputsWithInjection:
return self._outputs

def _before_run(
self,
/,
check_readiness: bool,
run_data_tree: bool,
run_parent_trees_too: bool,
fetch_input: bool,
emit_ran_signal: bool,
**kwargs: Any,
) -> tuple[bool, Any]:
early_stopping, result = super()._before_run(
check_readiness=check_readiness,
run_data_tree=run_data_tree,
run_parent_trees_too=run_parent_trees_too,
fetch_input=fetch_input,
emit_ran_signal=emit_ran_signal,
)
if early_stopping:
return early_stopping, result

file_cached_output = self._read_file_cache()
if file_cached_output is not None:
for k, v in file_cached_output.items():
self.outputs[k]._value = v
return self._return_existing_result(emit_ran_signal)

return False, None

def _read_file_cache(self) -> dict[str, Any] | None:
if self.file_cache is not None:
from pyiron_database.instance_database.node import get_hash

hash_ = get_hash(self)
try:
return boh.ClassH5Bag(self.file_cache.joinpath(hash_)).load()
except FileNotFoundError:
return None
return None

def _finish_run(
self,
run_output: tuple | futures.Future,
/,
raise_run_exceptions: bool,
run_exception_kwargs: dict,
run_finally_kwargs: dict,
**kwargs,
) -> Any | tuple | None:
result = super()._finish_run(
run_output,
raise_run_exceptions=raise_run_exceptions,
run_exception_kwargs=run_exception_kwargs,
run_finally_kwargs=run_finally_kwargs,
**kwargs,
)
if self.file_cache is not None:
self._save_output_to_file_cache()
return result

def _save_output_to_file_cache(self):
from pyiron_database.instance_database.node import get_hash

hash_ = get_hash(self)
boh.ClassH5Bag.save(
self.outputs.to_value_dict(), self.file_cache.joinpath(hash_)
)

@classmethod
def for_node(
cls,
Expand Down
1 change: 1 addition & 0 deletions pyiron_workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def _before_run(
run_parent_trees_too: bool,
fetch_input: bool,
emit_ran_signal: bool,
**kwargs: Any,
) -> tuple[bool, Any]:
if self.automate_execution:
self.set_run_signals_to_dag_execution()
Expand Down

0 comments on commit 025dd5a

Please sign in to comment.