diff --git a/zetta_utils/cli/main.py b/zetta_utils/cli/main.py index 04cfc41fb..9ccd01c7b 100644 --- a/zetta_utils/cli/main.py +++ b/zetta_utils/cli/main.py @@ -7,7 +7,7 @@ import zetta_utils from zetta_utils import log from zetta_utils.parsing import json -from zetta_utils.run import run_ctx_manager +from zetta_utils.run import record_run, run_ctx_manager logger = log.get_logger("zetta_utils") @@ -125,6 +125,7 @@ def run( if heartbeat is False: run_ctx = run_ctx_manager(run_id=run_id, heartbeat_interval=-1) with run_ctx: + record_run(path) result = zetta_utils.builder.build(spec, parallel=parallel_builder) logger.debug(f"Outcome: {pprint.pformat(result, indent=4)}") if pdb: diff --git a/zetta_utils/run/__init__.py b/zetta_utils/run/__init__.py index 4e024c71b..9c580b536 100644 --- a/zetta_utils/run/__init__.py +++ b/zetta_utils/run/__init__.py @@ -6,6 +6,8 @@ from typing import Optional import attrs +import fsspec +from cloudfiles import paths from zetta_utils import log from zetta_utils.common import RepeatTimer @@ -54,6 +56,28 @@ def register_clusters(clusters: list) -> None: # pragma: no cover _update_run_info(info) +def record_run(spec_path: str | None = None) -> None: # pragma: no cover + """ + Records run info in a bucket for archiving. + """ + zetta_user = os.environ["ZETTA_USER"] + info_path = os.environ.get("RUN_INFO_BUCKET", RUN_INFO_BUCKET) + info_path_user = os.path.join(info_path, zetta_user) + run_info = { + "zetta_user": zetta_user, + "zetta_project": os.environ["ZETTA_PROJECT"], + "json_spec": json.loads(os.environ["ZETTA_RUN_SPEC"]), + } + with fsspec.open(os.path.join(info_path_user, f"{RUN_ID}.json"), "w") as f: + json.dump(run_info, f, indent=2) + + if spec_path is not None: + with open(spec_path, "r", encoding="utf-8") as src: + content = src.read() + with fsspec.open(os.path.join(info_path_user, f"{RUN_ID}.cue"), "w") as dst: + dst.write(content) + + def _update_run_info(info: DBRowDataT) -> None: # pragma: no cover row_key = f"run-{RUN_ID}" col_keys = tuple(info.keys())