Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recipe hashes #349

Merged
merged 24 commits into from
Apr 30, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
694f9d4
add serialization.py
cisaacstern Apr 27, 2022
1095095
add test_serialization.py
cisaacstern Apr 27, 2022
93c8231
add .sha256 methods to FilePattern and BaseRecipe
cisaacstern Apr 27, 2022
50b522d
in pattern.sha256 root, add nitems_per_file, remove is_opendap
cisaacstern Apr 27, 2022
e34afda
shorter date range for serialization test
cisaacstern Apr 27, 2022
5ee8342
only _hash_exclude_ 'storage_config', not 'file_pattern'
cisaacstern Apr 27, 2022
ed66064
parametrize hash exclude test with both recipe classes
cisaacstern Apr 28, 2022
3c13247
drop empty fields before sha256 calc
cisaacstern Apr 28, 2022
9cb550f
parametrize additional fields test
cisaacstern Apr 28, 2022
228eead
parametrize kwargs for recipe sha256 tests
cisaacstern Apr 28, 2022
840df67
test recipes sha256 with matching and not_matching patterns
cisaacstern Apr 29, 2022
568980c
type hints and docstrings for serialization.py
cisaacstern Apr 29, 2022
23f79fa
parametrize file pattern hash test with kwargs; drop empty fields fro…
cisaacstern Apr 29, 2022
160e3e9
clarify addl kwargs test names
cisaacstern Apr 29, 2022
0ca1776
add release notes
cisaacstern Apr 29, 2022
0300877
fix typo in match_pattern_blockchain docstring
cisaacstern Apr 29, 2022
4092932
move imports to top level
cisaacstern Apr 29, 2022
8303cfc
release notes text wrapping
cisaacstern Apr 30, 2022
f4c563b
duck type json_default
cisaacstern Apr 30, 2022
838005b
Merge remote-tracking branch 'charles/recipe-hashes' into recipe-hashes
cisaacstern Apr 30, 2022
a60d3f3
fix circular dependencies
cisaacstern Apr 30, 2022
d3489f5
rename json_default -> either_encode_or_hash
cisaacstern Apr 30, 2022
b48849c
make nitems_per_file a dict comprehension inclusive of op.name
cisaacstern Apr 30, 2022
a5bcf1e
test either_encode_or_hash raises TypeError
cisaacstern Apr 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/pangeo_forge_recipes/development/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release Notes

## v0.8.4

- Added `serialization` module along with `BaseRecipe.sha256` and `FilePattern.sha256` methods.
  Collectively, this provides for generation of deterministic hashes for both recipe and file
  pattern instances. Checking these hashes against those from a prior version of a recipe can be
  used to determine whether a particular recipe instance in a Python module (which may contain any
  number of recipe instances) has changed since the last time the instances in that module were
  executed. The file pattern hashes are based on a blockchain built cumulatively from all of the
  index:filepath pairs yielded by the pattern's `self.items()` method. As such, in cases where a
  new pattern is intended to append to an existing dataset built from a prior version of that
  pattern, the pattern hash can be used to determine the index from which to begin appending.
  This is demonstrated in the tests. {pull}`349`
cisaacstern marked this conversation as resolved.
Show resolved Hide resolved

## v0.8.3 - 2022-04-19

- Added `.file_type` attribute to {class}`pangeo_forge_recipes.patterns.FilePattern`. This attribute will eventually supercede
Expand Down
6 changes: 6 additions & 0 deletions pangeo_forge_recipes/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ def items(self):
for key in self:
yield key, self[key]

def sha256(self):
    """Compute a deterministic sha256 hash for this pattern instance.

    The hash is the final block of the pattern's blockchain; see
    ``serialization.pattern_blockchain`` for how the chain is built.

    :returns: The hash digest as ``bytes``.
    """
    # Local import avoids a circular dependency between ``patterns`` and
    # ``serialization`` at module import time.
    from .serialization import pattern_blockchain

    # ``[-1]`` instead of ``.pop(-1)``: same value, but no pointless mutation
    # of the freshly built list.
    return pattern_blockchain(self)[-1]


