Skip to content

Commit

Permalink
feat: record_run json and cue files
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Jan 21, 2024
1 parent ac54e99 commit 25b6f9b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
3 changes: 2 additions & 1 deletion zetta_utils/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import zetta_utils
from zetta_utils import log
from zetta_utils.parsing import json
from zetta_utils.run import run_ctx_manager
from zetta_utils.run import record_run, run_ctx_manager

logger = log.get_logger("zetta_utils")

Expand Down Expand Up @@ -125,6 +125,7 @@ def run(
if heartbeat is False:
run_ctx = run_ctx_manager(run_id=run_id, heartbeat_interval=-1)
with run_ctx:
record_run(path)
result = zetta_utils.builder.build(spec, parallel=parallel_builder)
logger.debug(f"Outcome: {pprint.pformat(result, indent=4)}")
if pdb:
Expand Down
24 changes: 24 additions & 0 deletions zetta_utils/run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from typing import Optional

import attrs
import fsspec
from cloudfiles import paths

from zetta_utils import log
from zetta_utils.common import RepeatTimer
Expand Down Expand Up @@ -54,6 +56,28 @@ def register_clusters(clusters: list) -> None: # pragma: no cover
_update_run_info(info)


def record_run(spec_path: str | None = None) -> None:  # pragma: no cover
    """
    Records run info in a bucket for archiving.

    Writes ``<RUN_ID>.json`` under ``<info bucket>/<ZETTA_USER>/`` containing the
    user, the project and the parsed run spec. When `spec_path` is given, the
    contents of that file are additionally copied next to it as ``<RUN_ID>.cue``.

    The destination root is taken from the ``RUN_INFO_BUCKET`` environment
    variable, falling back to the module-level ``RUN_INFO_BUCKET`` constant.

    :param spec_path: Path of the spec file to archive. It is opened with the
        builtin ``open`` as UTF-8 text, so it must be a local path. ``None``
        skips the ``.cue`` copy.
    :raises KeyError: If ``ZETTA_USER``, ``ZETTA_PROJECT`` or ``ZETTA_RUN_SPEC``
        is not set in the environment.
    :raises json.JSONDecodeError: If ``ZETTA_RUN_SPEC`` is not valid JSON.
    """
    # Destination paths are bucket URLs, not host filesystem paths, so they must
    # always be joined with "/" regardless of the OS this runs on.
    import posixpath

    zetta_user = os.environ["ZETTA_USER"]
    info_path = os.environ.get("RUN_INFO_BUCKET", RUN_INFO_BUCKET)
    info_path_user = posixpath.join(info_path, zetta_user)
    run_info = {
        "zetta_user": zetta_user,
        "zetta_project": os.environ["ZETTA_PROJECT"],
        "json_spec": json.loads(os.environ["ZETTA_RUN_SPEC"]),
    }
    with fsspec.open(posixpath.join(info_path_user, f"{RUN_ID}.json"), "w") as f:
        json.dump(run_info, f, indent=2)

    if spec_path is not None:
        # Read the local spec fully before opening the remote target so a
        # missing/unreadable spec does not leave an empty remote object behind.
        with open(spec_path, "r", encoding="utf-8") as src:
            content = src.read()
        with fsspec.open(posixpath.join(info_path_user, f"{RUN_ID}.cue"), "w") as dst:
            dst.write(content)


def _update_run_info(info: DBRowDataT) -> None: # pragma: no cover
row_key = f"run-{RUN_ID}"
col_keys = tuple(info.keys())
Expand Down

0 comments on commit 25b6f9b

Please sign in to comment.