Append mode for StoreToZarr #721

Status: Merged (24 commits, Apr 3, 2024)

Commits
b74905b  mode kw for schema_to_zarr (cisaacstern, Mar 29, 2024)
8e3d55e  pass mode kw through transforms (cisaacstern, Mar 29, 2024)
9d18ba8  appending end to end test WIP (cisaacstern, Mar 29, 2024)
342dd26  test_schema_to_zarr (cisaacstern, Mar 29, 2024)
401ee46  mypy (cisaacstern, Mar 29, 2024)
ee41b76  Merge remote-tracking branch 'origin/main' into append-mode (cisaacstern, Mar 29, 2024)
b9f6b8b  note writer offset + test (cisaacstern, Mar 29, 2024)
9defde9  append_dim, not mode (cisaacstern, Mar 29, 2024)
9eff610  schema_to_zarr append test (cisaacstern, Mar 29, 2024)
9a3cb4b  fix broken test (cisaacstern, Mar 29, 2024)
cd13679  simplify schema to zarr append test (cisaacstern, Mar 29, 2024)
d0cbb3f  get dimension resizing to work in end to end test (cisaacstern, Mar 29, 2024)
3b4c31e  append_offset for aughment_index... (cisaacstern, Mar 31, 2024)
5b43dae  pass append_offset through from transforms layer (cisaacstern, Mar 31, 2024)
99e41ed  only append schema for append dim coord (cisaacstern, Mar 31, 2024)
4bf2eae  appending end to end test passes (cisaacstern, Mar 31, 2024)
468f326  revert writers changes (cisaacstern, Mar 31, 2024)
5ce2d62  remove unused variable in test (cisaacstern, Mar 31, 2024)
7eb866c  test append dim asserts raises (cisaacstern, Mar 31, 2024)
6822f56  unit test augment with append offset (cisaacstern, Mar 31, 2024)
ffd21b9  idempotentcy warning (cisaacstern, Mar 31, 2024)
a51d3cc  remove unused test stub (cisaacstern, Mar 31, 2024)
aaa5a69  appending note in docs (cisaacstern, Apr 2, 2024)
c75a274  fix doc crosslink (cisaacstern, Apr 2, 2024)
docs/composition/styles.md (15 additions, 0 deletions)

@@ -32,6 +32,21 @@ If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates`

```

```{note}
{class}`pangeo_forge_recipes.transforms.StoreToZarr` supports appending to existing Zarr stores
via the optional `append_dim` keyword argument. This option functions nearly identically to the
`append_dim` kwarg in
[`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html);
the two differences with this method are that Pangeo Forge will automatically introspect the inputs in
your {class}`FilePattern <pangeo_forge_recipes.patterns.FilePattern>` to determine how the existing Zarr
store dimensions need to be resized, and that writes are parallelized via Apache Beam. Apart from
ensuring that the named `append_dim` already exists in the dataset to which you are appending, use of
this option does not ensure logical consistency (e.g. contiguousness, etc.) of the appended data. When
selecting this option, it is therefore up to you, the user, to ensure that the inputs provided in the
{doc}`file pattern <file_patterns>` for the appending recipe are limited to those which you want to
append.
```
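As an illustration (not part of this diff), a minimal append recipe would mirror the end-to-end test added in this PR. The file names below are hypothetical placeholders, and `"./store"` is assumed to have been written by a prior run of the same recipe without `append_dim`:

```python
import apache_beam as beam

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import OpenWithXarray, StoreToZarr

# Hypothetical local netCDF files holding ONLY the new time steps to append.
pattern = pattern_from_file_sequence(
    ["new_day_11.nc", "new_day_12.nc"],
    concat_dim="time",
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenWithXarray()
        | StoreToZarr(
            target_root=".",  # assumes "./store" already exists
            store_name="store",
            combine_dims=pattern.combine_dim_keys,
            append_dim="time",  # the only change relative to the initial recipe
        )
    )
```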


## Open with Kerchunk, write to virtual Zarr

pangeo_forge_recipes/aggregation.py (8 additions, 2 deletions)

@@ -287,13 +287,19 @@ def schema_to_zarr(
    attrs: Optional[Dict[str, str]] = None,
    consolidated_metadata: Optional[bool] = True,
    encoding: Optional[Dict] = None,
+    append_dim: Optional[str] = None,
) -> zarr.storage.FSStore:
    """Initialize a zarr group based on a schema."""
+    if append_dim:
+        # if appending, only keep schema for coordinate to append. if we don't drop other
+        # coords, we may end up overwriting existing data on the `ds.to_zarr` call below.
+        schema["coords"] = {k: v for k, v in schema["coords"].items() if k == append_dim}
Comment on lines +293 to +296, cisaacstern (Member, Author):

Just realizing that this, while seemingly necessary to avoid overwriting non-append_dim coords, may prevent us from benefiting from the dimension consistency checks offered by ds.to_zarr... we may need to re-implement such checks in StoreToZarr.__post_init__ if we want them.

Thinking about this further... in StoreToZarr.__post_init__ we actually don't know what the coordinates in the new data are (aside from the user-provided append_dim), because we haven't actually fetched and opened any source files yet.

So if we want to check for consistency, we could actually do it right here. Before dropping non-append coords from the schema, we would just need to open the target dataset and check that all coords match first. Could be a good idea.

Having just reviewed xarray's checks again, I actually don't see that this is done there, so may be overly strict. I think we can move forward with what we have for now.
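For future readers, here is a sketch of the consistency check discussed in this thread. It is hypothetical (the PR intentionally leaves it unimplemented) and assumes the schema follows xarray's `ds.to_dict(data=False)` layout, where each coord entry carries a `"shape"` key:

```python
import xarray as xr


def check_append_coords_consistent(schema, target_store, append_dim):
    """Hypothetical guard: before dropping non-append coords from the schema,
    verify that they match the coords already present in the target store."""
    existing = xr.open_dataset(target_store, engine="zarr")
    for name, coord in schema["coords"].items():
        if name == append_dim:
            continue  # the append dim is expected to grow
        if name not in existing.coords:
            raise ValueError(f"Coordinate {name!r} not found in existing store.")
        if tuple(existing[name].shape) != tuple(coord["shape"]):
            raise ValueError(f"Coordinate {name!r} shape differs from existing store.")
```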

    ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs)
-    # using mode="w" makes this function idempotent
+    # using mode="w" makes this function idempotent when not appending
    ds.to_zarr(
        target_store,
-        mode="w",
+        append_dim=append_dim,
+        mode=("a" if append_dim else "w"),
        compute=False,
        consolidated=consolidated_metadata,
        encoding=encoding,
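For context on the mode switch above, a toy sketch of the underlying xarray behavior (illustrative only, assuming a writable local path; this is not code from the PR):

```python
import numpy as np
import pandas as pd
import xarray as xr

store = "example.zarr"  # assumed local path

ds0 = xr.Dataset(
    {"foo": ("time", np.arange(3))},
    coords={"time": pd.date_range("2010-01-01", periods=3)},
)
ds0.to_zarr(store, mode="w")  # mode="w" overwrites: idempotent initial write

ds1 = xr.Dataset(
    {"foo": ("time", np.arange(3))},
    coords={"time": pd.date_range("2010-01-04", periods=3)},
)
# mode="a" + append_dim resizes "time" from 3 to 6; in schema_to_zarr the
# template is written with compute=False so fragment writes fill in data later.
ds1.to_zarr(store, mode="a", append_dim="time")

assert xr.open_zarr(store).sizes["time"] == 6
```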
pangeo_forge_recipes/patterns.py (7 additions, 2 deletions)

@@ -63,18 +63,23 @@ class MergeDim(CombineDim):
    operation: ClassVar[CombineOp] = CombineOp.MERGE


-def augment_index_with_start_stop(position: Position, item_lens: List[int]) -> IndexedPosition:
+def augment_index_with_start_stop(
+    position: Position,
+    item_lens: List[int],
+    append_offset: int = 0,
+) -> IndexedPosition:
    """Take an index _without_ start / stop and add them based on the lens defined in sequence_lens.

    :param index: The ``DimIndex`` instance to augment.
    :param item_lens: A list of integer lengths for all items in the sequence.
+    :param append_offset: If appending, the length of the existing ``append_dim``.
    """

    if position.indexed:
        raise ValueError("This position is already indexed")
    start = sum(item_lens[: position.value])
    dimsize = sum(item_lens)
-    return IndexedPosition(start, dimsize=dimsize)
+    return IndexedPosition(start + append_offset, dimsize=dimsize + append_offset)


class AutoName(Enum):
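A short worked example of the offset arithmetic (illustrative; the values mirror the parametrized test updated below, and the import locations are assumptions):

```python
from pangeo_forge_recipes.patterns import augment_index_with_start_stop
from pangeo_forge_recipes.types import Position

# Without appending: fragment 2 starts at 2 + 2 = 4 in a dimension of size 11.
augment_index_with_start_stop(Position(2), [2, 2, 3, 2, 2])
# -> IndexedPosition(4, dimsize=11)

# Appending after 10 existing steps: the same fragment shifts to start at 14,
# and the target dimension is resized to 21.
augment_index_with_start_stop(Position(2), [2, 2, 3, 2, 2], append_offset=10)
# -> IndexedPosition(14, dimsize=21)
```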
pangeo_forge_recipes/transforms.py (28 additions, 5 deletions)

@@ -301,21 +301,26 @@ class IndexItems(beam.PTransform):
    """Augment dataset indexes with information about start and stop position."""

    schema: beam.PCollection
+    append_offset: int = 0

    @staticmethod
-    def index_item(item: Indexed[T], schema: XarraySchema) -> Indexed[T]:
+    def index_item(item: Indexed[T], schema: XarraySchema, append_offset: int) -> Indexed[T]:
        index, ds = item
        new_index = Index()
        for dimkey, dimval in index.items():
            if dimkey.operation == CombineOp.CONCAT:
                item_len_dict = schema["chunks"][dimkey.name]
                item_lens = [item_len_dict[n] for n in range(len(item_len_dict))]
-                dimval = augment_index_with_start_stop(dimval, item_lens)
+                dimval = augment_index_with_start_stop(dimval, item_lens, append_offset)
            new_index[dimkey] = dimval
        return new_index, ds

    def expand(self, pcoll: beam.PCollection):
-        return pcoll | beam.Map(self.index_item, schema=beam.pvalue.AsSingleton(self.schema))
+        return pcoll | beam.Map(
+            self.index_item,
+            schema=beam.pvalue.AsSingleton(self.schema),
+            append_offset=self.append_offset,
+        )
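An aside on the Beam mechanics here (illustrative, not from the PR): `schema` is itself a PCollection, so it must reach `index_item` as a side input via `beam.pvalue.AsSingleton`, whereas `append_offset` is a plain integer already known when the pipeline is constructed. A minimal runnable analogue:

```python
import apache_beam as beam


def add_offsets(x, schema, append_offset):
    # `schema` arrives per element from the side input; `append_offset` is static.
    return x + schema["base"] + append_offset


with beam.Pipeline() as p:
    schema = p | "MakeSchema" >> beam.Create([{"base": 100}])
    (
        p
        | "MakeItems" >> beam.Create([1, 2, 3])
        | beam.Map(add_offsets, schema=beam.pvalue.AsSingleton(schema), append_offset=10)
        | beam.Map(print)  # 111, 112, 113 (in any order)
    )
```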


@dataclass
@@ -341,13 +346,15 @@ class PrepareZarrTarget(beam.PTransform):
        then falling out of sync with coordinates if
        ConsolidateDimensionCoordinates() is applied to the output of
        StoreToZarr().
+    :param append_dim: Optional name of the dimension to append to.
    """

    target: str | FSSpecTarget
    target_chunks: Dict[str, int] = field(default_factory=dict)
    attrs: Dict[str, str] = field(default_factory=dict)
    consolidated_metadata: Optional[bool] = True
    encoding: Optional[dict] = field(default_factory=dict)
+    append_dim: Optional[str] = None

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        if isinstance(self.target, str):
@@ -362,6 +369,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
            attrs=self.attrs,
            encoding=self.encoding,
            consolidated_metadata=False,
+            append_dim=self.append_dim,
Comment from a Contributor: Amazing how easy it was to modify this step to allow appending.

        )
        return initialized_target

@@ -641,8 +649,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
        out https://github.com/jbusecke/dynamic_chunks
    :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
    :param attrs: Extra group-level attributes to inject into the dataset.
-
    :param encoding: Dictionary encoding for xarray.to_zarr().
+    :param append_dim: Optional name of the dimension to append to.
    """

    # TODO: make it so we don't have to explicitly specify combine_dims
@@ -657,17 +665,31 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
    attrs: Dict[str, str] = field(default_factory=dict)
    encoding: Optional[dict] = field(default_factory=dict)
+    append_dim: Optional[str] = None

    def __post_init__(self):
        if self.target_chunks and self.dynamic_chunking_fn:
            raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")

+        self._append_offset = 0
+        if self.append_dim:
+            logger.warn(
+                "When `append_dim` is given, StoreToZarr is NOT idempotent. Successive deployment "
+                "with the same inputs will append duplicate data to the existing store."
+            )
+            dim = [d for d in self.combine_dims if d.name == self.append_dim]
+            assert dim, f"Append dim not in {self.combine_dims=}."
+            assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT."
+            existing_ds = xr.open_dataset(self.get_full_target().get_mapper(), engine="zarr")
+            assert self.append_dim in existing_ds, "Append dim must be in existing dataset."
+            self._append_offset = len(existing_ds[self.append_dim])

Comment on the idempotency warning:

Contributor: In future PR's to this, do you see a good path forward to making append safe/idempotent?

cisaacstern (Member, Author): Good question! For future readers, we had a good discussion of this in today's Coordination meeting, minutes here.

    def expand(
        self,
        datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
    ) -> beam.PCollection[zarr.storage.FSStore]:
        schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
-        indexed_datasets = datasets | IndexItems(schema=schema)
+        indexed_datasets = datasets | IndexItems(schema=schema, append_offset=self._append_offset)
        target_chunks = (
            self.target_chunks
            if not self.dynamic_chunking_fn
@@ -684,6 +706,7 @@ def expand(
            target_chunks=target_chunks,
            attrs=self.attrs,
            encoding=self.encoding,
+            append_dim=self.append_dim,
        )
        n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
        singleton_target_store = (
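A hypothetical sketch of the fail-fast behavior exercised by the "test append dim asserts raises" commit; `pattern` and `target_root` are assumed placeholders:

```python
import pytest

from pangeo_forge_recipes.transforms import StoreToZarr

with pytest.raises(AssertionError, match="Append dim not in"):
    StoreToZarr(
        target_root=target_root,  # assumed placeholder
        store_name="store",
        combine_dims=pattern.combine_dim_keys,  # e.g. CONCAT over "time" only
        append_dim="latitude",  # not among combine_dims, so __post_init__ raises
    )
```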
tests/conftest.py (27 additions, 0 deletions)

@@ -276,6 +276,11 @@ def daily_xarray_dataset():
    return make_ds(nt=10)


@pytest.fixture(scope="session")
def daily_xarray_datasets_to_append():
    return make_ds(nt=10, start="2010-01-01"), make_ds(nt=10, start="2010-01-11")


@pytest.fixture(scope="session")
def daily_xarray_dataset_with_coordinateless_dimension(daily_xarray_dataset):
    """
@@ -295,6 +300,23 @@ def netcdf_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
    )


@pytest.fixture(scope="session")
def netcdf_local_paths_sequential_1d_to_append(
    daily_xarray_datasets_to_append,
    tmpdir_factory,
):
    return [
        make_local_paths(
            ds,
            tmpdir_factory,
            "D",
            split_up_files_by_day,
            file_type="netcdf4",
        )
        for ds in daily_xarray_datasets_to_append
    ]


@pytest.fixture(scope="session")
def netcdf3_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
    return make_local_paths(
@@ -448,6 +470,11 @@ def netcdf_local_paths_sequential_with_coordinateless_dimension(
# FilePattern fixtures ----------------------------------------------------------------------------


@pytest.fixture(scope="session")
def netcdf_local_file_patterns_to_append(netcdf_local_paths_sequential_1d_to_append):
    return [make_file_pattern(paths) for paths in netcdf_local_paths_sequential_1d_to_append]


@pytest.fixture(scope="session")
def netcdf_local_file_pattern_sequential(netcdf_local_paths_sequential):
    return make_file_pattern(netcdf_local_paths_sequential)
tests/data_generation.py (2 additions, 2 deletions)

@@ -3,13 +3,13 @@
import xarray as xr


-def make_ds(nt=10, non_dim_coords=False):
+def make_ds(nt=10, non_dim_coords=False, start="2010-01-01"):
    """Return a synthetic random xarray dataset."""
    np.random.seed(2)
    # TODO: change nt to 11 in order to catch the edge case where
    # items_per_input does not evenly divide the length of the sequence dimension
    ny, nx = 18, 36
-    time = pd.date_range(start="2010-01-01", periods=nt, freq="D")
+    time = pd.date_range(start=start, periods=nt, freq="D")
    lon = (np.arange(nx) + 0.5) * 360 / nx
    lon_attrs = {"units": "degrees_east", "long_name": "longitude"}
    lat = (np.arange(ny) + 0.5) * 180 / ny
tests/test_aggregation.py (40 additions, 0 deletions)

@@ -8,7 +8,9 @@
    dataset_to_schema,
    determine_target_chunks,
    schema_to_template_ds,
+    schema_to_zarr,
)
+from pangeo_forge_recipes.storage import FSSpecTarget

from .data_generation import make_ds

@@ -190,3 +192,41 @@ def test_concat_accumulator():
    assert (
        merge_accumulator.schema["data_vars"]["bar"] == merge_accumulator.schema["data_vars"]["BAR"]
    )


def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarget):
    target_store = tmp_target.get_mapper()
    schema = dataset_to_schema(daily_xarray_dataset)
    schema_to_zarr(
        schema=schema,
        target_store=target_store,
        target_chunks={},
        attrs={},
        consolidated_metadata=False,
        encoding=None,
        append_dim=None,
    )
    ds = xr.open_dataset(target_store, engine="zarr")
    assert len(ds.time) == len(daily_xarray_dataset.time)
    assert len(ds.lon) == len(daily_xarray_dataset.lon)
    assert len(ds.lat) == len(daily_xarray_dataset.lat)


def test_schema_to_zarr_append_mode(
    daily_xarray_datasets_to_append: tuple[xr.Dataset, xr.Dataset],
    tmp_target: FSSpecTarget,
):
    """Tests dimension resizing for append."""

    ds0, ds1 = daily_xarray_datasets_to_append
    target_store = tmp_target.get_mapper()

    schema_ds0 = dataset_to_schema(ds0)
    schema_to_zarr(schema=schema_ds0, append_dim=None, target_store=target_store)
    ds0_zarr = xr.open_dataset(target_store, engine="zarr")
    assert len(ds0_zarr.time) == len(ds0.time)

    schema_ds1 = dataset_to_schema(ds1)
    schema_to_zarr(schema=schema_ds1, append_dim="time", target_store=target_store)
    appended_zarr = xr.open_dataset(target_store, engine="zarr")
    assert len(appended_zarr.time) == len(ds0.time) + len(ds1.time)
tests/test_end_to_end.py (50 additions, 0 deletions)

@@ -83,6 +83,56 @@ def test_xarray_zarr_subpath(
    xr.testing.assert_equal(ds.load(), daily_xarray_dataset)


def test_xarray_zarr_append(
    daily_xarray_datasets_to_append,
    netcdf_local_file_patterns_to_append,
    tmp_target,
):
    ds0_fixture, ds1_fixture = daily_xarray_datasets_to_append
    pattern0, pattern1 = netcdf_local_file_patterns_to_append
    assert pattern0.combine_dim_keys == pattern1.combine_dim_keys

    # these kws are reused across both initial and append pipelines
    common_kws = dict(
        target_root=tmp_target,
        store_name="store",
        combine_dims=pattern0.combine_dim_keys,
    )
    store_path = os.path.join(tmp_target.root_path, "store")
    # build an initial zarr store, to which we will append
    options = PipelineOptions(runtime_type_check=False)
    # we run two pipelines in this test, so instantiate them separately to
    # avoid any potential of strange co-mingling between the same pipeline
    with TestPipeline(options=options) as p0:
        (
            p0
            | "CreateInitial" >> beam.Create(pattern0.items())
            | "OpenInitial" >> OpenWithXarray()
            | "StoreInitial" >> StoreToZarr(**common_kws)
        )

    # make sure the initial zarr store looks good
    initial_actual = xr.open_dataset(store_path, engine="zarr")
    assert len(initial_actual.time) == 10
    xr.testing.assert_equal(initial_actual.load(), ds0_fixture)

    # now append to it. the two differences here are
    # passing `pattern1` in `Create` and `append_dim="time"` in `StoreToZarr`
    with TestPipeline(options=options) as p1:
        (
            p1
            | "CreateAppend" >> beam.Create(pattern1.items())
            | "OpenAppend" >> OpenWithXarray()
            | "StoreAppend" >> StoreToZarr(append_dim="time", **common_kws)
        )

    # now see if we have appended to time dimension as intended
    append_actual = xr.open_dataset(store_path, engine="zarr")
    assert len(append_actual.time) == 20
    append_expected = xr.concat([ds0_fixture, ds1_fixture], dim="time")
    xr.testing.assert_equal(append_actual.load(), append_expected)


@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
def test_reference_netcdf(
    daily_xarray_dataset,
tests/test_patterns.py (4 additions, 3 deletions)

@@ -205,8 +205,9 @@ def test_setting_file_types(file_type_value):
    "position,start",
    [(0, 0), (1, 2), (2, 4), (3, 7), (4, 9)],
)
-def test_augment_index_with_start_stop(position, start):
+@pytest.mark.parametrize("append_offset", [0, 5, 500])
+def test_augment_index_with_start_stop(position, start, append_offset):
    dk = Position(position)
-    expected = IndexedPosition(start, dimsize=11)
-    actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2])
+    expected = IndexedPosition(start + append_offset, dimsize=11 + append_offset)
+    actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2], append_offset)
    assert actual == expected