Skip to content

Commit

Permalink
fix(subchunkable_apply_flow): generate deterministic flow_id
Browse files: browse the repository at this point in the history
  • Loading branch information
nkemnitz authored and supersergiy committed Feb 2, 2024
1 parent 00749b4 commit a34db3c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
44 changes: 44 additions & 0 deletions tests/unit/mazepa/test_id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import attrs

from zetta_utils import mazepa
from zetta_utils.builder.building import BuilderPartial
from zetta_utils.geometry.bbox import BBox3D
from zetta_utils.layer.volumetric.cloudvol.build import build_cv_layer
from zetta_utils.mazepa import taskable_operation_cls
from zetta_utils.mazepa.id_generation import generate_invocation_id as gen_id
from zetta_utils.mazepa_layer_processing.common import build_subchunkable_apply_flow


class ClassA:
Expand Down Expand Up @@ -101,6 +105,37 @@ def flow(self, *args, **kwargs):
return self.callable_fn(*args, **kwargs)


def subchunkable_flow():
    """Build a small, fixed subchunkable apply flow for the id-generation tests.

    Subchunkable is used so commonly that it warrants its own test. Every
    argument is a hard-coded literal, so each call describes the same
    invocation. The object returned by ``build_subchunkable_apply_flow`` is
    handed back unchanged; the tests in this module read its ``fn``, ``args``
    and ``kwargs`` attributes.
    """
    # The processing function is built from a builder spec for an identity lambda.
    identity_fn = BuilderPartial(
        spec={"@type": "invoke_lambda_str", "lambda_str": "lambda x: x"}
    )

    # Info overrides describing a single-scale, single-voxel int8 image.
    single_voxel_scale = {
        "chunk_sizes": [[1, 1, 1]],
        "encoding": "raw",
        "key": "1_1_1",
        "resolution": [1, 1, 1],
        "size": [1, 1, 1],
        "voxel_offset": [0, 0, 0],
    }
    info_overrides = {
        "data_type": "int8",
        "num_channels": 1,
        "scales": [single_voxel_scale],
        "type": "image",
    }
    dst_layer = build_cv_layer(
        path="/tmp/zutils/test/test_id_generation",
        info_field_overrides=info_overrides,
    )

    # Bounding box from [0, 0, 0] to [1, 1, 1] at resolution [1, 1, 1].
    unit_bbox = BBox3D.from_coords(
        start_coord=[0, 0, 0],
        end_coord=[1, 1, 1],
        resolution=[1, 1, 1],
    )

    return build_subchunkable_apply_flow(
        fn=identity_fn,
        dst_resolution=[1, 1, 1],
        processing_chunk_sizes=[[1, 1, 1]],
        dst=dst_layer,
        bbox=unit_bbox,
        level_intermediaries_dirs=[
            "/tmp/zutils/test/test_id_generation/tmp",
        ],
    )


def test_generate_invocation_id_method() -> None:
assert gen_id(ClassA().method, [], {}) != gen_id(ClassB().method, [], {})
assert gen_id(ClassB().method, [], {}) != gen_id(ClassC().method, [], {})
Expand Down Expand Up @@ -183,6 +218,12 @@ def test_generate_invocation_id_flow_schema() -> None:
)


def test_generate_invocation_id_subchunkable_flow() -> None:
    """Check that two separately built, identical subchunkable flows get the same id."""
    first, second = subchunkable_flow(), subchunkable_flow()
    first_id = gen_id(first.fn, first.args, first.kwargs)
    second_id = gen_id(second.fn, second.args, second.kwargs)
    assert first_id == second_id


def _gen_id_calls(_) -> dict[str, str]:
gen_ids = {
'gen_id(ClassA().method, [], {"a": 1})': gen_id(ClassA().method, [], {"a": 1}),
Expand All @@ -208,6 +249,9 @@ def _gen_id_calls(_) -> dict[str, str]:
"gen_id(FlowSchema({}, ClassE(1).method).flow, [], {})": gen_id(
FlowSchema({}, ClassE(1).method).flow, [], {}
),
"gen_id(subchunkable_flow(), [], {})": gen_id(
subchunkable_flow().fn, subchunkable_flow().args, subchunkable_flow().kwargs
),
}
return gen_ids

Expand Down
18 changes: 14 additions & 4 deletions zetta_utils/mazepa/id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ def get_unique_id(


def generate_invocation_id(
fn: Callable,
args: list,
kwargs: dict,
fn: Optional[Callable] = None,
args: Optional[list] = None,
kwargs: Optional[dict] = None,
prefix: Optional[str] = None,
):
) -> str:
"""Generate a unique and deterministic ID for a function invocation.
The ID is generated using xxhash and dill to hash the function and its arguments.
:param fn: the function, or really any Callable, defaults to None
:param args: the function arguments, or any list, defaults to None
:param kwargs: the function kwargs, or any dict, defaults to None
:param prefix: optional prefix str, separated by `-`, defaults to None
:return: A unique, yet deterministic string that identifies (fn, args, kwargs) in
the current Python environment.
"""
x = xxhash.xxh128()
try:
x.update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,11 +846,10 @@ def _build_subchunkable_apply_flow( # pylint: disable=keyword-arg-before-vararg
op_kwargs=op_kwargs,
)
"""
Generate flow id for deconflicting intermediaries
Generate flow id for deconflicting intermediaries - must be deterministic for proper
identification across Python sessions
"""
flow_id = id_generation.get_unique_id(
prefix="subchunkable", slug_len=4, add_uuid=False, max_len=50
)
flow_id = id_generation.generate_invocation_id(kwargs=locals(), prefix="subchunkable")

"""
Basic building blocks where the work gets done, at the very bottom
Expand Down

0 comments on commit a34db3c

Please sign in to comment.