Merge pull request #595 from pangeo-forge/dynamic-chunks-interface
Dynamic chunking interface for StoreToZarr
jbusecke authored Nov 15, 2023
2 parents dc125ab + 2afd727 commit 5d77d49
Showing 7 changed files with 172 additions and 13 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test-integration.yaml
@@ -57,8 +57,11 @@ jobs:
       - name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner
         shell: bash -l {0}
         run: |
-          python -m pip install -e ".[test,minio]"
           python -m pip install ${{ matrix.runner-version }}
+          python -m pip install -e ".[test,minio]"
+          # order reversed to fix https://github.com/pangeo-forge/pangeo-forge-recipes/pull/595#issuecomment-1811630921
+          # this should however be fixed in the runner itself
       - name: 🏄‍♂️ Run Tests
         shell: bash -l {0}
         run: |
4 changes: 4 additions & 0 deletions docs/release_notes.md
@@ -1,5 +1,9 @@
 # Release Notes

+## v0.10.4 - 2023-11-15
+
+- Add `dynamic_chunking_fn`/`dynamic_chunking_fn_kwargs` keywords to `StoreToZarr`. This allows users to pass a function that will be called at runtime to determine the target chunks for the resulting datasets based on the in-memory representation/size of the recipe dataset. {pull}`595`
+
 ## v0.10.3 - 2023-10-03

 - Assign injection spec values for command line interface {pull}`566`
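To illustrate the new keywords described in this release note, here is a minimal recipe sketch. It is hypothetical: the URL, `chunk_by_time`, and `n_timesteps` are placeholders; the complete, runnable example added by this PR is `examples/feedstock/gpcp_from_gcs_dynamic_chunks.py` below.

```python
import apache_beam as beam
import xarray as xr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr


def make_url(time):
    return f"https://example.com/data_{time}.nc"  # placeholder URL


pattern = FilePattern(make_url, ConcatDim("time", ["19960101", "19960102"], nitems_per_file=1))


def chunk_by_time(ds: xr.Dataset, n_timesteps: int = 10) -> dict:
    # `ds` is the in-memory template dataset for the aggregate recipe;
    # return the desired target chunking as a {dim_name: chunk_size} mapping.
    return {"time": min(n_timesteps, ds.sizes["time"])}


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        store_name="example.zarr",
        combine_dims=pattern.combine_dim_keys,
        dynamic_chunking_fn=chunk_by_time,
        dynamic_chunking_fn_kwargs={"n_timesteps": 20},
    )
)
```

Note that `target_chunks` must be omitted when `dynamic_chunking_fn` is passed; `StoreToZarr.__post_init__` raises a `ValueError` otherwise (see `pangeo_forge_recipes/transforms.py` below).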
3 changes: 3 additions & 0 deletions examples/feedstock/gpcp_from_gcs.py
@@ -29,6 +29,9 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
     assert ds.title == (
         "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
     )
+    # Making sure that the native chunking is different from the dynamic chunking
+    assert ds.chunks["time"][0] == 1
+
     return store

54 changes: 54 additions & 0 deletions examples/feedstock/gpcp_from_gcs_dynamic_chunks.py
@@ -0,0 +1,54 @@
+from typing import Dict
+
+import apache_beam as beam
+import pandas as pd
+import xarray as xr
+import zarr
+
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
+from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
+
+dates = [
+    d.to_pydatetime().strftime("%Y%m%d")
+    for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
+]
+
+
+def make_url(time):
+    url_base = "https://storage.googleapis.com/pforge-test-data"
+    return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"
+
+
+concat_dim = ConcatDim("time", dates, nitems_per_file=1)
+pattern = FilePattern(make_url, concat_dim)
+
+
+def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
+    # This fails integration test if not imported here
+    # TODO: see if --setup-file option for runner fixes this
+    import xarray as xr
+
+    ds = xr.open_dataset(store, engine="zarr", chunks={})
+    assert ds.title == (
+        "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
+    )
+
+    assert ds.chunks["time"][0] == 2
+    return store
+
+
+def chunk_func(ds: xr.Dataset) -> Dict[str, int]:
+    return {"time": 2}
+
+
+recipe = (
+    beam.Create(pattern.items())
+    | OpenURLWithFSSpec()
+    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
+    | StoreToZarr(
+        dynamic_chunking_fn=chunk_func,
+        store_name="gpcp.zarr",
+        combine_dims=pattern.combine_dim_keys,
+    )
+    | "Test dataset" >> beam.Map(test_ds)
+)
2 changes: 2 additions & 0 deletions examples/feedstock/meta.yaml
@@ -1,6 +1,8 @@
 recipes:
   - id: "gpcp-from-gcs"
    object: "gpcp_from_gcs:recipe"
+  - id: "gpcp-from-gcs-dynamic-chunks"
+    object: "gpcp_from_gcs_dynamic_chunks:recipe"
   - id: "noaa-oisst"
    object: "noaa_oisst:recipe"
   - id: "terraclimate"
32 changes: 27 additions & 5 deletions pangeo_forge_recipes/transforms.py
@@ -17,7 +17,7 @@
 import xarray as xr
 import zarr

-from .aggregation import XarraySchema, dataset_to_schema, schema_to_zarr
+from .aggregation import XarraySchema, dataset_to_schema, schema_to_template_ds, schema_to_zarr
 from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
 from .openers import open_url, open_with_kerchunk, open_with_xarray
 from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
@@ -505,7 +505,14 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
     :param target_root: Root path the Zarr store will be created inside;
         `store_name` will be appended to this prefix to create a full path.
     :param target_chunks: Dictionary mapping dimension names to chunks sizes.
-        If a dimension is a not named, the chunks will be inferred from the data.
+        If a dimension is a not named, the chunks will be inferred from the data.
+    :param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset``
+        template dataset as its first argument and returns a dynamically generated chunking dict.
+        If provided, ``target_chunks`` cannot also be passed. You can use this to determine chunking
+        based on the full dataset (e.g. divide along a certain dimension based on a desired chunk
+        size in memory). For more advanced chunking strategies, check
+        out https://github.com/jbusecke/dynamic_chunks
+    :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
     :param attrs: Extra group-level attributes to inject into the dataset.
     """

@@ -517,19 +524,34 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
         default_factory=RequiredAtRuntimeDefault
     )
     target_chunks: Dict[str, int] = field(default_factory=dict)
+    dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
+    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
     attrs: Dict[str, str] = field(default_factory=dict)

+    def __post_init__(self):
+        if self.target_chunks and self.dynamic_chunking_fn:
+            raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")
+
     def expand(
         self,
         datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
     ) -> beam.PCollection[zarr.storage.FSStore]:
         schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
         indexed_datasets = datasets | IndexItems(schema=schema)
-        rechunked_datasets = indexed_datasets | Rechunk(
-            target_chunks=self.target_chunks, schema=schema
+        target_chunks = (
+            self.target_chunks
+            if not self.dynamic_chunking_fn
+            else beam.pvalue.AsSingleton(
+                schema
+                | beam.Map(schema_to_template_ds)
+                | beam.Map(self.dynamic_chunking_fn, **self.dynamic_chunking_fn_kwargs)
+            )
         )
+        rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema)
         target_store = schema | PrepareZarrTarget(
-            target=self.get_full_target(), target_chunks=self.target_chunks, attrs=self.attrs
+            target=self.get_full_target(),
+            target_chunks=target_chunks,
+            attrs=self.attrs,
         )
         n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
         singleton_target_store = (
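As the docstring above suggests, a dynamic chunking function can size chunks from the template dataset's in-memory footprint. A sketch of such a function follows; it is not part of this PR, and the helper name and `target_bytes` default are illustrative only.

```python
import numpy as np
import xarray as xr


def chunk_time_to_target_bytes(ds: xr.Dataset, target_bytes: int = 100 * 2**20) -> dict:
    """Choose a `time` chunk length so each chunk holds roughly `target_bytes` in memory."""
    # Bytes required to hold one timestep, summed over all variables with a time dimension.
    bytes_per_timestep = sum(
        var.dtype.itemsize
        * int(np.prod([size for dim, size in var.sizes.items() if dim != "time"]))
        for var in ds.data_vars.values()
        if "time" in var.dims
    )
    n_timesteps = max(1, target_bytes // max(1, bytes_per_timestep))
    return {"time": int(min(n_timesteps, ds.sizes["time"]))}
```

At runtime, `StoreToZarr` materializes the template dataset via `schema_to_template_ds` and delivers the function's output to both `Rechunk` and `PrepareZarrTarget` as a Beam side input (`beam.pvalue.AsSingleton`), so the function is invoked once against the aggregate schema. For more advanced strategies, see https://github.com/jbusecke/dynamic_chunks, linked from the docstring.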
85 changes: 78 additions & 7 deletions tests/test_transforms.py
@@ -234,18 +234,20 @@ def _check_chunks(actual):
     assert_that(rechunked, correct_chunks())


+class OpenZarrStore(beam.PTransform):
+    @staticmethod
+    def _open_zarr(store):
+        return xr.open_dataset(store, engine="zarr", chunks={})
+
+    def expand(self, pcoll):
+        return pcoll | beam.Map(self._open_zarr)
+
+
 def test_StoreToZarr_emits_openable_fsstore(
     pipeline,
     netcdf_local_file_pattern_sequential,
     tmp_target_url,
 ):
-    def _open_zarr(store):
-        return xr.open_dataset(store, engine="zarr")
-
-    class OpenZarrStore(beam.PTransform):
-        def expand(self, pcoll):
-            return pcoll | beam.Map(_open_zarr)
-
     def is_xrdataset():
         def _is_xr_dataset(actual):
             assert len(actual) == 1
@@ -266,6 +268,75 @@ def _is_xr_dataset(actual):
     assert_that(open_store, is_xrdataset())


+@pytest.mark.parametrize("with_kws", [True, False])
+def test_StoreToZarr_dynamic_chunking_interface(
+    pipeline: beam.Pipeline,
+    netcdf_local_file_pattern_sequential: FilePattern,
+    tmp_target_url: str,
+    daily_xarray_dataset: xr.Dataset,
+    with_kws: bool,
+):
+    def has_dynamically_set_chunks():
+        def _has_dynamically_set_chunks(actual):
+            assert len(actual) == 1
+            item = actual[0]
+            assert isinstance(item, xr.Dataset)
+            if not with_kws:
+                # we've dynamically set the number of timesteps per chunk to be equal to
+                # the length of the full time dimension of the aggregate dataset, therefore
+                # if this worked, there should only be one chunk
+                assert len(item.chunks["time"]) == 1
+            else:
+                # in this case, we've passed the kws {"divisor": 2}, so we expect two time chunks
+                assert len(item.chunks["time"]) == 2
+
+        return _has_dynamically_set_chunks
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    time_len = len(daily_xarray_dataset.time)
+
+    def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
+        assert isinstance(template_ds, xr.Dataset)
+        return {"time": int(time_len / divisor)}
+
+    kws = {} if not with_kws else {"dynamic_chunking_fn_kwargs": {"divisor": 2}}
+
+    with pipeline as p:
+        datasets = p | beam.Create(pattern.items()) | OpenWithXarray()
+        target_store = datasets | StoreToZarr(
+            target_root=tmp_target_url,
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            attrs={},
+            dynamic_chunking_fn=dynamic_chunking_fn,
+            **kws,
+        )
+        open_store = target_store | OpenZarrStore()
+        assert_that(open_store, has_dynamically_set_chunks())
+
+
+def test_StoreToZarr_dynamic_chunking_with_target_chunks_raises(
+    netcdf_local_file_pattern_sequential: FilePattern,
+):
+    def fn(template_ds):
+        pass
+
+    pattern: FilePattern = netcdf_local_file_pattern_sequential
+
+    with pytest.raises(
+        ValueError,
+        match="Passing both `target_chunks` and `dynamic_chunking_fn` not allowed",
+    ):
+        _ = StoreToZarr(
+            target_root="target_root",
+            store_name="test.zarr",
+            combine_dims=pattern.combine_dim_keys,
+            target_chunks={"time": 1},
+            dynamic_chunking_fn=fn,
+        )
+
+
 def test_StoreToZarr_target_root_default_unrunnable(
     pipeline,
     netcdf_local_file_pattern_sequential,
