diff --git a/docs/composition/styles.md b/docs/composition/styles.md
index 4379e27a..be697b03 100644
--- a/docs/composition/styles.md
+++ b/docs/composition/styles.md
@@ -32,6 +32,21 @@ If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates`
 ```
 
+```{note}
+{class}`pangeo_forge_recipes.transforms.StoreToZarr` supports appending to existing Zarr stores
+via the optional `append_dim` keyword argument. This option behaves nearly identically to the
+`append_dim` kwarg of
+[`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html);
+the two differences are that Pangeo Forge automatically introspects the inputs in your
+{class}`FilePattern <pangeo_forge_recipes.patterns.FilePattern>` to determine how the dimensions of
+the existing Zarr store need to be resized, and that writes are parallelized via Apache Beam. Beyond
+checking that the named `append_dim` already exists in the dataset to which you are appending, this
+option does not guarantee logical consistency (e.g., contiguity) of the appended data. When using
+this option, it is therefore up to you, the user, to ensure that the inputs provided in the
+{doc}`file pattern <file_patterns>` for the appending recipe are limited to those you want to
+append.
+```
+
 ## Open with Kerchunk, write to virtual Zarr
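As context for the documentation note added above, here is a minimal sketch (not part of the patch) of what an appending recipe could look like. The URL template, bucket, and store name are hypothetical, and the target store must already exist; the only difference from the recipe that originally built the store is `append_dim="time"`, mirroring the end-to-end test added later in this patch:

```python
import apache_beam as beam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr


# a pattern describing ONLY the new inputs to append (hypothetical URLs)
def make_url(time):
    return f"https://data.example.com/daily/{time}.nc"


pattern = FilePattern(make_url, ConcatDim("time", ["2024-01-01", "2024-01-02"]))

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    # identical to the original StoreToZarr call, plus append_dim="time"
    | StoreToZarr(
        target_root="s3://my-bucket",  # hypothetical; must contain the existing store
        store_name="existing-store.zarr",
        combine_dims=pattern.combine_dim_keys,
        append_dim="time",
    )
)
```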
""" if position.indexed: raise ValueError("This position is already indexed") start = sum(item_lens[: position.value]) dimsize = sum(item_lens) - return IndexedPosition(start, dimsize=dimsize) + return IndexedPosition(start + append_offset, dimsize=dimsize + append_offset) class AutoName(Enum): diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 08a90794..58b582c1 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -301,21 +301,26 @@ class IndexItems(beam.PTransform): """Augment dataset indexes with information about start and stop position.""" schema: beam.PCollection + append_offset: int = 0 @staticmethod - def index_item(item: Indexed[T], schema: XarraySchema) -> Indexed[T]: + def index_item(item: Indexed[T], schema: XarraySchema, append_offset: int) -> Indexed[T]: index, ds = item new_index = Index() for dimkey, dimval in index.items(): if dimkey.operation == CombineOp.CONCAT: item_len_dict = schema["chunks"][dimkey.name] item_lens = [item_len_dict[n] for n in range(len(item_len_dict))] - dimval = augment_index_with_start_stop(dimval, item_lens) + dimval = augment_index_with_start_stop(dimval, item_lens, append_offset) new_index[dimkey] = dimval return new_index, ds def expand(self, pcoll: beam.PCollection): - return pcoll | beam.Map(self.index_item, schema=beam.pvalue.AsSingleton(self.schema)) + return pcoll | beam.Map( + self.index_item, + schema=beam.pvalue.AsSingleton(self.schema), + append_offset=self.append_offset, + ) @dataclass @@ -341,6 +346,7 @@ class PrepareZarrTarget(beam.PTransform): then falling out of sync with coordinates if ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr(). + :param append_dim: Optional name of the dimension to append to. """ target: str | FSSpecTarget @@ -348,6 +354,7 @@ class PrepareZarrTarget(beam.PTransform): attrs: Dict[str, str] = field(default_factory=dict) consolidated_metadata: Optional[bool] = True encoding: Optional[dict] = field(default_factory=dict) + append_dim: Optional[str] = None def expand(self, pcoll: beam.PCollection) -> beam.PCollection: if isinstance(self.target, str): @@ -362,6 +369,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: attrs=self.attrs, encoding=self.encoding, consolidated_metadata=False, + append_dim=self.append_dim, ) return initialized_target @@ -641,8 +649,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): out https://github.com/jbusecke/dynamic_chunks :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``. :param attrs: Extra group-level attributes to inject into the dataset. - :param encoding: Dictionary encoding for xarray.to_zarr(). + :param append_dim: Optional name of the dimension to append to. """ # TODO: make it so we don't have to explicitly specify combine_dims @@ -657,17 +665,31 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict) attrs: Dict[str, str] = field(default_factory=dict) encoding: Optional[dict] = field(default_factory=dict) + append_dim: Optional[str] = None def __post_init__(self): if self.target_chunks and self.dynamic_chunking_fn: raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.") + self._append_offset = 0 + if self.append_dim: + logger.warn( + "When `append_dim` is given, StoreToZarr is NOT idempotent. Successive deployment " + "with the same inputs will append duplicate data to the existing store." 
diff --git a/pangeo_forge_recipes/patterns.py b/pangeo_forge_recipes/patterns.py
index dbe7d224..9c74c9d8 100644
--- a/pangeo_forge_recipes/patterns.py
+++ b/pangeo_forge_recipes/patterns.py
@@ -63,18 +63,23 @@ class MergeDim(CombineDim):
     operation: ClassVar[CombineOp] = CombineOp.MERGE
 
 
-def augment_index_with_start_stop(position: Position, item_lens: List[int]) -> IndexedPosition:
+def augment_index_with_start_stop(
+    position: Position,
+    item_lens: List[int],
+    append_offset: int = 0,
+) -> IndexedPosition:
     """Take an index _without_ start / stop and add them based on the lens defined in sequence_lens.
 
     :param index: The ``DimIndex`` instance to augment.
     :param item_lens: A list of integer lengths for all items in the sequence.
+    :param append_offset: If appending, the length of the existing ``append_dim``.
     """
     if position.indexed:
         raise ValueError("This position is already indexed")
     start = sum(item_lens[: position.value])
     dimsize = sum(item_lens)
-    return IndexedPosition(start, dimsize=dimsize)
+    return IndexedPosition(start + append_offset, dimsize=dimsize + append_offset)
 
 
 class AutoName(Enum):
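The offset arithmetic above is the heart of the append logic: every fragment's start index and the target dimension size are both shifted by the length of the data already in the store. A small worked example (values mirror the parametrized test in `tests/test_patterns.py`; `Position` is assumed here to be importable from `pangeo_forge_recipes.types`):

```python
from pangeo_forge_recipes.patterns import augment_index_with_start_stop
from pangeo_forge_recipes.types import Position

item_lens = [2, 2, 3, 2, 2]  # per-input lengths along the concat dimension

# no append: the third input starts at 2 + 2 = 4, and the full dimension is 11
augment_index_with_start_stop(Position(2), item_lens)
# -> IndexedPosition with start 4, dimsize 11

# appending to a store whose append_dim already holds 5 elements: every start
# shifts by 5 and the target dimension grows to 11 + 5 = 16
augment_index_with_start_stop(Position(2), item_lens, append_offset=5)
# -> IndexedPosition with start 9, dimsize 16
```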
diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py
index 08a90794..58b582c1 100644
--- a/pangeo_forge_recipes/transforms.py
+++ b/pangeo_forge_recipes/transforms.py
@@ -301,21 +301,26 @@ class IndexItems(beam.PTransform):
     """Augment dataset indexes with information about start and stop position."""
 
     schema: beam.PCollection
+    append_offset: int = 0
 
     @staticmethod
-    def index_item(item: Indexed[T], schema: XarraySchema) -> Indexed[T]:
+    def index_item(item: Indexed[T], schema: XarraySchema, append_offset: int) -> Indexed[T]:
         index, ds = item
         new_index = Index()
         for dimkey, dimval in index.items():
             if dimkey.operation == CombineOp.CONCAT:
                 item_len_dict = schema["chunks"][dimkey.name]
                 item_lens = [item_len_dict[n] for n in range(len(item_len_dict))]
-                dimval = augment_index_with_start_stop(dimval, item_lens)
+                dimval = augment_index_with_start_stop(dimval, item_lens, append_offset)
             new_index[dimkey] = dimval
         return new_index, ds
 
     def expand(self, pcoll: beam.PCollection):
-        return pcoll | beam.Map(self.index_item, schema=beam.pvalue.AsSingleton(self.schema))
+        return pcoll | beam.Map(
+            self.index_item,
+            schema=beam.pvalue.AsSingleton(self.schema),
+            append_offset=self.append_offset,
+        )
 
 
 @dataclass
@@ -341,6 +346,7 @@ class PrepareZarrTarget(beam.PTransform):
         then falling out of sync with coordinates if ConsolidateDimensionCoordinates()
         is applied to the output of StoreToZarr().
+    :param append_dim: Optional name of the dimension to append to.
     """
 
     target: str | FSSpecTarget
@@ -348,6 +354,7 @@ class PrepareZarrTarget(beam.PTransform):
     attrs: Dict[str, str] = field(default_factory=dict)
     consolidated_metadata: Optional[bool] = True
     encoding: Optional[dict] = field(default_factory=dict)
+    append_dim: Optional[str] = None
 
     def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
         if isinstance(self.target, str):
@@ -362,6 +369,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
             attrs=self.attrs,
             encoding=self.encoding,
             consolidated_metadata=False,
+            append_dim=self.append_dim,
         )
         return initialized_target
 
@@ -641,8 +649,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
         out https://github.com/jbusecke/dynamic_chunks
     :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
     :param attrs: Extra group-level attributes to inject into the dataset.
-    :param encoding: Dictionary encoding for xarray.to_zarr().
+    :param append_dim: Optional name of the dimension to append to.
     """
 
     # TODO: make it so we don't have to explicitly specify combine_dims
@@ -657,17 +665,31 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
     dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
     attrs: Dict[str, str] = field(default_factory=dict)
     encoding: Optional[dict] = field(default_factory=dict)
+    append_dim: Optional[str] = None
 
     def __post_init__(self):
         if self.target_chunks and self.dynamic_chunking_fn:
             raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")
 
+        self._append_offset = 0
+        if self.append_dim:
+            logger.warn(
+                "When `append_dim` is given, StoreToZarr is NOT idempotent. Successive deployment "
+                "with the same inputs will append duplicate data to the existing store."
+            )
+            dim = [d for d in self.combine_dims if d.name == self.append_dim]
+            assert dim, f"Append dim not in {self.combine_dims=}."
+            assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT."
+            existing_ds = xr.open_dataset(self.get_full_target().get_mapper(), engine="zarr")
+            assert self.append_dim in existing_ds, "Append dim must be in existing dataset."
+            self._append_offset = len(existing_ds[self.append_dim])
+
     def expand(
         self,
         datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
     ) -> beam.PCollection[zarr.storage.FSStore]:
         schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
-        indexed_datasets = datasets | IndexItems(schema=schema)
+        indexed_datasets = datasets | IndexItems(schema=schema, append_offset=self._append_offset)
         target_chunks = (
             self.target_chunks
             if not self.dynamic_chunking_fn
@@ -684,6 +706,7 @@ def expand(
             target_chunks=target_chunks,
             attrs=self.attrs,
             encoding=self.encoding,
+            append_dim=self.append_dim,
         )
         n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
         singleton_target_store = (
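The new `__post_init__` guards make misconfiguration fail at recipe construction time rather than mid-pipeline. A small sketch of the failure mode (the format function is hypothetical; compare the parametrized `test_StoreToZarr_append_dim_asserts_raises` added to `tests/test_transforms.py` below):

```python
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim
from pangeo_forge_recipes.transforms import StoreToZarr

pattern = FilePattern(
    lambda time, var: f"{var}_{time}.nc",  # hypothetical format function
    ConcatDim("time", [1, 2]),
    MergeDim("var", ["foo", "bar"]),
)

# raises AssertionError: "date" is not one of the pattern's combine dims
StoreToZarr(
    target_root="target",
    store_name="test.zarr",
    combine_dims=pattern.combine_dim_keys,
    append_dim="date",
)
```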
diff --git a/tests/conftest.py b/tests/conftest.py
index 2d02739d..73445571 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -276,6 +276,11 @@ def daily_xarray_dataset():
     return make_ds(nt=10)
 
 
+@pytest.fixture(scope="session")
+def daily_xarray_datasets_to_append():
+    return make_ds(nt=10, start="2010-01-01"), make_ds(nt=10, start="2010-01-11")
+
+
 @pytest.fixture(scope="session")
 def daily_xarray_dataset_with_coordinateless_dimension(daily_xarray_dataset):
     """
@@ -295,6 +300,23 @@ def netcdf_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
     )
 
 
+@pytest.fixture(scope="session")
+def netcdf_local_paths_sequential_1d_to_append(
+    daily_xarray_datasets_to_append,
+    tmpdir_factory,
+):
+    return [
+        make_local_paths(
+            ds,
+            tmpdir_factory,
+            "D",
+            split_up_files_by_day,
+            file_type="netcdf4",
+        )
+        for ds in daily_xarray_datasets_to_append
+    ]
+
+
 @pytest.fixture(scope="session")
 def netcdf3_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
     return make_local_paths(
@@ -448,6 +470,11 @@ def netcdf_local_paths_sequential_with_coordinateless_dimension(
 # FilePattern fixtures ----------------------------------------------------------------------------
 
 
+@pytest.fixture(scope="session")
+def netcdf_local_file_patterns_to_append(netcdf_local_paths_sequential_1d_to_append):
+    return [make_file_pattern(paths) for paths in netcdf_local_paths_sequential_1d_to_append]
+
+
 @pytest.fixture(scope="session")
 def netcdf_local_file_pattern_sequential(netcdf_local_paths_sequential):
     return make_file_pattern(netcdf_local_paths_sequential)
diff --git a/tests/data_generation.py b/tests/data_generation.py
index 729d4267..8d2c02b7 100644
--- a/tests/data_generation.py
+++ b/tests/data_generation.py
@@ -3,13 +3,13 @@
 import xarray as xr
 
 
-def make_ds(nt=10, non_dim_coords=False):
+def make_ds(nt=10, non_dim_coords=False, start="2010-01-01"):
     """Return a synthetic random xarray dataset."""
     np.random.seed(2)
     # TODO: change nt to 11 in order to catch the edge case where
     # items_per_input does not evenly divide the length of the sequence dimension
     ny, nx = 18, 36
-    time = pd.date_range(start="2010-01-01", periods=nt, freq="D")
+    time = pd.date_range(start=start, periods=nt, freq="D")
     lon = (np.arange(nx) + 0.5) * 360 / nx
     lon_attrs = {"units": "degrees_east", "long_name": "longitude"}
     lat = (np.arange(ny) + 0.5) * 180 / ny
diff --git a/tests/test_aggregation.py b/tests/test_aggregation.py
index eceaffb0..a69e6bab 100644
--- a/tests/test_aggregation.py
+++ b/tests/test_aggregation.py
@@ -8,7 +8,9 @@
     dataset_to_schema,
     determine_target_chunks,
     schema_to_template_ds,
+    schema_to_zarr,
 )
+from pangeo_forge_recipes.storage import FSSpecTarget
 
 from .data_generation import make_ds
 
@@ -190,3 +192,41 @@ def test_concat_accumulator():
     assert (
         merge_accumulator.schema["data_vars"]["bar"] == merge_accumulator.schema["data_vars"]["BAR"]
     )
+
+
+def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarget):
+    target_store = tmp_target.get_mapper()
+    schema = dataset_to_schema(daily_xarray_dataset)
+    schema_to_zarr(
+        schema=schema,
+        target_store=target_store,
+        target_chunks={},
+        attrs={},
+        consolidated_metadata=False,
+        encoding=None,
+        append_dim=None,
+    )
+    ds = xr.open_dataset(target_store, engine="zarr")
+    assert len(ds.time) == len(daily_xarray_dataset.time)
+    assert len(ds.lon) == len(daily_xarray_dataset.lon)
+    assert len(ds.lat) == len(daily_xarray_dataset.lat)
+
+
+def test_schema_to_zarr_append_mode(
+    daily_xarray_datasets_to_append: tuple[xr.Dataset, xr.Dataset],
+    tmp_target: FSSpecTarget,
+):
+    """Tests dimension resizing for append."""
+
+    ds0, ds1 = daily_xarray_datasets_to_append
+    target_store = tmp_target.get_mapper()
+
+    schema_ds0 = dataset_to_schema(ds0)
+    schema_to_zarr(schema=schema_ds0, append_dim=None, target_store=target_store)
+    ds0_zarr = xr.open_dataset(target_store, engine="zarr")
+    assert len(ds0_zarr.time) == len(ds0.time)
+
+    schema_ds1 = dataset_to_schema(ds1)
+    schema_to_zarr(schema=schema_ds1, append_dim="time", target_store=target_store)
+    appended_zarr = xr.open_dataset(target_store, engine="zarr")
+    assert len(appended_zarr.time) == len(ds0.time) + len(ds1.time)
diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py
index 0503eb46..37c0d60c 100644
--- a/tests/test_end_to_end.py
+++ b/tests/test_end_to_end.py
@@ -83,6 +83,56 @@ def test_xarray_zarr_subpath(
     xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
 
 
+def test_xarray_zarr_append(
+    daily_xarray_datasets_to_append,
+    netcdf_local_file_patterns_to_append,
+    tmp_target,
+):
+    ds0_fixture, ds1_fixture = daily_xarray_datasets_to_append
+    pattern0, pattern1 = netcdf_local_file_patterns_to_append
+    assert pattern0.combine_dim_keys == pattern1.combine_dim_keys
+
+    # these kws are reused across both initial and append pipelines
+    common_kws = dict(
+        target_root=tmp_target,
+        store_name="store",
+        combine_dims=pattern0.combine_dim_keys,
+    )
+    store_path = os.path.join(tmp_target.root_path, "store")
+
+    # build an initial zarr store, to which we will append
+    options = PipelineOptions(runtime_type_check=False)
+    # we run two pipelines in this test, so instantiate them separately to
+    # avoid any potential of strange co-mingling between the two pipelines
+    with TestPipeline(options=options) as p0:
+        (
+            p0
+            | "CreateInitial" >> beam.Create(pattern0.items())
+            | "OpenInitial" >> OpenWithXarray()
+            | "StoreInitial" >> StoreToZarr(**common_kws)
+        )
+
+    # make sure the initial zarr store looks good
+    initial_actual = xr.open_dataset(store_path, engine="zarr")
+    assert len(initial_actual.time) == 10
+    xr.testing.assert_equal(initial_actual.load(), ds0_fixture)
+
+    # now append to it. the two differences here are
+    # passing `pattern1` in `Create` and `append_dim="time"` in `StoreToZarr`
+    with TestPipeline(options=options) as p1:
+        (
+            p1
+            | "CreateAppend" >> beam.Create(pattern1.items())
+            | "OpenAppend" >> OpenWithXarray()
+            | "StoreAppend" >> StoreToZarr(append_dim="time", **common_kws)
+        )
+
+    # now see if we have appended to time dimension as intended
+    append_actual = xr.open_dataset(store_path, engine="zarr")
+    assert len(append_actual.time) == 20
+    append_expected = xr.concat([ds0_fixture, ds1_fixture], dim="time")
+    xr.testing.assert_equal(append_actual.load(), append_expected)
+
+
 @pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
 def test_reference_netcdf(
     daily_xarray_dataset,
diff --git a/tests/test_patterns.py b/tests/test_patterns.py
index 2fe81df7..646e5d82 100644
--- a/tests/test_patterns.py
+++ b/tests/test_patterns.py
@@ -205,8 +205,9 @@ def test_setting_file_types(file_type_value):
     "position,start",
     [(0, 0), (1, 2), (2, 4), (3, 7), (4, 9)],
 )
-def test_augment_index_with_start_stop(position, start):
+@pytest.mark.parametrize("append_offset", [0, 5, 500])
+def test_augment_index_with_start_stop(position, start, append_offset):
     dk = Position(position)
-    expected = IndexedPosition(start, dimsize=11)
-    actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2])
+    expected = IndexedPosition(start + append_offset, dimsize=11 + append_offset)
+    actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2], append_offset)
     assert actual == expected
diff --git a/tests/test_transforms.py b/tests/test_transforms.py
index 1bf7c692..47976af5 100644
--- a/tests/test_transforms.py
+++ b/tests/test_transforms.py
@@ -7,7 +7,7 @@
 from pytest_lazyfixture import lazy_fixture
 
 from pangeo_forge_recipes.aggregation import dataset_to_schema
-from pangeo_forge_recipes.patterns import FilePattern, FileType
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, FileType, MergeDim
 from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget
 from pangeo_forge_recipes.transforms import (
     DetermineSchema,
@@ -304,7 +304,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
         assert isinstance(template_ds, xr.Dataset)
         return {"time": int(time_len / divisor)}
 
-    kws = {} if not with_kws else {"dynamic_chunking_fn_kwargs": {"divisor": 2}}
+    dynamic_chunking_fn_kwargs = {} if not with_kws else {"divisor": 2}
 
     with pipeline as p:
         datasets = p | beam.Create(pattern.items()) | OpenWithXarray()
@@ -312,9 +312,8 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
             target_root=tmp_target,
             store_name="test.zarr",
             combine_dims=pattern.combine_dim_keys,
-            attrs={},
             dynamic_chunking_fn=dynamic_chunking_fn,
-            **kws,
+            dynamic_chunking_fn_kwargs=dynamic_chunking_fn_kwargs,
        )
         open_store = target_store | OpenZarrStore()
         assert_that(open_store, has_dynamically_set_chunks())
@@ -341,6 +340,25 @@ def fn(template_ds):
         )
 
 
+@pytest.mark.parametrize(
+    "append_dim, match",
+    [
+        ("date", "Append dim not in self.combine_dims"),
+        ("var", "Append dim operation must be CONCAT."),
+    ],
+)
+def test_StoreToZarr_append_dim_asserts_raises(append_dim, match):
+    pattern = FilePattern(lambda x: x, ConcatDim("time", [1, 2]), MergeDim("var", ["foo", "bar"]))
+    kws = dict(
+        target_root="target",
+        store_name="test.zarr",
+        combine_dims=pattern.combine_dim_keys,
+        target_chunks={"time": 1},
+    )
+    with pytest.raises(AssertionError, match=match):
+        _ = StoreToZarr(append_dim=append_dim, **kws)
+
+
 def test_StoreToZarr_target_root_default_unrunnable(
     pipeline,
     netcdf_local_file_pattern_sequential,