diff --git a/zetta_utils/api/v0.py b/zetta_utils/api/v0.py index 9d1c7832b..326190485 100644 --- a/zetta_utils/api/v0.py +++ b/zetta_utils/api/v0.py @@ -71,6 +71,7 @@ service_ctx_manager, ) from zetta_utils.common.ctx_managers import set_env_ctx_mngr +from zetta_utils.common.misc import get_unique_id from zetta_utils.common.partial import ComparablePartial from zetta_utils.common.path import abspath from zetta_utils.common.pprint import lrpad, lrpadprint, utcnow_ISO8601 @@ -227,11 +228,7 @@ flow_schema_cls, sequential_flow, ) -from zetta_utils.mazepa.id_generation import ( - generate_invocation_id, - get_literal_id_fn, - get_unique_id, -) +from zetta_utils.mazepa.id_generation import generate_invocation_id, get_literal_id_fn from zetta_utils.mazepa.progress_tracker import ( ProgressUpdateFN, get_confirm_sigint_fn, diff --git a/zetta_utils/common/__init__.py b/zetta_utils/common/__init__.py index 1150a881f..9d240bf53 100644 --- a/zetta_utils/common/__init__.py +++ b/zetta_utils/common/__init__.py @@ -6,7 +6,7 @@ from . 
import user_input from .user_input import get_user_input, get_user_confirmation - +from .misc import get_unique_id from .path import abspath, is_local from .pprint import lrpad from .signal_handlers import custom_signal_handler_ctx
diff --git a/zetta_utils/common/misc.py b/zetta_utils/common/misc.py
new file mode 100644
index 000000000..d684807e9
--- /dev/null
+++ b/zetta_utils/common/misc.py
@@ -0,0 +1,25 @@
+import uuid
+from typing import Optional
+
+from coolname import generate_slug
+
+
+def get_unique_id(
+    slug_len: int = 3,
+    add_uuid: bool = True,
+    prefix: Optional[str] = None,
+    max_len: int = 320,
+) -> str:  # pragma: no cover
+    slug = generate_slug(slug_len)
+    if prefix is not None:
+        result = f"{prefix}-{slug}"
+    else:
+        result = f"{slug}"
+
+    if add_uuid:
+        unique_id = str(uuid.uuid1())
+        result += unique_id
+    result = result[:max_len]
+    while not result[-1].isalpha():
+        result = result[:-1]
+    return result
diff --git a/zetta_utils/constants.py b/zetta_utils/constants.py
index ec7e5822f..d1524af29 100644
--- a/zetta_utils/constants.py
+++ b/zetta_utils/constants.py
@@ -1,4 +1,5 @@ """Common project-wide constants.""" DEFAULT_PROJECT = "zetta-research" +DEFAULT_FIRESTORE_DB = "zetta-utils" RUN_DATABASE: str | None = "run-db"
diff --git a/zetta_utils/layer/db_layer/datastore/__init__.py b/zetta_utils/layer/db_layer/datastore/__init__.py
index b12698026..4b6f60272 100644
--- a/zetta_utils/layer/db_layer/datastore/__init__.py
+++ b/zetta_utils/layer/db_layer/datastore/__init__.py
@@ -1,2 +1,3 @@
 from .backend import DatastoreBackend
 from .build import build_datastore_layer
+from .temp_layer_ctx import temp_datastore_layer_ctx
diff --git a/zetta_utils/layer/db_layer/datastore/temp_layer_ctx.py b/zetta_utils/layer/db_layer/datastore/temp_layer_ctx.py
new file mode 100644
index 000000000..ccd8e1e05
--- /dev/null
+++ b/zetta_utils/layer/db_layer/datastore/temp_layer_ctx.py
@@ -0,0 +1,26 @@
+from contextlib import contextmanager
+
+from zetta_utils.common import get_unique_id
+from zetta_utils.constants import DEFAULT_PROJECT
+
+from .build import build_datastore_layer
+
+
+@contextmanager
+def temp_datastore_layer_ctx(
+    prefix: str | None = None, project: str = DEFAULT_PROJECT
+):  # pragma: no cover # pure delegation
+    """Yield a Datastore layer in a freshly named namespace; clear it on exit.
+
+    :param prefix: Optional prefix for the generated namespace name.
+    :param project: Project passed to ``build_datastore_layer``.
+    """
+    db_layer = build_datastore_layer(
+        namespace=get_unique_id(prefix=prefix, add_uuid=False), project=project
+    )
+    try:
+        yield db_layer
+    finally:
+        # Clear even when the ``with`` body raises, so temp data is not left behind.
+        # TODO: Clear in batches to not go over the datastore limit
+        db_layer.backend.clear()
diff --git a/zetta_utils/layer/db_layer/firestore/__init__.py b/zetta_utils/layer/db_layer/firestore/__init__.py
index bb2a14d53..46c820ce6 100644
--- a/zetta_utils/layer/db_layer/firestore/__init__.py
+++ b/zetta_utils/layer/db_layer/firestore/__init__.py
@@ -1,2 +1,3 @@
 from .backend import FirestoreBackend
 from .build import build_firestore_layer
+from .temp_layer_ctx import temp_firestore_layer_ctx
diff --git a/zetta_utils/layer/db_layer/firestore/temp_layer_ctx.py b/zetta_utils/layer/db_layer/firestore/temp_layer_ctx.py
new file mode 100644
index 000000000..3b6f4f4f6
--- /dev/null
+++ b/zetta_utils/layer/db_layer/firestore/temp_layer_ctx.py
@@ -0,0 +1,30 @@
+from contextlib import contextmanager
+
+from zetta_utils.common import get_unique_id
+from zetta_utils.constants import DEFAULT_FIRESTORE_DB, DEFAULT_PROJECT
+
+from .build import build_firestore_layer
+
+
+@contextmanager
+def temp_firestore_layer_ctx(
+    prefix: str | None = None,
+    database: str = DEFAULT_FIRESTORE_DB,
+    project: str = DEFAULT_PROJECT,
+):  # pragma: no cover # pure delegation
+    """Yield a Firestore layer on a freshly named collection; clear it on exit.
+
+    :param prefix: Optional prefix for the generated collection name.
+    :param database: Database passed to ``build_firestore_layer``.
+    :param project: Project passed to ``build_firestore_layer``.
+    """
+    db_layer = build_firestore_layer(
+        collection=get_unique_id(prefix=prefix, add_uuid=False),
+        database=database,
+        project=project,
+    )
+    try:
+        yield db_layer
+    finally:
+        # Clear even when the ``with`` body raises, so temp data is not left behind.
+        db_layer.backend.clear()
diff --git a/zetta_utils/mazepa/execution.py b/zetta_utils/mazepa/execution.py
index 721e16089..27809b26d 100644
--- a/zetta_utils/mazepa/execution.py
+++ b/zetta_utils/mazepa/execution.py
@@ -12,14 +12,13 @@ from typeguard import typechecked from zetta_utils import log
-from zetta_utils.common import ComparablePartial +from zetta_utils.common import ComparablePartial, get_unique_id from zetta_utils.mazepa.autoexecute_task_queue import AutoexecuteTaskQueue from zetta_utils.message_queues.base import PullMessageQueue, PushMessageQueue from . import Flow, Task, dryrun, sequential_flow from .execution_checkpoint import EXECUTION_CHECKPOINT_PATH, record_execution_checkpoint from .execution_state import ExecutionState, InMemoryExecutionState -from .id_generation import get_unique_id from .progress_tracker import progress_ctx_mngr from .task_outcome import OutcomeReport, TaskStatus from .tasks import _TaskableOperation diff --git a/zetta_utils/mazepa/id_generation.py b/zetta_utils/mazepa/id_generation.py index 62884393a..d83cb30de 100644 --- a/zetta_utils/mazepa/id_generation.py +++ b/zetta_utils/mazepa/id_generation.py @@ -6,35 +6,12 @@ import dill import xxhash -from coolname import generate_slug from zetta_utils import log logger = log.get_logger("mazepa") -def get_unique_id( - slug_len: int = 3, - add_uuid: bool = True, - prefix: Optional[str] = None, - max_len: int = 320, -) -> str: # pragma: no cover - slug = generate_slug(slug_len) - - if prefix is not None: - result = f"{prefix}-{slug}" - else: - result = f"{slug}" - - if add_uuid: - unique_id = str(uuid.uuid1()) - result += unique_id - result = result[:max_len] - while not result[-1].isalpha(): - result = result[:-1] - return result - - def generate_invocation_id( fn: Optional[Callable] = None, args: Optional[list] = None, diff --git a/zetta_utils/run/__init__.py b/zetta_utils/run/__init__.py index c64d2a177..2dcebd90a 100644 --- a/zetta_utils/run/__init__.py +++ b/zetta_utils/run/__init__.py @@ -12,10 +12,9 @@ from gcsfs import GCSFileSystem from zetta_utils import constants, log -from zetta_utils.common import RepeatTimer +from zetta_utils.common import RepeatTimer, get_unique_id from zetta_utils.layer.db_layer import DBRowDataT from zetta_utils.layer.db_layer.firestore 
import build_firestore_layer -from zetta_utils.mazepa import id_generation from zetta_utils.parsing import json from .resource import ( @@ -123,7 +122,7 @@ def _send_heartbeat(): update_run_info(RUN_ID, info) if run_id is None: - run_id = id_generation.get_unique_id(slug_len=4, add_uuid=False, max_len=50) + run_id = get_unique_id(slug_len=4, add_uuid=False, max_len=50) global RUN_ID # pylint: disable=global-statement RUN_ID = run_id