Commit

progress on debugging
norlandrhagen committed Feb 19, 2025
1 parent 1768990 commit 3f788bf
Showing 8 changed files with 42 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docs/composition/styles.md
@@ -26,7 +26,7 @@ the recipe pipeline will contain at a minimum the following transforms applied t
 - {class}`pangeo_forge_recipes.transforms.OpenWithXarray`: load each pattern file into an [`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html).
 - {class}`pangeo_forge_recipes.transforms.StoreToZarr`: generate a Zarr store by combining the datasets.
 - {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates`: consolidate the Dimension Coordinates for dataset read performance.
-- {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata`: calls Zarr's convenience function to consolidate metadata.
+- {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata`: calls Zarr's convenience function to consolidate metadata. Note: this is not supported in Zarr v3.

 ### Open existing Zarr Store
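Stepping back from the diff: a minimal recipe pipeline composing the transforms listed in the hunk above might look like the sketch below. It assumes an existing `FilePattern` named `pattern` and a `target_root`; the names and store name are illustrative, and per the note added above, the final `ConsolidateMetadata` step would be dropped for Zarr v3 targets.

```python
import apache_beam as beam

from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# Sketch only: `pattern` (a FilePattern) and `target_root` are assumed to exist.
recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name="output.zarr",
        combine_dims=pattern.combine_dim_keys,
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()  # skip for Zarr v3 targets
)
```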
4 changes: 3 additions & 1 deletion pangeo_forge_recipes/aggregation.py
@@ -267,12 +267,14 @@ def schema_to_zarr(
         schema["coords"] = {k: v for k, v in schema["coords"].items() if k == append_dim}
     ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs)
     # using mode="w" makes this function idempotent when not appending
+
     ds.to_zarr(
         target_store,
         append_dim=append_dim,
         mode=("a" if append_dim else "w"),
         compute=False,
-        consolidated=consolidated_metadata,
+        zarr_format=3,  # TODO: We force Zarr format 3 here, we should address
+        consolidated=False,
         encoding=encoding,
     )
     return target_store
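For context, `schema_to_zarr` relies on xarray's lazy-initialization pattern: `compute=False` writes only metadata and coordinates, and chunk data is filled in later via region writes. A minimal standalone sketch of that pattern (hypothetical local path; `zarr_format=3` mirrors the forced format above and needs a recent xarray/zarr):

```python
import numpy as np
import xarray as xr

# A stand-in for the template dataset produced by schema_to_template_ds.
ds = xr.Dataset({"foo": ("time", np.arange(10.0))}, coords={"time": np.arange(10)})

# Write metadata only; no chunk data is computed or stored yet.
ds.to_zarr("template.zarr", mode="w", compute=False, consolidated=False, zarr_format=3)

# Later (e.g. on a worker), fill one region of the store.
ds.isel(time=slice(0, 5)).to_zarr("template.zarr", region={"time": slice(0, 5)})
```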
6 changes: 4 additions & 2 deletions pangeo_forge_recipes/storage.py
@@ -120,7 +120,10 @@ def get_fsspec_remote_protocol(self):

     def get_mapper(self) -> fsspec.mapping.FSMap:
         """Get a mutable mapping object suitable for storing Zarr data."""
-        return FsspecStore(path=self.root_path, fs=self.fs)
+        from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
+
+        fs = AsyncFileSystemWrapper(self.fs)
+        return FsspecStore(path=self.root_path, fs=fs)

     def _full_path(self, path: str):
         return os.path.join(self.root_path, path)
@@ -192,7 +195,6 @@ def cache_file(
         # check and see if the file already exists in the cache
         logger.info(f"Caching file '{fname}'")
         input_opener = _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs)
-
         if self.exists(fname):
             cached_size = self.size(fname)
             with input_opener as of:
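The wrapper in the `get_mapper` hunk above is needed because Zarr v3's `FsspecStore` drives an async filesystem, while the target's `fs` may be synchronous; `AsyncFileSystemWrapper` (available in recent fsspec releases) adapts one to the other. A standalone sketch of the same pattern, with a hypothetical local path:

```python
import fsspec
import zarr
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from zarr.storage import FsspecStore

# Wrap a synchronous filesystem so the async Zarr v3 store can use it.
sync_fs = fsspec.filesystem("file", auto_mkdir=True)
store = FsspecStore(path="/tmp/example.zarr", fs=AsyncFileSystemWrapper(sync_fs))

root = zarr.open_group(store, mode="w")
root.create_array("foo", shape=(10,), dtype="f8")
```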
5 changes: 4 additions & 1 deletion pangeo_forge_recipes/transforms.py
@@ -685,7 +685,10 @@ def __post_init__(self):
             dim = [d for d in self.combine_dims if d.name == self.append_dim]
             assert dim, f"Append dim not in {self.combine_dims=}."
             assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT."
-            existing_ds = xr.open_dataset(self.get_full_target().get_mapper(), engine="zarr")
+            # TODO: address with Zarr format v2
+            existing_ds = xr.open_dataset(
+                self.get_full_target().get_mapper(), engine="zarr", consolidated=False
+            )
             assert self.append_dim in existing_ds, "Append dim must be in existing dataset."
             self._append_offset = len(existing_ds[self.append_dim])
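The append bookkeeping above is simple: the offset is the current length of the append dimension in the existing store, and new fragments are written at that index and beyond. Schematically, with a hypothetical store path and dimension name:

```python
import xarray as xr

# Open the existing store without consolidated metadata, as in the hunk above.
existing_ds = xr.open_dataset("target.zarr", engine="zarr", consolidated=False)
append_offset = len(existing_ds["time"])  # new data lands at [append_offset:]
```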
22 changes: 18 additions & 4 deletions pangeo_forge_recipes/writers.py
@@ -29,11 +29,13 @@ def _region_for(var: xr.Variable, index: Index) -> Tuple[slice, ...]:
     return tuple(region_slice)


-def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
+def _store_data(
+    vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group, zarr_attrs: dict
+) -> None:
     zarr_array = zgroup[vname]
     # get encoding for variable from zarr attributes
     var_coded = var.copy()  # copy needed for test suite to avoid modifying inputs in-place
-    var_coded.encoding.update(zarr_array.attrs)
+    var_coded.encoding.update(zarr_attrs)
     var_coded.attrs = {}
     var = xr.backends.zarr.encode_zarr_variable(var_coded)
     data = np.asarray(var.data)
@@ -78,6 +80,7 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping:

     import zarr

+    # TODO: raise error if Zarr format 3 b/c zarr v3 does not support consolidated metadata
     if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem):
         raise ValueError(
             """Creating consolidated metadata for Kerchunk references should not
@@ -100,6 +103,7 @@ def store_dataset_fragment(

     index, ds = item
     zgroup = zarr.open_group(target_store)
+    xr_store = xr.backends.zarr.ZarrStore.open_group(target_store)
     # TODO: check that the dataset and the index are compatible

     # only store coords if this is the first item in a merge dim
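One way the TODO in the `consolidate_metadata` hunk above could be satisfied is a version guard before consolidating; a hypothetical sketch (assuming zarr-python 3, where group metadata exposes `zarr_format`):

```python
import zarr

def _refuse_zarr_format_3(store) -> None:
    # Hypothetical guard: consolidated metadata is only handled here for Zarr format 2.
    group = zarr.open_group(store, mode="r")
    if group.metadata.zarr_format == 3:
        raise NotImplementedError("consolidated metadata is not supported for Zarr format 3")
```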
@@ -108,9 +112,19 @@
             # if this variable contains a concat dim, we always store it
             possible_concat_dims = [index.find_concat_dim(dim) for dim in da.dims]
             if any(possible_concat_dims) or _is_first_item(index):
-                _store_data(vname, da.variable, index, zgroup)
+                store_var = xr_store.open_store_variable(vname)
+                _store_data(
+                    vname,
+                    da.variable,
+                    index,
+                    zgroup,
+                    zarr_attrs=store_var.encoding | store_var.attrs,
+                )
     for vname, da in ds.data_vars.items():
-        _store_data(vname, da.variable, index, zgroup)
+        store_var = xr_store.open_store_variable(vname)
+        _store_data(
+            vname, da.variable, index, zgroup, zarr_attrs=store_var.encoding | store_var.attrs
+        )

     return target_store
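Note that `store_var.encoding | store_var.attrs` in the hunk above uses the Python 3.9+ dict-union operator, so on key collisions the variable's attrs win over its encoding:

```python
encoding = {"units": "days since 1970-01-01", "dtype": "int64"}
attrs = {"units": "seconds"}
merged = encoding | attrs  # right operand wins on collisions
assert merged == {"units": "seconds", "dtype": "int64"}
```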
11 changes: 6 additions & 5 deletions tests/conftest.py
@@ -26,6 +26,7 @@
 import xarray as xr
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

 # need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
 from pytest_lazyfixture import lazy_fixture
@@ -523,17 +524,17 @@ def netcdf_local_file_pattern_sequential_with_coordinateless_dimension(

 @pytest.fixture()
 def tmp_target(tmpdir_factory):
-    fs = fsspec.get_filesystem_class("file")()
+    fs = fsspec.filesystem("file", auto_mkdir=True)
+    async_fs = AsyncFileSystemWrapper(fs)
     path = str(tmpdir_factory.mktemp("target"))
-    return FSSpecTarget(fs, path)
+    return FSSpecTarget(async_fs, path)


 @pytest.fixture()
 def tmp_cache(tmpdir_factory):
+    fs = fsspec.filesystem("file", auto_mkdir=True)
     path = str(tmpdir_factory.mktemp("cache"))
-    fs = fsspec.get_filesystem_class("file")()
-    cache = CacheFSSpecTarget(fs, path)
-    return cache
+    return CacheFSSpecTarget(fs, path)


 @pytest.fixture()
6 changes: 4 additions & 2 deletions tests/test_end_to_end.py
@@ -112,7 +112,7 @@ def test_xarray_zarr_append(
     )

     # make sure the initial zarr store looks good
-    initial_actual = xr.open_dataset(store_path, engine="zarr")
+    initial_actual = xr.open_dataset(store_path, consolidated=False, engine="zarr")
     assert len(initial_actual.time) == 10
     xr.testing.assert_equal(initial_actual.load(), ds0_fixture)

@@ -127,7 +127,8 @@
     )

     # now see if we have appended to time dimension as intended
-    append_actual = xr.open_dataset(store_path, engine="zarr")
+    append_actual = xr.open_dataset(store_path, consolidated=False, engine="zarr")
+
     assert len(append_actual.time) == 20
     append_expected = xr.concat([ds0_fixture, ds1_fixture], dim="time")
     xr.testing.assert_equal(append_actual.load(), append_expected)
@@ -261,6 +262,7 @@ def test_reference_grib(
     # xr.testing.assert_equal(ds.load(), ds2)


+@pytest.mark.skip(reason="Fails in Zarr v3. Should be revisited. Depends on consolidate metadata.")
 def test_xarray_zarr_consolidate_dimension_coordinates(
     netcdf_local_file_pattern_sequential,
     pipeline,
3 changes: 2 additions & 1 deletion tests/test_writers.py
@@ -155,6 +155,7 @@ def test_store_dataset_fragment(temp_store):
     assert ds.time.encoding.get("units") == ds_target.time.encoding.get("units")


+@pytest.mark.skip(reason="consolidated metadata is not supported in Zarr v3.")
 def test_zarr_consolidate_metadata(
     netcdf_local_file_pattern,
     pipeline,
@@ -200,7 +201,7 @@ def test_zarr_encoding(
                 combine_dims=pattern.combine_dim_keys,
                 encoding={"foo": {"compressor": compressor}},
             )
-            | ConsolidateMetadata()
+            # | ConsolidateMetadata()
         )
     fs = fsspec.filesystem("file")
     zc = zarr.storage.FsspecStore(path=os.path.join(tmp_target.root_path, "store"), fs=fs)