def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs):
"""Convenience function for creating a FilePattern from a list of files."""
Expand Down
6 changes: 6 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
@dataclass
class BaseRecipe(ABC):
_compiler: ClassVar[RecipeCompiler]
_hash_exclude_ = ["storage_config"]

def to_function(self):
from ..executors import FunctionPipelineExecutor
Expand All @@ -38,6 +39,11 @@ def to_beam(self):

return BeamPipelineExecutor.compile(self._compiler())

def sha256(self):
    """Compute a deterministic sha256 hash for this recipe instance.

    Fields named in ``self._hash_exclude_`` are omitted from the calculation.

    :returns: The hash digest as ``bytes``.
    """
    # Local import avoids a circular dependency at module import time.
    from ..serialization import dataclass_sha256

    return dataclass_sha256(self, ignore_keys=self._hash_exclude_)


RecipeCompiler = Callable[[BaseRecipe], Pipeline]

Expand Down
133 changes: 133 additions & 0 deletions pangeo_forge_recipes/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from collections.abc import Collection
from dataclasses import asdict
from enum import Enum
from hashlib import sha256
from json import dumps
from typing import Any, List, Optional, Sequence

from .patterns import FilePattern, Index


def json_default(obj: Any):
    """For objects which are not serializable with ``json.dumps``, this function defines
    type-specific handlers which extract a serializable value from the object.

    :param obj: Any object which is not serializable to ``json``.
    :raises TypeError: If no handler exists for the object's type.
    """

    if isinstance(obj, Enum):  # custom serializer for FileType, CombineOp, etc.
        return obj.value
    # Duck-type rather than ``isinstance(obj, FilePattern)``: any object exposing
    # a ``.sha256()`` method (e.g. ``FilePattern``) is represented by its hash.
    # This avoids coupling this helper to the ``patterns`` module.
    hasher = getattr(obj, "sha256", None)
    if callable(hasher):
        return hasher().hex()
    raise TypeError(f"object of type {type(obj).__name__} not serializable")


def dict_to_sha256(dictionary: dict) -> bytes:
    """Generate a deterministic sha256 hash for a dict by first serializing it to json.

    :param dictionary: The dict for which to generate a hash.
    :returns: The hash digest as ``bytes``.
    """

    # Canonical serialization per https://death.andgravity.com/stable-hashing:
    # sorted keys, compact separators, no ascii-escaping, no indentation.
    serialized = dumps(
        dictionary,
        default=json_default,
        ensure_ascii=False,
        sort_keys=True,
        indent=None,
        separators=(",", ":"),
    )
    return sha256(serialized.encode("utf-8")).digest()


def dict_drop_empty(pairs: Sequence[Sequence]) -> dict:
    """Custom factory function for ``dataclasses.asdict`` which drops fields for which the value is
    ``None`` or for which the value is an empty collection (e.g. an empty ``list`` or ``dict``).

    :param pairs: A sequence (list or tuple) of sequences of length 2, in which the first element of
      each inner sequence is a hashable which can serve as a dictionary key, and the second element
      of each inner sequence is the value to map to the key.
    """

    # https://death.andgravity.com/stable-hashing#problem-we-need-to-ignore-empty-values
    # Note: falsy non-collections such as ``0`` and ``False`` are kept.

    def _keep(value) -> bool:
        if value is None:
            return False
        if isinstance(value, Collection) and not value:
            return False
        return True

    return {key: value for key, value in pairs if _keep(value)}


