Skip to content

Commit dd7f742

Browse files
rabernat and keewis authored
Zarr chunking fixes (#5065)
Co-authored-by: keewis <[email protected]>
1 parent 69950a4 commit dd7f742

File tree

7 files changed

+77
-27
lines changed

7 files changed

+77
-27
lines changed

doc/internals/duck-arrays-integration.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ argument:
2525
...
2626
2727
def _repr_inline_(self, max_width):
28-
""" format to a single line with at most max_width characters """
28+
"""format to a single line with at most max_width characters"""
2929
...
3030
3131
...

doc/user-guide/io.rst

-5
Original file line numberDiff line numberDiff line change
@@ -837,11 +837,6 @@ Xarray's Zarr backend allows xarray to leverage these capabilities, including
837837
the ability to store and analyze datasets far too large fit onto disk
838838
(particularly :ref:`in combination with dask <dask>`).
839839

840-
.. warning::
841-
842-
Zarr support is still an experimental feature. Please report any bugs or
843-
unexpected behavior via github issues.
844-
845840
Xarray can't open just any zarr dataset, because xarray requires special
846841
metadata (attributes) describing the dataset dimensions and coordinates.
847842
At this time, xarray can only open zarr datasets that have been written by

doc/whats-new.rst

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ v0.17.1 (unreleased)
2323
New Features
2424
~~~~~~~~~~~~
2525

26+
- Add ``safe_chunks`` option to :py:meth:`Dataset.to_zarr` which allows overriding
27+
checks made to ensure Dask and Zarr chunk compatibility (:issue:`5056`).
28+
By `Ryan Abernathey <https://github.com/rabernat>`_
2629
- Add :py:meth:`Dataset.query` and :py:meth:`DataArray.query` which enable indexing
2730
of datasets and data arrays by evaluating query expressions against the values of the
2831
data variables (:pull:`4984`). By `Alistair Miles <https://github.com/alimanfoo>`_.

xarray/backends/api.py

+3
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,7 @@ def to_zarr(
13651365
consolidated: bool = False,
13661366
append_dim: Hashable = None,
13671367
region: Mapping[str, slice] = None,
1368+
safe_chunks: bool = True,
13681369
):
13691370
"""This function creates an appropriate datastore for writing a dataset to
13701371
a zarr store
@@ -1419,6 +1420,7 @@ def to_zarr(
14191420
consolidated=consolidated,
14201421
region=region,
14211422
encoding=encoding,
1423+
# do we need to pass safe_chunks through here?
14221424
)
14231425

14241426
zstore = backends.ZarrStore.open_group(
@@ -1430,6 +1432,7 @@ def to_zarr(
14301432
chunk_store=chunk_store,
14311433
append_dim=append_dim,
14321434
write_region=region,
1435+
safe_chunks=safe_chunks,
14331436
)
14341437
writer = ArrayWriter()
14351438
# TODO: figure out how to properly handle unlimited_dims

xarray/backends/zarr.py

+35-15
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __getitem__(self, key):
8383
# could possibly have a work-around for 0d data here
8484

8585

86-
def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):
86+
def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
8787
"""
8888
Given encoding chunks (possibly None) and variable chunks (possibly None)
8989
"""
@@ -133,7 +133,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):
133133

134134
if len(enc_chunks_tuple) != ndim:
135135
# throw away encoding chunks, start over
136-
return _determine_zarr_chunks(None, var_chunks, ndim, name)
136+
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks)
137137

138138
for x in enc_chunks_tuple:
139139
if not isinstance(x, int):
@@ -164,24 +164,32 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):
164164
continue
165165
for dchunk in dchunks[:-1]:
166166
if dchunk % zchunk:
167-
raise NotImplementedError(
167+
base_error = (
168168
f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
169169
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
170-
"This is not implemented in xarray yet. "
171-
"Consider either rechunking using `chunk()` or instead deleting "
172-
"or modifying `encoding['chunks']`."
170+
f"Writing this array in parallel with dask could lead to corrupted data."
173171
)
172+
if safe_chunks:
173+
raise NotImplementedError(
174+
base_error
175+
+ " Consider either rechunking using `chunk()`, deleting "
176+
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
177+
)
174178
if dchunks[-1] > zchunk:
175-
raise ValueError(
179+
base_error = (
176180
"Final chunk of Zarr array must be the same size or "
177181
"smaller than the first. "
178182
f"Specified Zarr chunk encoding['chunks']={enc_chunks_tuple}, "
179183
f"for variable named {name!r} "
180-
f"but {dchunks} in the variable's Dask chunks {var_chunks} is "
184+
f"but {dchunks} in the variable's Dask chunks {var_chunks} are "
181185
"incompatible with this encoding. "
182-
"Consider either rechunking using `chunk()` or instead deleting "
183-
"or modifying `encoding['chunks']`."
184186
)
187+
if safe_chunks:
188+
raise NotImplementedError(
189+
base_error
190+
+ " Consider either rechunking using `chunk()`, deleting "
191+
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
192+
)
185193
return enc_chunks_tuple
186194

187195
raise AssertionError("We should never get here. Function logic must be wrong.")
@@ -203,7 +211,9 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key):
203211
return dimensions, attributes
204212

205213

206-
def extract_zarr_variable_encoding(variable, raise_on_invalid=False, name=None):
214+
def extract_zarr_variable_encoding(
215+
variable, raise_on_invalid=False, name=None, safe_chunks=True
216+
):
207217
"""
208218
Extract zarr encoding dictionary from xarray Variable
209219
@@ -233,7 +243,7 @@ def extract_zarr_variable_encoding(variable, raise_on_invalid=False, name=None):
233243
del encoding[k]
234244

235245
chunks = _determine_zarr_chunks(
236-
encoding.get("chunks"), variable.chunks, variable.ndim, name
246+
encoding.get("chunks"), variable.chunks, variable.ndim, name, safe_chunks
237247
)
238248
encoding["chunks"] = chunks
239249
return encoding
@@ -285,6 +295,7 @@ class ZarrStore(AbstractWritableDataStore):
285295
"_read_only",
286296
"_synchronizer",
287297
"_write_region",
298+
"_safe_chunks",
288299
)
289300

