
Commit: temp

dodamih committed Jan 23, 2025
1 parent e0ee82c commit ba3d718
Showing 3 changed files with 56 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -35,7 +35,7 @@ keywords = ["neuroscience connectomics EM"]
license = {text = "MIT"}
name = "zetta_utils"
readme = "README.md"
requires-python = ">3.10,<3.13"
requires-python = ">3.9,<3.13"
urls = {Homepage = "https://github.com/zettaai/zetta_utils"}
version = "0.0.2"

71 changes: 36 additions & 35 deletions tests/unit/mazepa/test_id_generation.py
@@ -226,49 +226,50 @@ def test_generate_invocation_id_subchunkable_flow() -> None:

def _gen_id_calls(_) -> dict[str, str]:
    gen_ids = {
        'gen_id(ClassA().method, [], {"a": 1})': gen_id(ClassA().method, [], {"a": 1}),
        "gen_id(ClassD1().method, [], {})": gen_id(ClassD1().method, [], {}),
        "gen_id(ClassE(1).method, [], {})": gen_id(ClassE(1).method, [], {}),
        "gen_id(partial(ClassA().method, 42), [], {})": gen_id(
            partial(ClassA().method, 42), [], {}
        ),
        "gen_id(partial(ClassD1().method, 42), [], {})": gen_id(
            partial(ClassD1().method, 42), [], {}
        ),
        "gen_id(partial(ClassE(1).method, 42), [], {})": gen_id(
            partial(ClassE(1).method, 42), [], {}
        ),
        "gen_id(TaskableA(), [], {})": gen_id(TaskableA(), [], {}),
        "gen_id(TaskableD(1), [], {})": gen_id(TaskableD(1), [], {}),
        "gen_id(FlowSchema({}, ClassA().method).flow, [], {})": gen_id(
            FlowSchema({}, ClassA().method).flow, [], {}
        ),
        "gen_id(FlowSchema({}, ClassD1().method).flow, [], {})": gen_id(
            FlowSchema({}, ClassD1().method).flow, [], {}
        ),
        "gen_id(FlowSchema({}, ClassE(1).method).flow, [], {})": gen_id(
            FlowSchema({}, ClassE(1).method).flow, [], {}
        ),
        "gen_id(subchunkable_flow(), [], {})": gen_id(
            subchunkable_flow().fn, subchunkable_flow().args, subchunkable_flow().kwargs
        ),
        # 'gen_id(ClassA().method, [], {"a": 1})': gen_id(ClassA().method, [], {"a": 1}),
        "gen_id(ClassD1().method, [], {})": gen_id(ClassD1().method, [], {}, None, True),
        # "gen_id(ClassE(1).method, [], {})": gen_id(ClassE(1).method, [], {}),
        # "gen_id(partial(ClassA().method, 42), [], {})": gen_id(
        #     partial(ClassA().method, 42), [], {}
        # ),
        # "gen_id(partial(ClassD1().method, 42), [], {})": gen_id(
        #     partial(ClassD1().method, 42), [], {}
        # ),
        # "gen_id(partial(ClassE(1).method, 42), [], {})": gen_id(
        #     partial(ClassE(1).method, 42), [], {}
        # ),
        # "gen_id(TaskableA(), [], {})": gen_id(TaskableA(), [], {}),
        # "gen_id(TaskableD(1), [], {})": gen_id(TaskableD(1), [], {}),
        # "gen_id(FlowSchema({}, ClassA().method).flow, [], {})": gen_id(
        #     FlowSchema({}, ClassA().method).flow, [], {}
        # ),
        # "gen_id(FlowSchema({}, ClassD1().method).flow, [], {})": gen_id(
        #     FlowSchema({}, ClassD1().method).flow, [], {}
        # ),
        # "gen_id(FlowSchema({}, ClassE(1).method).flow, [], {})": gen_id(
        #     FlowSchema({}, ClassE(1).method).flow, [], {}
        # ),
        # "gen_id(subchunkable_flow(), [], {})": gen_id(
        #     subchunkable_flow().fn, subchunkable_flow().args, subchunkable_flow().kwargs
        # ),
    }
    return gen_ids


def test_persistence_across_sessions() -> None:
    # Create two separate processes - spawn ensures a new PYTHONHASHSEED is used
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        result = pool.map(_gen_id_calls, range(2))

    assert result[0] == result[1]
    for _ in range(1):
        with ctx.Pool(processes=2) as pool:
            result = pool.map(_gen_id_calls, range(2))

        assert result[0] == result[1]
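The spawn-based check above matters because each freshly spawned interpreter draws its own hash randomization seed (effectively a new PYTHONHASHSEED) unless one is pinned in the environment, so an ID that leaks built-in string hashing would silently differ between sessions. A minimal standalone sketch of that failure mode, not part of the repository:

import multiprocessing


def _builtin_str_hash(_) -> int:
    # hash() of a str is salted per interpreter session unless PYTHONHASHSEED is pinned.
    return hash("zetta_utils")


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        a, b = pool.map(_builtin_str_hash, range(2))
    # Almost always False here, which is exactly the kind of drift the test above guards against.
    print(a == b)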

def test_unpickleable_fn(mocker) -> None:
    # See https://github.com/uqfoundation/dill/issues/147 and possibly
    # https://github.com/uqfoundation/dill/issues/56

    unpickleable_fn = mocker.MagicMock()
"""
def test_unpickleable_invocation(mocker) -> None:
    # gen_id will return a random UUID in case of pickle errors
    assert gen_id(unpickleable_fn, [], {}) != gen_id(unpickleable_fn, [], {})
    some_fn = lambda x: x
    unpicklable_arg = [1]
    assert gen_id(some_fn, unpicklable_arg, {}) != gen_id(some_fn, unpicklable_arg, {})
"""
20 changes: 19 additions & 1 deletion zetta_utils/mazepa/id_generation.py
@@ -4,6 +4,8 @@
import uuid
from typing import Callable, Optional

from sympy import im

import cloudpickle
import xxhash
from coolname import generate_slug
@@ -40,6 +42,7 @@ def generate_invocation_id(
    args: Optional[list] = None,
    kwargs: Optional[dict] = None,
    prefix: Optional[str] = None,
    debug: Optional[bool] = False,
) -> str:
    """Generate a unique and deterministic ID for a function invocation.
    The ID is generated using xxhash and cloudpickle to hash the function and its arguments.
@@ -51,10 +54,25 @@
    :return: A unique, yet deterministic string that identifies (fn, args, kwargs) in
        the current Python environment.
    """
    # import dill
    import pickletools
    # return cloudpickle.dumps((fn, args, kwargs), protocol=dill.DEFAULT_PROTOCOL)
    if debug:
        pickletools.dis(pickletools.optimize(cloudpickle.dumps((fn, args, kwargs))))

    # Temporary: return the raw pickle string instead of the hashed ID computed below.
    return str(cloudpickle.dumps((fn, args, kwargs)))
    # return cloudpickle.dumps((fn, args, kwargs), protocol=dill.DEFAULT_PROTOCOL)
    x = xxhash.xxh128()
    try:
        x.update(cloudpickle.dumps((fn, args, kwargs)))
        # x.update(dill.dumps(
        #     (fn, args, kwargs),
        #     protocol=dill.DEFAULT_PROTOCOL,
        #     byref=False,
        #     recurse=True,
        #     fmode=dill.FILE_FMODE,
        # ))
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.warning(f"Failed to pickle {fn} with args {args} and kwargs {kwargs}: {e}")
        x.update(str(uuid.uuid4()))
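The debug branch above leans on pickletools to make the serialized payload inspectable: disassembling the optimized pickle stream shows, opcode by opcode, exactly what cloudpickle embedded, which helps spot content that varies between interpreter sessions and would break ID determinism. A standalone sketch of that workflow, with hypothetical names, not taken from the repository:

import pickletools

import cloudpickle


def dump_pickle_ops(fn, args, kwargs) -> None:
    # optimize() drops unused PUT opcodes; dis() prints one line per remaining opcode.
    payload = cloudpickle.dumps((fn, args, kwargs))
    pickletools.dis(pickletools.optimize(payload))


dump_pickle_ops(sorted, [[3, 1, 2]], {"key": abs})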