def dataclass_sha256(dclass: Any, ignore_keys: List[str]) -> bytes:
    """Generate a deterministic sha256 hash from a Python ``dataclass`` instance. Fields for which
    the value is either ``None`` or an empty collection are excluded from the hash calculation
    automatically. To manually exclude other fields from the calculation, pass their names via
    ``ignore_keys``. For field values which are not json serializable, type-specific handlers are
    defined by the ``json_default`` function in this module.

    :param dclass: The dataclass instance for which to calculate a hash. (Annotated ``Any`` rather
      than ``type`` because ``dataclasses.asdict`` requires an instance, not a class.)
    :param ignore_keys: A list of field names to exclude from the hash calculation.
    :returns: The hash digest as ``bytes``.
    """

    d = asdict(dclass, dict_factory=dict_drop_empty)
    for k in ignore_keys:
        # An ignored field may already be absent if its value was empty and was
        # dropped by ``dict_drop_empty``; don't raise KeyError in that case.
        d.pop(k, None)
    return dict_to_sha256(d)


def pattern_blockchain(pattern: FilePattern) -> List[bytes]:
    """For a ``FilePattern`` instance, compute a blockchain, i.e. a list of hashes of length N+1,
    where N is the number of index:filepath pairs yielded by the ``FilePattern`` instance's
    ``.items()`` method. The first item in the list is calculated by hashing instance attributes.
    Each subsequent item is calculated by hashing the byte string produced by concatenating the next
    index:filepath pair yielded by ``.items()`` with the previous hash in the list.

    :param pattern: The ``FilePattern`` instance for which to calculate a blockchain.
    """

    # The format function and combine dims are deliberately excluded from ``root``:
    # they only determine the index:filepath pairs yielded by ``.items()``, and those
    # pairs are hashed directly in the loop below, so how they were generated is
    # ultimately irrelevant.
    root = {
        "fsspec_open_kwargs": pattern.fsspec_open_kwargs,
        "query_string_secrets": pattern.query_string_secrets,
        "file_type": pattern.file_type,
        "nitems_per_file": [
            op.nitems_per_file  # type: ignore
            for op in pattern.combine_dims
            if op.name in pattern.concat_dims
        ],
    }
    # Dropping empty values from ``root`` lets ``FilePattern`` grow new attributes
    # while keeping hashes backwards-compatible with patterns that leave those new
    # attributes unset.
    chain = [dict_to_sha256(dict_drop_empty(list(root.items())))]

    for index, filepath in pattern.items():
        # Sort the per-dimension hashes so the result is order-independent.
        index_hash = b"".join(
            sorted(dataclass_sha256(dimindex, ignore_keys=["sequence_len"]) for dimindex in index)
        )
        filepath_hash = sha256(filepath.encode("utf-8")).digest()
        chain.append(sha256(chain[-1] + index_hash + filepath_hash).digest())

    return chain


def match_pattern_blockchain(
    old_pattern_last_hash: bytes,
    new_pattern: FilePattern,
) -> Optional[Index]:
    """Given the last hash of the blockchain for a previous pattern, and a new pattern, determine
    which (if any) ``Index`` key of the new pattern to begin processing data from, in order to
    append to a dataset built using the previous pattern.

    :param old_pattern_last_hash: The last hash of the blockchain for the ``FilePattern`` instance
      which was used to build the existing dataset.
    :param new_pattern: A new ``FilePattern`` instance from which to append to the existing dataset.
    :returns: The first unprocessed ``Index`` key, or ``None`` if no block matches.
    """

    new_chain = pattern_blockchain(new_pattern)
    # ``new_chain`` has one more entry than ``new_pattern`` (the root hash comes
    # first), so zip pairs each key with the hash of everything *before* it: when
    # block ``i`` matches the old hash, key ``i`` is the first one not yet processed.
    for key, block in zip(new_pattern, new_chain):
        if block == old_pattern_last_hash:
            return key
    # Explicit None (previously implicit, which contradicted the declared return type).
    return None