290301
@classmethod
@@ -300,6 +311,7 @@ def open_group(
300311
storage_options=None,
301312
append_dim=None,
302313
write_region=None,
314+
safe_chunks=True,
303315
):
304316

305317
# zarr doesn't support pathlib.Path objects yet. zarr-python#601
@@ -323,10 +335,17 @@ def open_group(
323335
zarr_group = zarr.open_consolidated(store, **open_kwargs)
324336
else:
325337
zarr_group = zarr.open_group(store, **open_kwargs)
326-
return cls(zarr_group, consolidate_on_close, append_dim, write_region)
338+
return cls(
339+
zarr_group, consolidate_on_close, append_dim, write_region, safe_chunks
340+
)
327341

328342
def __init__(
329-
self, zarr_group, consolidate_on_close=False, append_dim=None, write_region=None
343+
self,
344+
zarr_group,
345+
consolidate_on_close=False,
346+
append_dim=None,
347+
write_region=None,
348+
safe_chunks=True,
330349
):
331350
self.ds = zarr_group
332351
self._read_only = self.ds.read_only
@@ -335,6 +354,7 @@ def __init__(
335354
self._consolidate_on_close = consolidate_on_close
336355
self._append_dim = append_dim
337356
self._write_region = write_region
357+
self._safe_chunks = safe_chunks
338358

339359
def open_store_variable(self, name, zarr_array):
340360
data = indexing.LazilyIndexedArray(ZarrArrayWrapper(name, self))
@@ -497,7 +517,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
497517
else:
498518
# new variable
499519
encoding = extract_zarr_variable_encoding(
500-
v, raise_on_invalid=check, name=vn
520+
v, raise_on_invalid=check, name=vn, safe_chunks=self._safe_chunks
501521
)
502522
encoded_attrs = {}
503523
# the magic for storing the hidden dimension data

xarray/core/dataset.py

+21-3
Original file line numberDiff line numberDiff line change
@@ -1776,12 +1776,22 @@ def to_zarr(
17761776
consolidated: bool = False,
17771777
append_dim: Hashable = None,
17781778
region: Mapping[str, slice] = None,
1779+
safe_chunks: bool = True,
17791780
) -> "ZarrStore":
17801781
"""Write dataset contents to a zarr group.
17811782
1782-
.. note:: Experimental
1783-
The Zarr backend is new and experimental. Please report any
1784-
unexpected behavior via github issues.
1783+
Zarr chunks are determined in the following way:
1784+
1785+
- From the ``chunks`` attribute in each variable's ``encoding``
1786+
- If the variable is a Dask array, from the dask chunks
1787+
- If neither Dask chunks nor encoding chunks are present, chunks will
1788+
be determined automatically by Zarr
1789+
- If both Dask chunks and encoding chunks are present, encoding chunks
1790+
will be used, provided that there is a many-to-one relationship between
1791+
encoding chunks and dask chunks (i.e. Dask chunks are bigger than and
1792+
evenly divide encoding chunks); otherwise raise a ``ValueError``.
1793+
This restriction ensures that no synchronization / locks are required
1794+
when writing. To disable this restriction, use ``safe_chunks=False``.
17851795
17861796
Parameters
17871797
----------
@@ -1833,6 +1843,13 @@ def to_zarr(
18331843
in with ``region``, use a separate call to ``to_zarr()`` with
18341844
``compute=False``. See "Appending to existing Zarr stores" in
18351845
the reference documentation for full details.
1846+
safe_chunks : bool, optional
1847+
If True, only allow writes when there is a many-to-one relationship
1848+
between Zarr chunks (specified in encoding) and Dask chunks.
1849+
Set False to override this restriction; however, data may become corrupted
1850+
if Zarr arrays are written in parallel. This option may be useful in combination
1851+
with ``compute=False`` to initialize a Zarr from an existing
1852+
Dataset with arbitrary chunk structure.
18361853
18371854
References
18381855
----------
@@ -1869,6 +1886,7 @@ def to_zarr(
18691886
consolidated=consolidated,
18701887
append_dim=append_dim,
18711888
region=region,
1889+
safe_chunks=safe_chunks,
18721890
)
18731891

18741892
def __repr__(self) -> str:

xarray/tests/test_backends.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -1871,14 +1871,21 @@ def test_chunk_encoding_with_dask(self):
18711871
with self.roundtrip(badenc) as actual:
18721872
pass
18731873

1874+
# unless...
1875+
with self.roundtrip(badenc, save_kwargs={"safe_chunks": False}) as actual:
1876+
# don't actually check equality because the data could be corrupted
1877+
pass
1878+
18741879
badenc.var1.encoding["chunks"] = (2,)
1875-
with pytest.raises(ValueError, match=r"Specified Zarr chunk encoding"):
1880+
with pytest.raises(NotImplementedError, match=r"Specified Zarr chunk encoding"):
18761881
with self.roundtrip(badenc) as actual:
18771882
pass
18781883

18791884
badenc = badenc.chunk({"x": (3, 3, 6)})
18801885
badenc.var1.encoding["chunks"] = (3,)
1881-
with pytest.raises(ValueError, match=r"incompatible with this encoding"):
1886+
with pytest.raises(
1887+
NotImplementedError, match=r"incompatible with this encoding"
1888+
):
18821889
with self.roundtrip(badenc) as actual:
18831890
pass
18841891

@@ -1901,9 +1908,13 @@ def test_chunk_encoding_with_dask(self):
19011908
# TODO: remove this failure once syncronized overlapping writes are
19021909
# supported by xarray
19031910
ds_chunk4["var1"].encoding.update({"chunks": 5})
1904-
with pytest.raises(NotImplementedError):
1911+
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
19051912
with self.roundtrip(ds_chunk4) as actual:
19061913
pass
1914+
# override option
1915+
with self.roundtrip(ds_chunk4, save_kwargs={"safe_chunks": False}) as actual:
1916+
# don't actually check equality because the data could be corrupted
1917+
pass
19071918

19081919
def test_hidden_zarr_keys(self):
19091920
expected = create_test_data()

0 commit comments

Comments
 (0)