Skip to content

Commit

Permalink
feat: contexts for temporary DBLayers
Browse files Browse the repository at this point in the history
  • Loading branch information
dodamih authored and supersergiy committed Feb 21, 2025
1 parent a6bcbae commit 4b92875
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 34 deletions.
7 changes: 2 additions & 5 deletions zetta_utils/api/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
service_ctx_manager,
)
from zetta_utils.common.ctx_managers import set_env_ctx_mngr
from zetta_utils.common.misc import get_unique_id
from zetta_utils.common.partial import ComparablePartial
from zetta_utils.common.path import abspath
from zetta_utils.common.pprint import lrpad, lrpadprint, utcnow_ISO8601
Expand Down Expand Up @@ -227,11 +228,7 @@
flow_schema_cls,
sequential_flow,
)
from zetta_utils.mazepa.id_generation import (
generate_invocation_id,
get_literal_id_fn,
get_unique_id,
)
from zetta_utils.mazepa.id_generation import generate_invocation_id, get_literal_id_fn
from zetta_utils.mazepa.progress_tracker import (
ProgressUpdateFN,
get_confirm_sigint_fn,
Expand Down
2 changes: 1 addition & 1 deletion zetta_utils/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import user_input
from .user_input import get_user_input, get_user_confirmation

from .misc import get_unique_id
from .path import abspath, is_local
from .pprint import lrpad
from .signal_handlers import custom_signal_handler_ctx
Expand Down
25 changes: 25 additions & 0 deletions zetta_utils/common/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import uuid
from typing import Optional

from coolname import generate_slug


def get_unique_id(
slug_len: int = 3,
add_uuid: bool = True,
prefix: Optional[str] = None,
max_len: int = 320,
) -> str: # pragma: no cover
slug = generate_slug(slug_len)
if prefix is not None:
result = f"{prefix}-{slug}"
else:
result = f"{slug}"

if add_uuid:
unique_id = str(uuid.uuid1())
result += unique_id
result = result[:max_len]
while not result[-1].isalpha():
result = result[:-1]
return result
1 change: 1 addition & 0 deletions zetta_utils/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Common project-wide constants."""

DEFAULT_PROJECT = "zetta-research"
DEFAULT_FIRESTORE_DB = "zetta-utils"
RUN_DATABASE: str | None = "run-db"
1 change: 1 addition & 0 deletions zetta_utils/layer/db_layer/datastore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .backend import DatastoreBackend
from .build import build_datastore_layer
from .temp_layer_ctx import temp_datastore_layer_ctx
18 changes: 18 additions & 0 deletions zetta_utils/layer/db_layer/datastore/temp_layer_ctx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from contextlib import contextmanager

from zetta_utils.common import get_unique_id
from zetta_utils.constants import DEFAULT_PROJECT

from .build import build_datastore_layer


@contextmanager
def temp_datastore_layer_ctx(
prefix: str | None = None, project: str = DEFAULT_PROJECT
): # pragma: no cover # pure delegation
db_layer = build_datastore_layer(
namespace=get_unique_id(prefix=prefix, add_uuid=False), project=project
)
yield db_layer
# TODO: Clear in batches to not go over the datastore limit
db_layer.backend.clear()
1 change: 1 addition & 0 deletions zetta_utils/layer/db_layer/firestore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .backend import FirestoreBackend
from .build import build_firestore_layer
from .temp_layer_ctx import temp_firestore_layer_ctx
17 changes: 17 additions & 0 deletions zetta_utils/layer/db_layer/firestore/temp_layer_ctx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from contextlib import contextmanager

from zetta_utils.common import get_unique_id
from zetta_utils.constants import DEFAULT_FIRESTORE_DB, DEFAULT_PROJECT

from .build import build_firestore_layer


@contextmanager
def temp_firestore_layer_ctx(
prefix: str | None = None, database=DEFAULT_FIRESTORE_DB, project: str = DEFAULT_PROJECT
): # pragma: no cover # pure delegation
db_layer = build_firestore_layer(
collection=get_unique_id(prefix=prefix, add_uuid=False), database=database, project=project
)
yield db_layer
db_layer.backend.clear()
3 changes: 1 addition & 2 deletions zetta_utils/mazepa/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
from typeguard import typechecked

from zetta_utils import log
from zetta_utils.common import ComparablePartial
from zetta_utils.common import ComparablePartial, get_unique_id
from zetta_utils.mazepa.autoexecute_task_queue import AutoexecuteTaskQueue
from zetta_utils.message_queues.base import PullMessageQueue, PushMessageQueue

from . import Flow, Task, dryrun, sequential_flow
from .execution_checkpoint import EXECUTION_CHECKPOINT_PATH, record_execution_checkpoint
from .execution_state import ExecutionState, InMemoryExecutionState
from .id_generation import get_unique_id
from .progress_tracker import progress_ctx_mngr
from .task_outcome import OutcomeReport, TaskStatus
from .tasks import _TaskableOperation
Expand Down
23 changes: 0 additions & 23 deletions zetta_utils/mazepa/id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,12 @@

import dill
import xxhash
from coolname import generate_slug

from zetta_utils import log

logger = log.get_logger("mazepa")


def get_unique_id(
slug_len: int = 3,
add_uuid: bool = True,
prefix: Optional[str] = None,
max_len: int = 320,
) -> str: # pragma: no cover
slug = generate_slug(slug_len)

if prefix is not None:
result = f"{prefix}-{slug}"
else:
result = f"{slug}"

if add_uuid:
unique_id = str(uuid.uuid1())
result += unique_id
result = result[:max_len]
while not result[-1].isalpha():
result = result[:-1]
return result


def generate_invocation_id(
fn: Optional[Callable] = None,
args: Optional[list] = None,
Expand Down
5 changes: 2 additions & 3 deletions zetta_utils/run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
from gcsfs import GCSFileSystem

from zetta_utils import constants, log
from zetta_utils.common import RepeatTimer
from zetta_utils.common import RepeatTimer, get_unique_id
from zetta_utils.layer.db_layer import DBRowDataT
from zetta_utils.layer.db_layer.firestore import build_firestore_layer
from zetta_utils.mazepa import id_generation
from zetta_utils.parsing import json

from .resource import (
Expand Down Expand Up @@ -123,7 +122,7 @@ def _send_heartbeat():
update_run_info(RUN_ID, info)

if run_id is None:
run_id = id_generation.get_unique_id(slug_len=4, add_uuid=False, max_len=50)
run_id = get_unique_id(slug_len=4, add_uuid=False, max_len=50)

global RUN_ID # pylint: disable=global-statement
RUN_ID = run_id
Expand Down

0 comments on commit 4b92875

Please sign in to comment.