170 changes: 170 additions & 0 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import pytest
from fsspec.implementations.local import LocalFileSystem

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, FileType
from pangeo_forge_recipes.recipes import HDFReferenceRecipe, XarrayZarrRecipe
from pangeo_forge_recipes.serialization import match_pattern_blockchain
from pangeo_forge_recipes.storage import FSSpecTarget, StorageConfig

URL_FORMAT = (
"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/"
"v2.1/access/avhrr/{time:%Y%m}/oisst-avhrr-v02r01.{time:%Y%m%d}.nc"
)


def make_file_pattern(dates, nitems_per_file):
    """Build a ``FilePattern`` over ``dates`` with a single ``time`` concat dimension."""

    def format_url(time):
        return URL_FORMAT.format(time=time)

    concat_dim = ConcatDim("time", dates, nitems_per_file=nitems_per_file)
    return FilePattern(format_url, concat_dim)


@pytest.fixture
def end_date():
    # Last date of the baseline ("existing dataset") pattern used in these tests.
    return "1981-10-01"


@pytest.fixture
def base_pattern(end_date):
    """A daily file pattern from 1981-09-01 through ``end_date``, one item per file."""
    days = pd.date_range("1981-09-01", end_date, freq="D")
    return make_file_pattern(days, nitems_per_file=1)


def get_new_pattern_with_next_url(end_date, nitems_per_file):
    """Extend the date range ten days past ``end_date`` and return the extended
    pattern together with the url for the first day after ``end_date``."""

    fmt = "%Y-%m-%d"
    last = datetime.strptime(end_date, fmt)

    next_day = last + timedelta(days=1)
    extended_end = (last + timedelta(days=10)).strftime(fmt)
    extended_dates = pd.date_range("1981-09-01", extended_end, freq="D")
    new_pattern = make_file_pattern(extended_dates, nitems_per_file=nitems_per_file)
    return new_pattern, URL_FORMAT.format(time=next_day)


@pytest.fixture(params=["matching", "not_matching"])
def pattern_pair(base_pattern, end_date, request):
    """Yield a pair of patterns which are either identical or differ in date range."""
    if request.param == "matching":
        return (base_pattern, base_pattern)
    elif request.param == "not_matching":
        extended_pattern, _ = get_new_pattern_with_next_url(end_date, nitems_per_file=1)
        return (base_pattern, extended_pattern)


@pytest.mark.parametrize("new_pattern_nitems_per_file", [1, 2])
@pytest.mark.parametrize(
    "kwargs",
    [
        ({}, {}),
        ({}, dict(fsspec_open_kwargs={"block_size": 0})),
        (dict(fsspec_open_kwargs={"block_size": 0}), dict(fsspec_open_kwargs={"block_size": 0})),
        (dict(file_type=FileType.opendap), dict(fsspec_open_kwargs={"block_size": 0})),
        (dict(file_type=FileType.opendap), dict(file_type=FileType.opendap)),
    ],
)
def test_match_pattern_blockchain(base_pattern, end_date, new_pattern_nitems_per_file, kwargs):
    """Matching succeeds only when both patterns share attributes and chunking."""
    new_pattern, next_url = get_new_pattern_with_next_url(end_date, new_pattern_nitems_per_file)

    # Apply the parametrized attribute overrides to each pattern in turn.
    for pattern, overrides in zip((base_pattern, new_pattern), kwargs):
        for name, value in overrides.items():
            setattr(pattern, name, value)

    matching_key = match_pattern_blockchain(base_pattern.sha256(), new_pattern)

    if kwargs[0] == kwargs[1] and new_pattern_nitems_per_file == 1:
        # Identical configs: appending should begin exactly at the first new url.
        assert new_pattern[matching_key] == next_url
    elif kwargs[0] != kwargs[1] or new_pattern_nitems_per_file == 2:
        # Any divergence in attributes or chunking breaks the chain entirely.
        assert matching_key is None


