Recipe hashes (#349)
cisaacstern authored Apr 30, 2022
1 parent e78febe commit cc0a1be
Showing 5 changed files with 346 additions and 0 deletions.
13 changes: 13 additions & 0 deletions docs/pangeo_forge_recipes/development/release_notes.md
@@ -1,5 +1,18 @@
# Release Notes

## v0.8.4

- Added `serialization` module along with `BaseRecipe.sha256` and `FilePattern.sha256` methods.
  Together, these enable generation of deterministic hashes for both recipe and file pattern
  instances. Comparing these hashes against those from a prior version of the recipe reveals
  whether a particular recipe instance in a Python module (which may contain arbitrary numbers
  of recipe instances) has changed since the last time the instances in that module were
  executed. The file pattern hashes are based on a blockchain built cumulatively from all of the
  index:filepath pairs yielded by the pattern's `self.items()` method. As such, in cases where a
  new pattern is intended to append to an existing dataset built from a prior version of that
  pattern, the pattern hash can be used to determine the index from which to begin appending, as
  sketched below and demonstrated in the tests. {pull}`349`
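
  A minimal sketch of the intended change-detection workflow. Everything here other than the
  `sha256()` calls is illustrative: the URL scheme is hypothetical, and in practice the stored
  hash would be persisted between runs rather than held in a local variable.

  ```python
  # Sketch: detect whether a recipe instance has changed since its last execution.
  from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
  from pangeo_forge_recipes.recipes import XarrayZarrRecipe

  def make_url(time):
      return f"https://example.com/data-{time}.nc"  # hypothetical source

  pattern = FilePattern(make_url, ConcatDim("time", ["20220101", "20220102"]))
  recipe = XarrayZarrRecipe(pattern)

  stored_hash = recipe.sha256()  # in practice, persisted from a prior run
  # ... later, after the recipe module may have been edited ...
  if recipe.sha256() != stored_hash:
      print("recipe changed since last execution; re-run required")
  else:
      print("recipe unchanged; execution can be skipped")
  ```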

## v0.8.3 - 2022-04-19

- Added `.file_type` attribute to {class}`pangeo_forge_recipes.patterns.FilePattern`. This attribute will eventually supersede
69 changes: 69 additions & 0 deletions pangeo_forge_recipes/patterns.py
@@ -5,6 +5,7 @@
import warnings
from dataclasses import dataclass, field, replace
from enum import Enum, auto
from hashlib import sha256
from itertools import product
from typing import (
Any,
@@ -20,6 +21,8 @@
Union,
)

from .serialization import dataclass_sha256, dict_drop_empty, dict_to_sha256


class CombineOp(Enum):
"""Used to uniquely identify different combine operations across Pangeo Forge Recipes."""
@@ -248,6 +251,11 @@ def items(self):
for key in self:
yield key, self[key]

def sha256(self):
"""Compute a sha256 hash for the instance."""

return pattern_blockchain(self).pop(-1)


def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs):
"""Convenience function for creating a FilePattern from a list of files."""
@@ -288,3 +296,64 @@ def prune_pattern(fp: FilePattern, nkeep: int = 2) -> FilePattern:
if param not in ["format_function", "combine_dims"]
}
return FilePattern(fp.format_function, *new_combine_dims, **kwargs)


def pattern_blockchain(pattern: FilePattern) -> List[bytes]:
"""For a ``FilePattern`` instance, compute a blockchain, i.e. a list of hashes of length N+1,
where N is the number of index:filepath pairs yielded by the ``FilePattern`` instance's
``.items()`` method. The first item in the list is calculated by hashing instance attributes.
Each subsequent item is calculated by hashing the byte string produced by concatenating the next
index:filepath pair yielded by ``.items()`` with the previous hash in the list.
:param pattern: The ``FilePattern`` instance for which to calculate a blockchain.
"""

    # we exclude the format function and combine dims from ``root`` because they only matter
    # insofar as they determine the index:filepath pairs yielded by iterating over ``.items()``.
    # if those pairs were generated in a different way in the future, we ultimately wouldn't care.
root = {
"fsspec_open_kwargs": pattern.fsspec_open_kwargs,
"query_string_secrets": pattern.query_string_secrets,
"file_type": pattern.file_type,
"nitems_per_file": {
op.name: op.nitems_per_file # type: ignore
for op in pattern.combine_dims
if op.name in pattern.concat_dims
},
}
    # by dropping empty values from ``root``, we allow new attributes to be added to
    # ``FilePattern`` over time while preserving backwards-compatibility between hashes of
    # patterns which do not set those new attributes.
root_drop_empty = dict_drop_empty([(k, v) for k, v in root.items()])
root_sha256 = dict_to_sha256(root_drop_empty)

blockchain = [root_sha256]
for k, v in pattern.items():
key_hash = b"".join(
sorted([dataclass_sha256(dimindex, ignore_keys=["sequence_len"]) for dimindex in k])
)
value_hash = sha256(v.encode("utf-8")).digest()
new_hash = key_hash + value_hash
new_block = sha256(blockchain[-1] + new_hash).digest()
blockchain.append(new_block)

return blockchain
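
A short sketch of the N+1 property described in the docstring above, using a hypothetical URL
scheme; `pattern.sha256()` is simply the final block of the chain:

```python
# Sketch: a pattern yielding N index:filepath pairs produces a chain of N+1 hashes.
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, pattern_blockchain

pattern = FilePattern(
    lambda time: f"https://example.com/{time}.nc",  # hypothetical source
    ConcatDim("time", ["a", "b", "c"]),
)
chain = pattern_blockchain(pattern)
assert len(chain) == len(list(pattern.items())) + 1  # root hash + one block per pair
assert pattern.sha256() == chain[-1]                 # sha256 is the last block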


def match_pattern_blockchain( # type: ignore
old_pattern_last_hash: bytes,
new_pattern: FilePattern,
) -> Index:
"""Given the last hash of the blockchain for a previous pattern, and a new pattern, determine
which (if any) ``Index`` key of the new pattern to begin processing data from, in order to
    append to a dataset built using the previous pattern.
:param old_pattern_last_hash: The last hash of the blockchain for the ``FilePattern`` instance
which was used to build the existing dataset.
:param new_pattern: A new ``FilePattern`` instance from which to append to the existing dataset.
"""

new_chain = pattern_blockchain(new_pattern)
for k, h in zip(new_pattern, new_chain):
if h == old_pattern_last_hash:
return k
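
A hedged sketch of the append workflow this enables, mirroring the tests below: hash the old
pattern, extend the date range, then find the index from which to resume. The URL scheme is
hypothetical.

```python
# Sketch: locate the first item of an extended pattern not covered by the old dataset.
import pandas as pd

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, match_pattern_blockchain

def make_url(time):
    return f"https://example.com/{time:%Y%m%d}.nc"  # hypothetical source

old_dates = pd.date_range("2022-01-01", "2022-01-10", freq="D")
new_dates = pd.date_range("2022-01-01", "2022-01-20", freq="D")
old = FilePattern(make_url, ConcatDim("time", old_dates, nitems_per_file=1))
new = FilePattern(make_url, ConcatDim("time", new_dates, nitems_per_file=1))

key = match_pattern_blockchain(old.sha256(), new)
# the matching key indexes the first un-appended item; building resumes here
assert new[key] == "https://example.com/20220111.nc"
```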
5 changes: 5 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
@@ -6,12 +6,14 @@

from ..executors.base import Pipeline
from ..patterns import FilePattern, prune_pattern
from ..serialization import dataclass_sha256
from ..storage import StorageConfig, temporary_storage_config


@dataclass
class BaseRecipe(ABC):
_compiler: ClassVar[RecipeCompiler]
_hash_exclude_ = ["storage_config"]

def to_function(self):
from ..executors import FunctionPipelineExecutor
@@ -38,6 +40,9 @@ def to_beam(self):

return BeamPipelineExecutor.compile(self._compiler())

def sha256(self):
return dataclass_sha256(self, ignore_keys=self._hash_exclude_)


RecipeCompiler = Callable[[BaseRecipe], Pipeline]
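
A hedged sketch (mirroring `test_recipe_sha256_hash_exclude` below) of what `_hash_exclude_`
buys: because `storage_config` is excluded, recipes that differ only in where they write data
hash identically. The example pattern is hypothetical.

```python
# Sketch: storage configuration does not affect a recipe's hash.
import tempfile

from fsspec.implementations.local import LocalFileSystem

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import FSSpecTarget, StorageConfig

pattern = FilePattern(lambda time: f"https://example.com/{time}.nc", ConcatDim("time", [1, 2]))
recipe_a = XarrayZarrRecipe(pattern)
recipe_b = XarrayZarrRecipe(pattern)
recipe_b.storage_config = StorageConfig(target=FSSpecTarget(LocalFileSystem(), tempfile.mkdtemp()))
assert recipe_a.sha256() == recipe_b.sha256()
```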

70 changes: 70 additions & 0 deletions pangeo_forge_recipes/serialization.py
@@ -0,0 +1,70 @@
from collections.abc import Collection
from dataclasses import asdict
from enum import Enum
from hashlib import sha256
from json import dumps
from typing import Any, List, Sequence


def either_encode_or_hash(obj: Any):
"""For objects which are not serializable with ``json.dumps``, this function defines
type-specific handlers which extract either a serializable value or a hash from the object.
:param obj: Any object which is not serializable to ``json``.
"""

if isinstance(obj, Enum): # custom serializer for FileType, CombineOp, etc.
return obj.value
elif hasattr(obj, "sha256"):
return obj.sha256().hex()
raise TypeError(f"object of type {type(obj).__name__} not serializable")
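
A small sketch of the two handlers, using stand-in classes (the real callers pass `FileType`,
`CombineOp`, and `FilePattern` instances):

```python
# Sketch: Enum members encode to their values; objects exposing a ``sha256``
# method contribute their hex digest. Both classes here are stand-ins.
from enum import Enum

from pangeo_forge_recipes.serialization import either_encode_or_hash

class Color(Enum):
    red = "red"

class HasSha256:
    def sha256(self):
        return b"\x00\x01"

assert either_encode_or_hash(Color.red) == "red"
assert either_encode_or_hash(HasSha256()) == "0001"
```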


def dict_to_sha256(dictionary: dict) -> bytes:
"""Generates a deterministic sha256 hash for a dict by first serializing the dict to json.
:param dictionary: The dict for which to generate a hash.
"""

# https://death.andgravity.com/stable-hashing

b = dumps(
dictionary,
default=either_encode_or_hash,
ensure_ascii=False,
sort_keys=True,
indent=None,
separators=(",", ":"),
)
return sha256(b.encode("utf-8")).digest()
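
A quick sketch of the determinism this buys: sorted keys and fixed separators make the digest
independent of insertion order, while any change in content changes the digest.

```python
# Sketch: key order does not affect the digest; values do.
from pangeo_forge_recipes.serialization import dict_to_sha256

assert dict_to_sha256({"a": 1, "b": 2}) == dict_to_sha256({"b": 2, "a": 1})
assert dict_to_sha256({"a": 1}) != dict_to_sha256({"a": 2})
```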


def dict_drop_empty(pairs: Sequence[Sequence]) -> dict:
"""Custom factory function for ``dataclasses.asdict`` which drops fields for which the value is
``None`` or for which the value is an empty collection (e.g. an empty ``list`` or ``dict``).
:param pairs: A sequence (list or tuple) of sequences of length 2, in which the first element of
each inner sequence is a hashable which can serve as a dictionary key, and the second element
of each inner sequence is the value to map to the key.
"""

# https://death.andgravity.com/stable-hashing#problem-we-need-to-ignore-empty-values

return dict((k, v) for k, v in pairs if not (v is None or not v and isinstance(v, Collection)))
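
A short sketch of the drop rule: `None` and empty collections are removed, but falsy
non-collection values such as `0` survive.

```python
# Sketch: only None and empty collections are dropped.
from pangeo_forge_recipes.serialization import dict_drop_empty

pairs = [("a", None), ("b", []), ("c", {}), ("d", 0), ("e", "x")]
assert dict_drop_empty(pairs) == {"d": 0, "e": "x"}
```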


def dataclass_sha256(dclass: Any, ignore_keys: List[str]) -> bytes:
"""Generate a deterministic sha256 hash from a Python ``dataclass``. Fields for which the value
is either ``None`` or an empty collection are excluded from the hash calculation automatically.
    To manually exclude other fields from the calculation, pass their names via ``ignore_keys``.
    For field values which are not json serializable, type-specific handlers are defined by the
    ``either_encode_or_hash`` function in this module.
:param dclass: The dataclass for which to calculate a hash.
:param ignore_keys: A list of field names to exclude from the hash calculation.
"""

d = asdict(dclass, dict_factory=dict_drop_empty)
for k in ignore_keys:
del d[k]
return dict_to_sha256(d)
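
A hedged sketch of both exclusion mechanisms together; `Config` is a hypothetical dataclass,
not part of this changeset.

```python
# Sketch: ignored keys and unset optional fields do not affect the hash.
from dataclasses import dataclass, field
from typing import List, Optional

from pangeo_forge_recipes.serialization import dataclass_sha256

@dataclass
class Config:
    name: str
    scratch_dir: str = "/tmp"                      # excluded below via ignore_keys
    note: Optional[str] = None                     # None: dropped automatically
    tags: List[str] = field(default_factory=list)  # empty: dropped automatically

a = Config("x", scratch_dir="/tmp/a")
b = Config("x", scratch_dir="/tmp/b")
assert dataclass_sha256(a, ignore_keys=["scratch_dir"]) == dataclass_sha256(b, ignore_keys=["scratch_dir"])
```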
189 changes: 189 additions & 0 deletions tests/test_serialization.py
@@ -0,0 +1,189 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional

import pandas as pd
import pytest
from fsspec.implementations.local import LocalFileSystem

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, FileType, match_pattern_blockchain
from pangeo_forge_recipes.recipes import HDFReferenceRecipe, XarrayZarrRecipe
from pangeo_forge_recipes.serialization import dict_to_sha256, either_encode_or_hash
from pangeo_forge_recipes.storage import FSSpecTarget, StorageConfig

URL_FORMAT = (
"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/"
"v2.1/access/avhrr/{time:%Y%m}/oisst-avhrr-v02r01.{time:%Y%m%d}.nc"
)


def make_file_pattern(dates, nitems_per_file):
def make_url(time):
return URL_FORMAT.format(time=time)

time_concat_dim = ConcatDim("time", dates, nitems_per_file=nitems_per_file)
pattern = FilePattern(make_url, time_concat_dim)

return pattern


@pytest.fixture
def end_date():
return "1981-10-01"


@pytest.fixture
def base_pattern(end_date):
dates = pd.date_range("1981-09-01", end_date, freq="D")
return make_file_pattern(dates, nitems_per_file=1)


def get_new_pattern_with_next_url(end_date, nitems_per_file):

fmt = "%Y-%m-%d"

def increment_end_date(ndays):
return datetime.strptime(end_date, fmt) + timedelta(days=ndays)

next_day = increment_end_date(ndays=1)
new_end_date = increment_end_date(ndays=10).strftime(fmt)
new_dates = pd.date_range("1981-09-01", new_end_date, freq="D")
new_pattern = make_file_pattern(new_dates, nitems_per_file=nitems_per_file)
return new_pattern, URL_FORMAT.format(time=next_day)


@pytest.fixture(params=["matching", "not_matching"])
def pattern_pair(base_pattern, end_date, request):
if request.param == "matching":
return (base_pattern, base_pattern)
elif request.param == "not_matching":
new_pattern, _ = get_new_pattern_with_next_url(end_date, nitems_per_file=1)
return (base_pattern, new_pattern)


@pytest.mark.parametrize("new_pattern_nitems_per_file", [1, 2])
@pytest.mark.parametrize(
"kwargs",
[
({}, {}),
({}, dict(fsspec_open_kwargs={"block_size": 0})),
(dict(fsspec_open_kwargs={"block_size": 0}), dict(fsspec_open_kwargs={"block_size": 0})),
(dict(file_type=FileType.opendap), dict(fsspec_open_kwargs={"block_size": 0})),
(dict(file_type=FileType.opendap), dict(file_type=FileType.opendap)),
],
)
def test_match_pattern_blockchain(base_pattern, end_date, new_pattern_nitems_per_file, kwargs):
new_pattern, next_url = get_new_pattern_with_next_url(end_date, new_pattern_nitems_per_file)

for i, pattern in enumerate((base_pattern, new_pattern)):
for k, v in kwargs[i].items():
setattr(pattern, k, v)

matching_key = match_pattern_blockchain(base_pattern.sha256(), new_pattern)

if kwargs[0] == kwargs[1] and new_pattern_nitems_per_file == 1:
assert new_pattern[matching_key] == next_url
elif kwargs[0] != kwargs[1] or new_pattern_nitems_per_file == 2:
assert matching_key is None


@pytest.mark.parametrize("recipe_cls", [XarrayZarrRecipe, HDFReferenceRecipe])
def test_recipe_sha256_hash_exclude(base_pattern, recipe_cls, tmpdir_factory):
recipe_0 = recipe_cls(base_pattern)
recipe_1 = recipe_cls(base_pattern)

assert recipe_0.sha256() == recipe_1.sha256()

local_fs = LocalFileSystem()
custom_target_path = tmpdir_factory.mktemp("custom_target")
custom_storage_config = StorageConfig(target=FSSpecTarget(local_fs, custom_target_path))
recipe_1.storage_config = custom_storage_config

assert recipe_0.sha256() == recipe_1.sha256()


@pytest.mark.parametrize(
"kwargs",
[
({}, {}),
({}, dict(target_chunks={"time": 1})),
(dict(target_chunks={"time": 1}), dict(target_chunks={"time": 1})),
(dict(target_chunks={"time": 1}), dict(target_chunks={"time": 2})),
(dict(subset_inputs={"time": 2}), dict(target_chunks={"time": 2})),
],
)
def test_xarray_zarr_sha256(pattern_pair, kwargs):
recipe_0 = XarrayZarrRecipe(pattern_pair[0], **kwargs[0])
recipe_1 = XarrayZarrRecipe(pattern_pair[1], **kwargs[1])

if pattern_pair[0] == pattern_pair[1] and kwargs[0] == kwargs[1]:
assert recipe_0.sha256() == recipe_1.sha256()
else:
assert recipe_0.sha256() != recipe_1.sha256()


@pytest.mark.parametrize(
"kwargs",
[
({}, {}),
({}, dict(output_json_fname="custom_name.json")),
(dict(output_json_fname="custom_name.json"), dict(output_json_fname="custom_name.json")),
(dict(netcdf_storage_options={"anon": True}), dict(output_json_fname="custom_name.json")),
],
)
def test_kerchunk_sha256(pattern_pair, kwargs):
recipe_0 = HDFReferenceRecipe(pattern_pair[0], **kwargs[0])
recipe_1 = HDFReferenceRecipe(pattern_pair[1], **kwargs[1])

if pattern_pair[0] == pattern_pair[1] and kwargs[0] == kwargs[1]:
assert recipe_0.sha256() == recipe_1.sha256()
else:
assert recipe_0.sha256() != recipe_1.sha256()


@pytest.mark.parametrize("cls", [XarrayZarrRecipe, HDFReferenceRecipe])
@pytest.mark.parametrize(
"kwargs",
[
{},
{"new_optional_str": "hello"},
{"new_dict": dict(a=1)},
{"new_list": [1, 2, 3]},
],
)
def test_additional_fields(base_pattern, cls, kwargs):
# simulates a new release in which new fields are added; because we drop empty fields from
# the hash calculation, backwards compatibility is preserved as long as new fields are unset

@dataclass
class NewRelease(cls):
new_optional_str: Optional[str] = None
new_dict: dict = field(default_factory=dict)
new_list: list = field(default_factory=list)

old_release_obj = cls(base_pattern)
new_release_obj = NewRelease(base_pattern, **kwargs)

if not kwargs:
assert old_release_obj.sha256() == new_release_obj.sha256()
else:
assert old_release_obj.sha256() != new_release_obj.sha256()


def test_either_encode_or_hash_raises():
def f():
pass

@dataclass
class HasUnserializableField:
unserializable_field: Callable = f

expected_msg = f"object of type {type(f).__name__} not serializable"

with pytest.raises(TypeError, match=expected_msg):
either_encode_or_hash(f)

with pytest.raises(TypeError, match=expected_msg):
        # in practice, we never call ``either_encode_or_hash`` directly;
        # it is invoked from within ``dict_to_sha256``.
dict_to_sha256(asdict(HasUnserializableField()))