@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe, HDFReferenceRecipe])
def test_recipe_sha256_hash_exclude(base_pattern, recipe_cls, tmpdir_factory):
    # Two recipes built from the same pattern must hash identically.
    recipe_0 = recipe_cls(base_pattern)
    recipe_1 = recipe_cls(base_pattern)

    assert recipe_0.sha256() == recipe_1.sha256()

    # ``storage_config`` is listed in ``BaseRecipe._hash_exclude_``, so changing
    # it on one recipe must not change that recipe's hash.
    local_fs = LocalFileSystem()
    custom_target_path = tmpdir_factory.mktemp("custom_target")
    custom_storage_config = StorageConfig(target=FSSpecTarget(local_fs, custom_target_path))
    recipe_1.storage_config = custom_storage_config

    assert recipe_0.sha256() == recipe_1.sha256()


@pytest.mark.parametrize(
    "kwargs",
    [
        ({}, {}),
        ({}, dict(target_chunks={"time": 1})),
        (dict(target_chunks={"time": 1}), dict(target_chunks={"time": 1})),
        (dict(target_chunks={"time": 1}), dict(target_chunks={"time": 2})),
        (dict(subset_inputs={"time": 2}), dict(target_chunks={"time": 2})),
    ],
)
def test_xarray_zarr_sha256(pattern_pair, kwargs):
    # Name fixed from ``sha265`` typo; pytest collects by prefix, so no callers break.
    # Hashes must agree exactly when both the patterns and the kwargs agree.
    recipe_0 = XarrayZarrRecipe(pattern_pair[0], **kwargs[0])
    recipe_1 = XarrayZarrRecipe(pattern_pair[1], **kwargs[1])

    if pattern_pair[0] == pattern_pair[1] and kwargs[0] == kwargs[1]:
        assert recipe_0.sha256() == recipe_1.sha256()
    else:
        assert recipe_0.sha256() != recipe_1.sha256()


@pytest.mark.parametrize(
    "kwargs",
    [
        ({}, {}),
        ({}, dict(output_json_fname="custom_name.json")),
        (dict(output_json_fname="custom_name.json"), dict(output_json_fname="custom_name.json")),
        (dict(netcdf_storage_options={"anon": True}), dict(output_json_fname="custom_name.json")),
    ],
)
def test_kerchunk_sha256(pattern_pair, kwargs):
    # Name fixed from ``sha265`` typo; pytest collects by prefix, so no callers break.
    # Hashes must agree exactly when both the patterns and the kwargs agree.
    recipe_0 = HDFReferenceRecipe(pattern_pair[0], **kwargs[0])
    recipe_1 = HDFReferenceRecipe(pattern_pair[1], **kwargs[1])

    if pattern_pair[0] == pattern_pair[1] and kwargs[0] == kwargs[1]:
        assert recipe_0.sha256() == recipe_1.sha256()
    else:
        assert recipe_0.sha256() != recipe_1.sha256()


@pytest.mark.parametrize("cls", [XarrayZarrRecipe, HDFReferenceRecipe])
@pytest.mark.parametrize(
    "kwargs",
    [
        {},
        {"new_optional_str": "hello"},
        {"new_dict": dict(a=1)},
        {"new_list": [1, 2, 3]},
    ],
)
def test_additional_fields(base_pattern, cls, kwargs):
    """Simulate a release that adds new recipe fields. Because empty fields are
    dropped from the hash calculation, backwards compatibility is preserved as
    long as the new fields are left unset."""

    @dataclass
    class NewRelease(cls):
        new_optional_str: Optional[str] = None
        new_dict: dict = field(default_factory=dict)
        new_list: list = field(default_factory=list)

    current_release_obj = cls(base_pattern)
    simulated_next_release_obj = NewRelease(base_pattern, **kwargs)

    if kwargs:
        # Any populated new field must change the hash.
        assert current_release_obj.sha256() != simulated_next_release_obj.sha256()
    else:
        # All new fields unset: hash matches the prior release's.
        assert current_release_obj.sha256() == simulated_next_release_obj.sha256()