From f860ce5ab28a40e1df0323a9ddffbc70187b56c8 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 13:22:30 -0700 Subject: [PATCH 01/30] test that rechunk can be passed through non registered chunked arrays --- xarray/tests/test_parallelcompat.py | 38 +++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index dbe40be710c..3969ed725ac 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -8,6 +8,7 @@ from xarray.core.types import T_Chunks, T_DuckArray, T_NormalizedChunks from xarray.namedarray._typing import _Chunks +from xarray.namedarray.core import NamedArray from xarray.namedarray.daskmanager import DaskManager from xarray.namedarray.parallelcompat import ( ChunkManagerEntrypoint, @@ -31,13 +32,13 @@ class DummyChunkedArray(np.ndarray): def __new__( cls, - shape, + shape: tuple[int, ...], + chunks: T_NormalizedChunks, dtype=float, buffer=None, offset=0, strides=None, order=None, - chunks=None, ): obj = super().__new__(cls, shape, dtype, buffer, offset, strides, order) obj.chunks = chunks @@ -46,9 +47,9 @@ def __new__( def __array_finalize__(self, obj): if obj is None: return - self.chunks = getattr(obj, "chunks", None) + self.chunks = getattr(obj, "chunks") - def rechunk(self, chunks, **kwargs): + def rechunk(self, chunks: T_NormalizedChunks, **kwargs) -> DummyChunkedArray: copied = self.copy() copied.chunks = chunks return copied @@ -147,6 +148,27 @@ def register_dummy_chunkmanager(monkeypatch): yield +class TestPassThroughNonRegisteredChunkedArrays: + """ + Check that types which implement .chunks and .rechunk are still dispatched to for these methods, even if they are not registered via a ChunkManager. + + Basically regression tests for GH issue #8733. + + Notice we specifically do not use the register_dummy_chunkmanager fixture in these tests. + """ + + def test_chunks(self) -> None: + dummy_arr = DummyChunkedArray(shape=(9,), chunks=((3,),)) + na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) + assert na.chunks == ((3,),) + + def test_rechunk(self) -> None: + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) + na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]).chunk(chunks=((2,),)) + assert isinstance(na.data, DummyChunkedArray) + assert na.chunks == ((2,),) + + class TestGetChunkManager: def test_get_chunkmanger(self, register_dummy_chunkmanager) -> None: chunkmanager = guess_chunkmanager("dummy") @@ -176,13 +198,13 @@ def test_choose_dask_over_other_chunkmanagers( class TestGetChunkedArrayType: def test_detect_chunked_arrays(self, register_dummy_chunkmanager) -> None: - dummy_arr = DummyChunkedArray([1, 2, 3]) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) chunk_manager = get_chunked_array_type(dummy_arr) assert isinstance(chunk_manager, DummyChunkManager) def test_ignore_inmemory_arrays(self, register_dummy_chunkmanager) -> None: - dummy_arr = DummyChunkedArray([1, 2, 3]) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) chunk_manager = get_chunked_array_type(*[dummy_arr, 1.0, np.array([5, 6])]) assert isinstance(chunk_manager, DummyChunkManager) @@ -195,7 +217,7 @@ def test_raise_if_no_arrays_chunked(self, register_dummy_chunkmanager) -> None: get_chunked_array_type(*[1.0, np.array([5, 6])]) def test_raise_if_no_matching_chunkmanagers(self) -> None: - dummy_arr = DummyChunkedArray([1, 2, 3]) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) with pytest.raises( TypeError, match="Could not find a Chunk Manager which recognises" @@ -215,7 +237,7 @@ def test_detect_dask_if_installed(self) -> None: def test_raise_on_mixed_array_types(self, register_dummy_chunkmanager) -> None: import dask.array as da - dummy_arr = DummyChunkedArray([1, 2, 3]) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) dask_arr = da.from_array([1, 2, 3], chunks=(1,)) with pytest.raises(TypeError, match="received multiple types"): From 77947c35ea7f9a3a310c86abcf39b47a2392a888 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 13:31:02 -0700 Subject: [PATCH 02/30] remove chunks and rechunk from chunkmanager --- xarray/namedarray/core.py | 10 +++--- xarray/namedarray/parallelcompat.py | 55 ----------------------------- 2 files changed, 6 insertions(+), 59 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index fe47bf50533..3533491ceda 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -819,11 +819,12 @@ def chunk( if dim in chunks } - chunkmanager = guess_chunkmanager(chunked_array_type) - data_old = self._data - if chunkmanager.is_chunked_array(data_old): - data_chunked = chunkmanager.rechunk(data_old, chunks) # type: ignore[arg-type] + if hasattr(data_old, "chunks"): + # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. + # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. + # See GH issue #8733 + data_chunked = data_old.rechunk(chunks) else: if not isinstance(data_old, ExplicitlyIndexed): ndata = data_old @@ -841,6 +842,7 @@ def chunk( if is_dict_like(chunks): chunks = tuple(chunks.get(n, s) for n, s in enumerate(ndata.shape)) # type: ignore[assignment] + chunkmanager = guess_chunkmanager(chunked_array_type) data_chunked = chunkmanager.from_array(ndata, chunks, **from_array_kwargs) # type: ignore[arg-type] return self._replace(data=data_chunked) diff --git a/xarray/namedarray/parallelcompat.py b/xarray/namedarray/parallelcompat.py index dd555fe200a..c84813e2001 100644 --- a/xarray/namedarray/parallelcompat.py +++ b/xarray/namedarray/parallelcompat.py @@ -218,30 +218,6 @@ def is_chunked_array(self, data: duckarray[Any, Any]) -> bool: """ return isinstance(data, self.array_cls) - @abstractmethod - def chunks(self, data: T_ChunkedArray) -> _NormalizedChunks: - """ - Return the current chunks of the given array. - - Returns chunks explicitly as a tuple of tuple of ints. - - Used internally by xarray objects' .chunks and .chunksizes properties. - - Parameters - ---------- - data : chunked array - - Returns - ------- - chunks : tuple[tuple[int, ...], ...] - - See Also - -------- - dask.array.Array.chunks - cubed.Array.chunks - """ - raise NotImplementedError() - @abstractmethod def normalize_chunks( self, @@ -305,37 +281,6 @@ def from_array( """ raise NotImplementedError() - def rechunk( - self, - data: T_ChunkedArray, - chunks: _NormalizedChunks | tuple[int, ...] | _Chunks, - **kwargs: Any, - ) -> Any: - """ - Changes the chunking pattern of the given array. - - Called when the .chunk method is called on an xarray object that is already chunked. - - Parameters - ---------- - data : dask array - Array to be rechunked. - chunks : int, tuple, dict or str, optional - The new block dimensions to create. -1 indicates the full size of the - corresponding dimension. Default is "auto" which automatically - determines chunk sizes. - - Returns - ------- - chunked array - - See Also - -------- - dask.array.Array.rechunk - cubed.Array.rechunk - """ - return data.rechunk(chunks, **kwargs) - @abstractmethod def compute( self, *data: T_ChunkedArray | Any, **kwargs: Any From da6799afc54eb60fbc5fce7c750dd5d384066d36 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:17:38 -0700 Subject: [PATCH 03/30] remove now-redundant chunks method from dask chunkmanager --- xarray/namedarray/daskmanager.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index 963d12fd865..df8681ec290 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -41,9 +41,6 @@ def __init__(self) -> None: def is_chunked_array(self, data: duckarray[Any, Any]) -> bool: return is_duck_dask_array(data) - def chunks(self, data: Any) -> _NormalizedChunks: - return data.chunks # type: ignore[no-any-return] - def normalize_chunks( self, chunks: T_Chunks | _NormalizedChunks, From 34dadae8a1fa6269c3a1a9094c3ce960f376d48e Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:25:22 -0700 Subject: [PATCH 04/30] use is_chunked_array instead of hasattr(chunks) --- xarray/namedarray/core.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 3533491ceda..8c116a6552e 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -41,7 +41,7 @@ _SupportsReal, ) from xarray.namedarray.parallelcompat import guess_chunkmanager -from xarray.namedarray.pycompat import to_numpy +from xarray.namedarray.pycompat import to_numpy, is_chunked_array from xarray.namedarray.utils import ( either_dict_or_kwargs, infix_dims, @@ -820,7 +820,7 @@ def chunk( } data_old = self._data - if hasattr(data_old, "chunks"): + if is_chunked_array(data_old): # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. # See GH issue #8733 @@ -849,7 +849,6 @@ def chunk( def to_numpy(self) -> np.ndarray[Any, Any]: """Coerces wrapped data to numpy and returns a numpy.ndarray""" - # TODO an entrypoint so array libraries can choose coercion method? return to_numpy(self._data) def as_numpy(self) -> Self: From 6e26a2df0634f5e1e39c5ada0aafc2e5e69dde26 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:25:54 -0700 Subject: [PATCH 05/30] add has_chunkmanager function --- xarray/namedarray/pycompat.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index 3ce33d4d8ea..95cff3e9a89 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -8,6 +8,7 @@ from packaging.version import Version from xarray.core.utils import is_scalar +from xarray.namedarray._typing import _chunkedarray from xarray.namedarray.utils import is_duck_array, is_duck_dask_array integer_types = (int, np.integer) @@ -89,7 +90,21 @@ def mod_version(mod: ModType) -> Version: def is_chunked_array(x: duckarray[Any, Any]) -> bool: - return is_duck_dask_array(x) or (is_duck_array(x) and hasattr(x, "chunks")) + return is_duck_dask_array(x) or isinstance(x, _chunkedarray) + + +def has_chunkmanager(x: _chunkedarray) -> bool: + from xarray.namedarray.parallelcompat import get_chunked_array_type + + try: + get_chunked_array_type(x) + except TypeError as e: + if str(e).startswith("Could not find a Chunk Manager which recognises type"): + return False + else: + raise # something else went wrong + else: + return True def is_0d_dask_array(x: duckarray[Any, Any]) -> bool: @@ -106,7 +121,7 @@ def to_numpy( data = data.get_duck_array() # type: ignore[no-untyped-call] # TODO first attempt to call .to_numpy() once some libraries implement it - if is_chunked_array(data): + if is_chunked_array(data) and has_chunkmanager(data): chunkmanager = get_chunked_array_type(data) data, *_ = chunkmanager.compute(data, **kwargs) if isinstance(data, array_type("cupy")): @@ -125,7 +140,7 @@ def to_duck_array(data: Any, **kwargs: dict[str, Any]) -> duckarray[_ShapeType, from xarray.core.indexing import ExplicitlyIndexed from xarray.namedarray.parallelcompat import get_chunked_array_type - if is_chunked_array(data): + if is_chunked_array(data) and has_chunkmanager(data): chunkmanager = get_chunked_array_type(data) loaded_data, *_ = chunkmanager.compute(data, **kwargs) # type: ignore[var-annotated] return loaded_data From 78e8ff455177dfbe99271bd2747603cfd954fdfa Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:26:45 -0700 Subject: [PATCH 06/30] use has_chunkmanager --- xarray/coding/strings.py | 6 +++--- xarray/coding/times.py | 11 +++++++---- xarray/coding/variables.py | 4 ++-- xarray/core/common.py | 8 ++++++-- xarray/core/computation.py | 4 ++-- xarray/core/dataset.py | 6 ++++-- xarray/core/duck_array_ops.py | 6 +++--- xarray/core/indexing.py | 9 +++++++-- xarray/core/missing.py | 4 ++-- xarray/core/utils.py | 6 ++++-- xarray/namedarray/core.py | 2 +- 11 files changed, 41 insertions(+), 25 deletions(-) diff --git a/xarray/coding/strings.py b/xarray/coding/strings.py index d16ec52d645..740346c540b 100644 --- a/xarray/coding/strings.py +++ b/xarray/coding/strings.py @@ -18,7 +18,7 @@ from xarray.core.utils import module_available from xarray.core.variable import Variable from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array HAS_NUMPY_2_0 = module_available("numpy", minversion="2.0.0.dev0") @@ -144,7 +144,7 @@ def bytes_to_char(arr): if arr.dtype.kind != "S": raise ValueError("argument must have a fixed-width bytes dtype") - if is_chunked_array(arr): + if has_chunkmanager(arr): chunkmanager = get_chunked_array_type(arr) return chunkmanager.map_blocks( @@ -183,7 +183,7 @@ def char_to_bytes(arr): # can't make an S0 dtype return np.zeros(arr.shape[:-1], dtype=np.bytes_) - if is_chunked_array(arr): + if is_chunked_array(arr) and has_chunkmanager(arr): chunkmanager = get_chunked_array_type(arr) if len(arr.chunks[-1]) > 1: diff --git a/xarray/coding/times.py b/xarray/coding/times.py index badb9259b06..44b948c4e10 100644 --- a/xarray/coding/times.py +++ b/xarray/coding/times.py @@ -27,8 +27,11 @@ from xarray.core.pdcompat import nanosecond_precision_timestamp from xarray.core.utils import emit_user_level_warning from xarray.core.variable import Variable -from xarray.namedarray.parallelcompat import T_ChunkedArray, get_chunked_array_type -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.parallelcompat import ( + T_ChunkedArray, + get_chunked_array_type, +) +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array from xarray.namedarray.utils import is_duck_dask_array try: @@ -719,7 +722,7 @@ def encode_cf_datetime( cftime.date2num """ dates = asarray(dates) - if is_chunked_array(dates): + if is_chunked_array(dates) and has_chunkmanager(dates): return _lazily_encode_cf_datetime(dates, units, calendar, dtype) else: return _eagerly_encode_cf_datetime(dates, units, calendar, dtype) @@ -864,7 +867,7 @@ def encode_cf_timedelta( dtype: np.dtype | None = None, ) -> tuple[T_DuckArray, str]: timedeltas = asarray(timedeltas) - if is_chunked_array(timedeltas): + if is_chunked_array(timedeltas) and has_chunkmanager(timedeltas): return _lazily_encode_cf_timedelta(timedeltas, units, dtype) else: return _eagerly_encode_cf_timedelta(timedeltas, units, dtype) diff --git a/xarray/coding/variables.py b/xarray/coding/variables.py index 8a3afe650f2..4dbe3b48834 100644 --- a/xarray/coding/variables.py +++ b/xarray/coding/variables.py @@ -13,7 +13,7 @@ from xarray.core import dtypes, duck_array_ops, indexing from xarray.core.variable import Variable from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array if TYPE_CHECKING: T_VarTuple = tuple[tuple[Hashable, ...], Any, dict, dict] @@ -176,7 +176,7 @@ def lazy_elemwise_func(array, func: Callable, dtype: np.typing.DTypeLike): ------- Either a dask.array.Array or _ElementwiseFunctionArray. """ - if is_chunked_array(array): + if is_chunked_array(array) and has_chunkmanager(array): chunkmanager = get_chunked_array_type(array) return chunkmanager.map_blocks(func, array, dtype=dtype) # type: ignore[arg-type] diff --git a/xarray/core/common.py b/xarray/core/common.py index 52a00911d19..12d1f9eb0d8 100644 --- a/xarray/core/common.py +++ b/xarray/core/common.py @@ -19,8 +19,11 @@ is_scalar, ) from xarray.namedarray.core import _raise_if_any_duplicate_dimensions -from xarray.namedarray.parallelcompat import get_chunked_array_type, guess_chunkmanager -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.parallelcompat import ( + get_chunked_array_type, + guess_chunkmanager, +) +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array try: import cftime @@ -1717,6 +1720,7 @@ def _full_like_variable( if ( is_chunked_array(other.data) + and has_chunkmanager(other.data) or chunked_array_type is not None or chunks is not None ): diff --git a/xarray/core/computation.py b/xarray/core/computation.py index 5d21d0836b9..5de242b4e7c 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -26,7 +26,7 @@ from xarray.core.utils import is_dict_like, is_duck_dask_array, is_scalar, parse_dims from xarray.core.variable import Variable from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array from xarray.util.deprecation_helpers import deprecate_dims if TYPE_CHECKING: @@ -2169,7 +2169,7 @@ def _calc_idxminmax( indx = func(array, dim=dim, axis=None, keep_attrs=keep_attrs, skipna=skipna) # Handle chunked arrays (e.g. dask). - if is_chunked_array(array.data): + if is_chunked_array(array.data) and has_chunkmanager(array.data): chunkmanager = get_chunked_array_type(array.data) chunks = dict(zip(array.dims, array.chunks)) dask_coord = chunkmanager.from_array(array[dim].data, chunks=chunks[dim]) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 3d3a051c070..86378f5cc71 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -125,7 +125,7 @@ calculate_dimensions, ) from xarray.namedarray.parallelcompat import get_chunked_array_type, guess_chunkmanager -from xarray.namedarray.pycompat import array_type, is_chunked_array +from xarray.namedarray.pycompat import array_type, has_chunkmanager, is_chunked_array from xarray.plot.accessor import DatasetPlotAccessor from xarray.util.deprecation_helpers import _deprecate_positional_args, deprecate_dims @@ -856,7 +856,9 @@ def load(self, **kwargs) -> Self: """ # access .data to coerce everything to numpy or dask arrays lazy_data = { - k: v._data for k, v in self.variables.items() if is_chunked_array(v._data) + k: v._data + for k, v in self.variables.items() + if is_chunked_array(v._data) and has_chunkmanager(v._data) } if lazy_data: chunkmanager = get_chunked_array_type(*lazy_data.values()) diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index 8993c136ba6..ad31261fcd1 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -39,7 +39,7 @@ from xarray.core.utils import is_duck_array, is_duck_dask_array, module_available from xarray.namedarray import pycompat from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import array_type, is_chunked_array +from xarray.namedarray.pycompat import array_type, has_chunkmanager, is_chunked_array # remove once numpy 2.0 is the oldest supported version if module_available("numpy", minversion="2.0.0.dev0"): @@ -736,7 +736,7 @@ def first(values, axis, skipna=None): dtypes.isdtype(values.dtype, "signed integer") or dtypes.is_string(values.dtype) ): # only bother for dtypes that can hold NaN - if is_chunked_array(values): + if is_chunked_array(values) and has_chunkmanager(values): return chunked_nanfirst(values, axis) else: return nputils.nanfirst(values, axis) @@ -749,7 +749,7 @@ def last(values, axis, skipna=None): dtypes.isdtype(values.dtype, "signed integer") or dtypes.is_string(values.dtype) ): # only bother for dtypes that can hold NaN - if is_chunked_array(values): + if is_chunked_array(values) and has_chunkmanager(values): return chunked_nanlast(values, axis) else: return nputils.nanlast(values, axis) diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 19937270268..0069027ab13 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -28,7 +28,12 @@ to_0d_array, ) from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import array_type, integer_types, is_chunked_array +from xarray.namedarray.pycompat import ( + array_type, + has_chunkmanager, + integer_types, + is_chunked_array, +) if TYPE_CHECKING: from numpy.typing import DTypeLike @@ -1349,7 +1354,7 @@ def _masked_result_drop_slice(key, data: duckarray[Any, Any] | None = None): new_keys = [] for k in key: if isinstance(k, np.ndarray): - if is_chunked_array(data): # type: ignore[arg-type] + if is_chunked_array(data) and has_chunkmanager: # type: ignore[arg-type] chunkmanager = get_chunked_array_type(data) new_keys.append( _chunked_array_with_chunks_hint(k, chunks_hint, chunkmanager) diff --git a/xarray/core/missing.py b/xarray/core/missing.py index bfbad72649a..9be4f8588b7 100644 --- a/xarray/core/missing.py +++ b/xarray/core/missing.py @@ -24,7 +24,7 @@ from xarray.core.utils import OrderedSet, is_scalar from xarray.core.variable import Variable, broadcast_variables from xarray.namedarray.parallelcompat import get_chunked_array_type -from xarray.namedarray.pycompat import is_chunked_array +from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array if TYPE_CHECKING: from xarray.core.dataarray import DataArray @@ -690,7 +690,7 @@ def interp_func(var, x, new_x, method: InterpOptions, kwargs): else: func, kwargs = _get_interpolator_nd(method, **kwargs) - if is_chunked_array(var): + if is_chunked_array(var) and has_chunkmanager(var): chunkmanager = get_chunked_array_type(var) ndim = var.ndim diff --git a/xarray/core/utils.py b/xarray/core/utils.py index c2859632360..8af76c4bfa6 100644 --- a/xarray/core/utils.py +++ b/xarray/core/utils.py @@ -1036,14 +1036,16 @@ def contains_only_chunked_or_numpy(obj) -> bool: Expects obj to be Dataset or DataArray""" from xarray.core.dataarray import DataArray - from xarray.namedarray.pycompat import is_chunked_array + from xarray.namedarray.pycompat import has_chunkmanager, is_chunked_array if isinstance(obj, DataArray): obj = obj._to_temp_dataset() return all( [ - isinstance(var.data, np.ndarray) or is_chunked_array(var.data) + isinstance(var.data, np.ndarray) + or is_chunked_array(var.data) + and has_chunkmanager(var.data) for var in obj.variables.values() ] ) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 8c116a6552e..4c7f5395e0d 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -41,7 +41,7 @@ _SupportsReal, ) from xarray.namedarray.parallelcompat import guess_chunkmanager -from xarray.namedarray.pycompat import to_numpy, is_chunked_array +from xarray.namedarray.pycompat import is_chunked_array, to_numpy from xarray.namedarray.utils import ( either_dict_or_kwargs, infix_dims, From 6feede3529b57215dcdca6c83f13517e1d8a6d9e Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:41:22 -0700 Subject: [PATCH 07/30] fix errors in other tests --- xarray/namedarray/pycompat.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index 95cff3e9a89..712d623e55b 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -101,6 +101,8 @@ def has_chunkmanager(x: _chunkedarray) -> bool: except TypeError as e: if str(e).startswith("Could not find a Chunk Manager which recognises type"): return False + elif str(e) == "Expected a chunked array but none were found": + return False else: raise # something else went wrong else: From 621ea0c4b8aeceaec3c03cc1e4e7aa8873c75ec5 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:49:22 -0700 Subject: [PATCH 08/30] improve docs --- doc/internals/chunked-arrays.rst | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/doc/internals/chunked-arrays.rst b/doc/internals/chunked-arrays.rst index ba7ce72c834..ac1aed2e7d6 100644 --- a/doc/internals/chunked-arrays.rst +++ b/doc/internals/chunked-arrays.rst @@ -7,13 +7,13 @@ Alternative chunked array types .. warning:: - This is a *highly* experimental feature. Please report any bugs or other difficulties on `xarray's issue tracker `_. + This is an experimental feature. Please report any bugs or other difficulties on `xarray's issue tracker `_. In particular see discussion on `xarray issue #6807 `_ -Xarray can wrap chunked dask arrays (see :ref:`dask`), but can also wrap any other chunked array type that exposes the correct interface. +Xarray can wrap chunked dask arrays (see :ref:`dask`), but can also wrap any other chunked array type which exposes the correct interface. This allows us to support using other frameworks for distributed and out-of-core processing, with user code still written as xarray commands. In particular xarray also supports wrapping :py:class:`cubed.Array` objects -(see `Cubed's documentation `_ and the `cubed-xarray package `_). +(see `Cubed's documentation `_ via the `cubed-xarray package `_). The basic idea is that by wrapping an array that has an explicit notion of ``.chunks``, xarray can expose control over the choice of chunking scheme to users via methods like :py:meth:`DataArray.chunk` whilst the wrapped array actually @@ -25,11 +25,12 @@ Chunked array methods and "core operations" A chunked array needs to meet all the :ref:`requirements for normal duck arrays `, but must also implement additional features. -Chunked arrays have additional attributes and methods, such as ``.chunks`` and ``.rechunk``. -Furthermore, Xarray dispatches chunk-aware computations across one or more chunked arrays using special functions known -as "core operations". Examples include ``map_blocks``, ``blockwise``, and ``apply_gufunc``. +Chunked arrays will have additional attributes and methods, such as ``.chunks`` and ``.rechunk``. +If the wrapped class only implements these additional methods then xarray will handle them in the same way it handles other duck arrays - i.e. with no further action on the user's part. + +However to support applying computations across chunks, Xarray dispatches all chunk-aware computations across one or more chunked arrays using special functions known +as "core operations". The core operations are generalizations of functions first implemented in :py:mod:`dask.array`, and examples include ``map_blocks``, ``blockwise``, and ``apply_gufunc``. -The core operations are generalizations of functions first implemented in :py:mod:`dask.array`. The implementation of these functions is specific to the type of arrays passed to them. For example, when applying the ``map_blocks`` core operation, :py:class:`dask.array.Array` objects must be processed by :py:func:`dask.array.map_blocks`, whereas :py:class:`cubed.Array` objects must be processed by :py:func:`cubed.map_blocks`. @@ -100,3 +101,9 @@ To use a parallel array type that does not expose a concept of chunks explicitly is theoretically required. Such an array type (e.g. `Ramba `_ or `Arkouda `_) could be wrapped using xarray's existing support for :ref:`numpy-like "duck" arrays `. + +Chunks without parallel processing +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Some chunked array types exist which don't support parallel processing. +These will define `.chunks` and possibly also `.rechunk`, but do not require a `ChunkManagerEntrypoint` in order for these method to be called by `DataArray.chunk`. From c305b61c3ed0cd1a3b4e0b8ba3a515dc6e63a736 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 14:52:53 -0700 Subject: [PATCH 09/30] test that computation proceeds without dask on unregistered chunked arrays --- xarray/tests/test_parallelcompat.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index 3969ed725ac..95080d83d0d 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -168,6 +168,11 @@ def test_rechunk(self) -> None: assert isinstance(na.data, DummyChunkedArray) assert na.chunks == ((2,),) + def test_computation(self) -> None: + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) + na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) + na.mean() + class TestGetChunkManager: def test_get_chunkmanger(self, register_dummy_chunkmanager) -> None: From 4f82d9d6c3f57c3c9cb2cfea166d06bc56035216 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 18:09:52 -0700 Subject: [PATCH 10/30] type hinting fixes --- xarray/namedarray/core.py | 2 +- xarray/namedarray/pycompat.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 4c7f5395e0d..39ea3fd53ad 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -824,7 +824,7 @@ def chunk( # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. # See GH issue #8733 - data_chunked = data_old.rechunk(chunks) + data_chunked = data_old.rechunk(chunks) # type: ignore[union-attr] else: if not isinstance(data_old, ExplicitlyIndexed): ndata = data_old diff --git a/xarray/namedarray/pycompat.py b/xarray/namedarray/pycompat.py index 712d623e55b..d444908c75f 100644 --- a/xarray/namedarray/pycompat.py +++ b/xarray/namedarray/pycompat.py @@ -93,7 +93,7 @@ def is_chunked_array(x: duckarray[Any, Any]) -> bool: return is_duck_dask_array(x) or isinstance(x, _chunkedarray) -def has_chunkmanager(x: _chunkedarray) -> bool: +def has_chunkmanager(x: duckarray[Any, Any]) -> bool: from xarray.namedarray.parallelcompat import get_chunked_array_type try: From a9bd35dff63afb655cb4bfdf5580535d1e71ccb3 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 26 Jul 2024 18:13:47 -0700 Subject: [PATCH 11/30] fix other issues revealed by typing --- xarray/core/indexing.py | 2 +- xarray/tests/test_parallelcompat.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/xarray/core/indexing.py b/xarray/core/indexing.py index 0069027ab13..80aaf426fb0 100644 --- a/xarray/core/indexing.py +++ b/xarray/core/indexing.py @@ -1354,7 +1354,7 @@ def _masked_result_drop_slice(key, data: duckarray[Any, Any] | None = None): new_keys = [] for k in key: if isinstance(k, np.ndarray): - if is_chunked_array(data) and has_chunkmanager: # type: ignore[arg-type] + if is_chunked_array(data) and has_chunkmanager(data): # type: ignore[arg-type] chunkmanager = get_chunked_array_type(data) new_keys.append( _chunked_array_with_chunks_hint(k, chunks_hint, chunkmanager) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index 95080d83d0d..923b28d1e31 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -164,7 +164,9 @@ def test_chunks(self) -> None: def test_rechunk(self) -> None: dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) - na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]).chunk(chunks=((2,),)) + na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]).chunk( + chunks={"x": (2,)} + ) assert isinstance(na.data, DummyChunkedArray) assert na.chunks == ((2,),) From a24489be4c842804e8b6127d0a0d396a5c25a97d Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 27 Jul 2024 12:28:39 -0700 Subject: [PATCH 12/30] ensure tests check that chunks are properly normalized --- xarray/tests/test_parallelcompat.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index 923b28d1e31..5c3dddc12ea 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -20,6 +20,7 @@ from xarray.tests import has_dask, requires_dask +# TODO can I subclass the chunkedduckarray protocol here? class DummyChunkedArray(np.ndarray): """ Mock-up of a chunked array class. @@ -28,7 +29,7 @@ class DummyChunkedArray(np.ndarray): https://numpy.org/doc/stable/user/basics.subclassing.html#simple-example-adding-an-extra-attribute-to-ndarray """ - chunks: T_NormalizedChunks + _chunks: T_NormalizedChunks def __new__( cls, @@ -49,6 +50,20 @@ def __array_finalize__(self, obj): return self.chunks = getattr(obj, "chunks") + @property + def chunks(self) -> T_NormalizedChunks: + return self._chunks + + @chunks.setter + def chunks(self, value: T_NormalizedChunks) -> None: + # ensure the chunks actually are normalized before setting them + assert isinstance(value, tuple) + for lengths_along_axis in value: + assert isinstance(lengths_along_axis, tuple) + for length in lengths_along_axis: + assert isinstance(length, int) + self._chunks = value + def rechunk(self, chunks: T_NormalizedChunks, **kwargs) -> DummyChunkedArray: copied = self.copy() copied.chunks = chunks @@ -161,14 +176,15 @@ def test_chunks(self) -> None: dummy_arr = DummyChunkedArray(shape=(9,), chunks=((3,),)) na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) assert na.chunks == ((3,),) + assert na.chunksizes == {"x": (3,)} def test_rechunk(self) -> None: dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) - na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]).chunk( - chunks={"x": (2,)} - ) - assert isinstance(na.data, DummyChunkedArray) - assert na.chunks == ((2,),) + na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) + rechunked_na = na.chunk(chunks={"x": (2,)}) + assert isinstance(rechunked_na.data, DummyChunkedArray) + assert rechunked_na.data.chunks == ((2,),) + assert rechunked_na.chunksizes == {"x": (2,)} def test_computation(self) -> None: dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) From 2cce6a0dc7def3e539716f8949a473808d22bafa Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 27 Jul 2024 12:30:17 -0700 Subject: [PATCH 13/30] remove now-redundant chunks and rechunk methods from DummyChunkManager --- xarray/tests/test_parallelcompat.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index 5c3dddc12ea..30ef16f76db 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -79,9 +79,6 @@ def __init__(self): def is_chunked_array(self, data: Any) -> bool: return isinstance(data, DummyChunkedArray) - def chunks(self, data: DummyChunkedArray) -> T_NormalizedChunks: - return data.chunks - def normalize_chunks( self, chunks: T_Chunks | T_NormalizedChunks, @@ -101,9 +98,6 @@ def from_array( return da.from_array(data, chunks, **kwargs) - def rechunk(self, data: DummyChunkedArray, chunks, **kwargs) -> DummyChunkedArray: - return data.rechunk(chunks, **kwargs) - def compute(self, *data: DummyChunkedArray, **kwargs) -> tuple[np.ndarray, ...]: from dask.array import compute From 3cadd5379ebcdaa61f3d030084825195398e47e4 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 27 Jul 2024 14:12:13 -0700 Subject: [PATCH 14/30] commented-out code indicating problem with chunk normalization --- xarray/namedarray/core.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 39ea3fd53ad..bbe0688ddba 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -821,6 +821,12 @@ def chunk( data_old = self._data if is_chunked_array(data_old): + print(f"problematic chunks = {chunks}") + # if is_dict_like(chunks) and chunks != {}: + # chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment] + + print(f"hopefully normalized chunks = {chunks}") + # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. # See GH issue #8733 From 556161d0c99cf435e35483bc80ffaa6139751b49 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 29 Jul 2024 02:13:17 -0700 Subject: [PATCH 15/30] fixed bug with chunks passed as dict --- xarray/namedarray/core.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index bbe0688ddba..428b40c5a74 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -812,6 +812,8 @@ def chunk( chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") if is_dict_like(chunks): + # turns dict[str, tuple[in, ..]] -> dict[int, tuple[int, ...]] + # This method of iteration allows for duplicated dimension names, GH8579 chunks = { dim_number: chunks[dim] @@ -825,6 +827,21 @@ def chunk( # if is_dict_like(chunks) and chunks != {}: # chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment] + old_chunks = data_old.chunks + + if is_dict_like(chunks): + # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using previous chunking + chunks = tuple( + [ + ( + chunks[dim_number] + if dim_number in chunks.keys() + else old_chunks[dim_number] + ) + for dim_number in range(self.ndim) + ] + ) + print(f"hopefully normalized chunks = {chunks}") # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. @@ -846,6 +863,7 @@ def chunk( ndata = ImplicitToExplicitIndexingAdapter(data_old, OuterIndexer) # type: ignore[assignment] if is_dict_like(chunks): + # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using full length along each axis (i.e. array shape) chunks = tuple(chunks.get(n, s) for n, s in enumerate(ndata.shape)) # type: ignore[assignment] chunkmanager = guess_chunkmanager(chunked_array_type) From 0296f92d207afb7319b4e31ff03a76c145ddb251 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 29 Jul 2024 03:53:32 -0700 Subject: [PATCH 16/30] fix dodgy chunking patterns in tests --- xarray/tests/test_parallelcompat.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index 30ef16f76db..a9d85bee862 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -167,21 +167,21 @@ class TestPassThroughNonRegisteredChunkedArrays: """ def test_chunks(self) -> None: - dummy_arr = DummyChunkedArray(shape=(9,), chunks=((3,),)) + dummy_arr = DummyChunkedArray(shape=(6,), chunks=((3, 3),)) na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) - assert na.chunks == ((3,),) - assert na.chunksizes == {"x": (3,)} + assert na.chunks == ((3, 3),) + assert na.chunksizes == {"x": (3, 3)} def test_rechunk(self) -> None: - dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((4,),)) na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) - rechunked_na = na.chunk(chunks={"x": (2,)}) + rechunked_na = na.chunk(chunks={"x": (2, 2)}) assert isinstance(rechunked_na.data, DummyChunkedArray) - assert rechunked_na.data.chunks == ((2,),) - assert rechunked_na.chunksizes == {"x": (2,)} + assert rechunked_na.data.chunks == ((2, 2),) + assert rechunked_na.chunksizes == {"x": (2, 2)} def test_computation(self) -> None: - dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),)) + dummy_arr = DummyChunkedArray(shape=(4,), chunks=((2, 2),)) na: NamedArray = NamedArray(data=dummy_arr, dims=["x"]) na.mean() From 9c2ab5ecff972f7444d86f3667f269ad7ca56ddb Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 29 Jul 2024 04:00:43 -0700 Subject: [PATCH 17/30] Revert "fixed bug with chunks passed as dict" This reverts commit 556161d0c99cf435e35483bc80ffaa6139751b49. --- xarray/namedarray/core.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 428b40c5a74..bbe0688ddba 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -812,8 +812,6 @@ def chunk( chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") if is_dict_like(chunks): - # turns dict[str, tuple[in, ..]] -> dict[int, tuple[int, ...]] - # This method of iteration allows for duplicated dimension names, GH8579 chunks = { dim_number: chunks[dim] @@ -827,21 +825,6 @@ def chunk( # if is_dict_like(chunks) and chunks != {}: # chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment] - old_chunks = data_old.chunks - - if is_dict_like(chunks): - # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using previous chunking - chunks = tuple( - [ - ( - chunks[dim_number] - if dim_number in chunks.keys() - else old_chunks[dim_number] - ) - for dim_number in range(self.ndim) - ] - ) - print(f"hopefully normalized chunks = {chunks}") # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. @@ -863,7 +846,6 @@ def chunk( ndata = ImplicitToExplicitIndexingAdapter(data_old, OuterIndexer) # type: ignore[assignment] if is_dict_like(chunks): - # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using full length along each axis (i.e. array shape) chunks = tuple(chunks.get(n, s) for n, s in enumerate(ndata.shape)) # type: ignore[assignment] chunkmanager = guess_chunkmanager(chunked_array_type) From 665727b48da179f432486bff35648cfec373f82b Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 29 Jul 2024 02:13:17 -0700 Subject: [PATCH 18/30] fixed bug with chunks passed as dict --- xarray/namedarray/core.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index bbe0688ddba..428b40c5a74 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -812,6 +812,8 @@ def chunk( chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") if is_dict_like(chunks): + # turns dict[str, tuple[in, ..]] -> dict[int, tuple[int, ...]] + # This method of iteration allows for duplicated dimension names, GH8579 chunks = { dim_number: chunks[dim] @@ -825,6 +827,21 @@ def chunk( # if is_dict_like(chunks) and chunks != {}: # chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment] + old_chunks = data_old.chunks + + if is_dict_like(chunks): + # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using previous chunking + chunks = tuple( + [ + ( + chunks[dim_number] + if dim_number in chunks.keys() + else old_chunks[dim_number] + ) + for dim_number in range(self.ndim) + ] + ) + print(f"hopefully normalized chunks = {chunks}") # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. @@ -846,6 +863,7 @@ def chunk( ndata = ImplicitToExplicitIndexingAdapter(data_old, OuterIndexer) # type: ignore[assignment] if is_dict_like(chunks): + # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using full length along each axis (i.e. array shape) chunks = tuple(chunks.get(n, s) for n, s in enumerate(ndata.shape)) # type: ignore[assignment] chunkmanager = guess_chunkmanager(chunked_array_type) From 54adae7fc5c5631e8064080d98fe27e642bbad0f Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 29 Jul 2024 04:05:09 -0700 Subject: [PATCH 19/30] remove outdated comments --- xarray/namedarray/core.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 428b40c5a74..a9b09878cec 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -823,10 +823,6 @@ def chunk( data_old = self._data if is_chunked_array(data_old): - print(f"problematic chunks = {chunks}") - # if is_dict_like(chunks) and chunks != {}: - # chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment] - old_chunks = data_old.chunks if is_dict_like(chunks): @@ -842,8 +838,6 @@ def chunk( ] ) - print(f"hopefully normalized chunks = {chunks}") - # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. # See GH issue #8733 From b1024c918555b2e63ec75a079eaa0e21f392952d Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 31 Jul 2024 15:31:07 -0400 Subject: [PATCH 20/30] refactor to always use the same codepath for chunk normalization --- xarray/namedarray/core.py | 42 ++++++----------- xarray/namedarray/utils.py | 92 +++++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 29 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index ea2f2e5703c..f91e7f6a27a 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -45,8 +45,8 @@ from xarray.namedarray.utils import ( either_dict_or_kwargs, infix_dims, - is_dict_like, is_duck_dask_array, + normalize_chunks_to_tuples, to_0d_object_array, ) @@ -811,37 +811,22 @@ def chunk( else: chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") - if is_dict_like(chunks): - # turns dict[str, tuple[in, ..]] -> dict[int, tuple[int, ...]] - - # This method of iteration allows for duplicated dimension names, GH8579 - chunks = { - dim_number: chunks[dim] - for dim_number, dim in enumerate(self.dims) - if dim in chunks - } - data_old = self._data if is_chunked_array(data_old): old_chunks = data_old.chunks - if is_dict_like(chunks): - # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using previous chunking - chunks = tuple( - [ - ( - chunks[dim_number] - if dim_number in chunks.keys() - else old_chunks[dim_number] - ) - for dim_number in range(self.ndim) - ] - ) + normalized_chunks = normalize_chunks_to_tuples( + chunks, + self.dims, + data_old.shape, + data_old.dtype, + previous_chunks=old_chunks, + ) # Assume any chunked array supports .rechunk - if it doesn't then at least a clear AttributeError will be raised. # Deliberately don't go through the chunkmanager so as to support chunked array types that don't need all the special computation methods. # See GH issue #8733 - data_chunked = data_old.rechunk(chunks) # type: ignore[union-attr] + data_chunked = data_old.rechunk(normalized_chunks) # type: ignore[union-attr] else: if not isinstance(data_old, ExplicitlyIndexed): ndata = data_old @@ -856,12 +841,13 @@ def chunk( # https://github.com/dask/dask/issues/2883 ndata = ImplicitToExplicitIndexingAdapter(data_old, OuterIndexer) # type: ignore[assignment] - if is_dict_like(chunks): - # turns dict[int, tuple[int, ...]] -> tuple[tuple[int, ...], ...], filling in unspecified dimensions using full length along each axis (i.e. array shape) - chunks = tuple(chunks.get(n, s) for n, s in enumerate(ndata.shape)) # type: ignore[assignment] + # will fallback to one chunk per axis as previous_chunks is not supplied + normalized_chunks = normalize_chunks_to_tuples( + chunks, self.dims, ndata.shape, ndata.dtype + ) chunkmanager = guess_chunkmanager(chunked_array_type) - data_chunked = chunkmanager.from_array(ndata, chunks, **from_array_kwargs) # type: ignore[arg-type] + data_chunked = chunkmanager.from_array(ndata, normalized_chunks, **from_array_kwargs) # type: ignore[arg-type] return self._replace(data=data_chunked) diff --git a/xarray/namedarray/utils.py b/xarray/namedarray/utils.py index b82a80b546a..6ba87977d2e 100644 --- a/xarray/namedarray/utils.py +++ b/xarray/namedarray/utils.py @@ -27,7 +27,7 @@ DaskArray = NDArray # type: ignore DaskCollection: Any = NDArray # type: ignore - from xarray.namedarray._typing import _Dim, duckarray + from xarray.namedarray._typing import T_Chunks, _Dim, _Dims, _Shape, duckarray K = TypeVar("K") @@ -222,3 +222,93 @@ def __dask_tokenize__(self) -> object: from dask.base import normalize_token return normalize_token((type(self), self._value)) + + +from xarray.namedarray._typing import _DType, _NormalizedChunks + + +def normalize_chunks_to_tuples( + chunks: T_Chunks, + dims: _Dims, + shape: _Shape, + dtype: _DType, + previous_chunks: _NormalizedChunks | None = None, +) -> _NormalizedChunks: + """ + Converts any specification of chunking to a tuple-of-tuple of ints along every axis. + + Handles: + tuples or lists of repeated chunk lengths + tuples of tuples of individual chunk lengths + dicts mapping dim name to chunk lengths + chunks passed as 'auto' + chunks passed as -1 + + If a chunk axis is not specified it will fallback to using `previous_chunks` if given, else the array shape (i.e. one chunk per axis). + """ + + if previous_chunks is None: + # default to using array shape, i.e. one chunk per axis + _previous_chunks: _NormalizedChunks = tuple((lc,) for lc in shape) + else: + _previous_chunks = previous_chunks + + if is_dict_like(chunks): + # turns dict[str, tuple[in, ..]] -> dict[int, tuple[int, ...]] + # This method of iteration allows for duplicated dimension names, GH8579 + chunks = { + dim_number: chunks[dim] + for dim_number, dim in enumerate(dims) + if dim in chunks + } + + # (everything below here is vendored from dask) + + # validate that chunk lengths are valid choices + ndim = len(dims) + chunks = {validate_axis(c, ndim): v for c, v in chunks.items()} + + # fill in any missing dimensions in the dict + for i in range(ndim): + if i not in chunks: + chunks[i] = _previous_chunks[i] + elif chunks[i] is None: + chunks[i] = _previous_chunks[i] + + # coerce list-like iterables to tuple-of-tuples + if isinstance(chunks, (tuple, list)): + chunks = tuple( + lc if lc is not None else rc for lc, rc in zip(chunks, _previous_chunks) + ) + + # TODO vendor the normalize_chunks function and remove it from the ChunkManager + from dask.array.core import normalize_chunks + + # supports the 'auto' option, using previous_chunks as a fallback + return cast( + _NormalizedChunks, + normalize_chunks( # type: ignore[no-untyped-call] + chunks, shape, dtype=dtype, previous_chunks=_previous_chunks + ), + ) + + +import numbers + +if Version(np.__version__).release >= (2, 0): + from numpy.exceptions import AxisError +else: + from numpy import AxisError + + +def validate_axis(axis: int, ndim: int) -> int: + """Validate an input to axis= keywords""" + if isinstance(axis, (tuple, list)): + return tuple(validate_axis(ax, ndim) for ax in axis) + if not isinstance(axis, numbers.Integral): + raise TypeError(f"Axis value must be an integer, got {axis}") + if axis < -ndim or axis >= ndim: + raise AxisError(f"Axis {axis} is out of bounds for array of dimension {ndim}") + if axis < 0: + axis += ndim + return axis From e926748e6f6976fb71c548faabb7af62b279dd72 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 31 Jul 2024 15:39:50 -0400 Subject: [PATCH 21/30] also use new codepath when determining preferred_chunks for backends --- xarray/core/dataset.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index daf56cb381f..9d409979d7c 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -126,6 +126,7 @@ ) from xarray.namedarray.parallelcompat import get_chunked_array_type, guess_chunkmanager from xarray.namedarray.pycompat import array_type, has_chunkmanager, is_chunked_array +from xarray.namedarray.utils import normalize_chunks_to_tuples from xarray.plot.accessor import DatasetPlotAccessor from xarray.util.deprecation_helpers import _deprecate_positional_args, deprecate_dims @@ -240,15 +241,13 @@ def _get_chunk(var: Variable, chunks, chunkmanager: ChunkManagerEntrypoint): preferred_chunk_shape = tuple( preferred_chunks.get(dim, size) for dim, size in zip(dims, shape) ) - if isinstance(chunks, Number) or (chunks == "auto"): - chunks = dict.fromkeys(dims, chunks) - chunk_shape = tuple( - chunks.get(dim, None) or preferred_chunk_sizes - for dim, preferred_chunk_sizes in zip(dims, preferred_chunk_shape) - ) - chunk_shape = chunkmanager.normalize_chunks( - chunk_shape, shape=shape, dtype=var.dtype, previous_chunks=preferred_chunk_shape + chunk_shape = normalize_chunks_to_tuples( + chunks, + var.dims, + var.shape, + var.dtype, + previous_chunks=preferred_chunk_shape, ) # Warn where requested chunks break preferred chunks, provided that the variable From def31317dde8a9cceb5d1a5bb3ed5c9bd19ceccf Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 31 Jul 2024 15:43:31 -0400 Subject: [PATCH 22/30] update TODO about what normalization is handled by dask --- xarray/namedarray/core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index f91e7f6a27a..918e0de5bc8 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -805,9 +805,7 @@ def chunk( chunks = {} if isinstance(chunks, (float, str, int, tuple, list)): - # TODO we shouldn't assume here that other chunkmanagers can handle these types - # TODO should we call normalize_chunks here? - pass # dask.array.from_array can handle these directly + pass # normalize_chunks_to_tuples can handle these types directly, via dask.array.core.normalize_chunks else: chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk") From 8db961eaeac0d238187c973aa1a13d4866d1dde0 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 31 Jul 2024 15:46:20 -0400 Subject: [PATCH 23/30] remove normalize_chunks method from ChunkManagerEntrypoint --- xarray/namedarray/daskmanager.py | 19 -------------- xarray/namedarray/parallelcompat.py | 39 ----------------------------- xarray/tests/test_parallelcompat.py | 14 +---------- 3 files changed, 1 insertion(+), 71 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index df8681ec290..c2b0ed6f72b 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -41,25 +41,6 @@ def __init__(self) -> None: def is_chunked_array(self, data: duckarray[Any, Any]) -> bool: return is_duck_dask_array(data) - def normalize_chunks( - self, - chunks: T_Chunks | _NormalizedChunks, - shape: tuple[int, ...] | None = None, - limit: int | None = None, - dtype: _DType_co | None = None, - previous_chunks: _NormalizedChunks | None = None, - ) -> Any: - """Called by open_dataset""" - from dask.array.core import normalize_chunks - - return normalize_chunks( - chunks, - shape=shape, - limit=limit, - dtype=dtype, - previous_chunks=previous_chunks, - ) # type: ignore[no-untyped-call] - def from_array( self, data: Any, chunks: T_Chunks | _NormalizedChunks, **kwargs: Any ) -> DaskArray | Any: diff --git a/xarray/namedarray/parallelcompat.py b/xarray/namedarray/parallelcompat.py index c84813e2001..1451ddf7c5c 100644 --- a/xarray/namedarray/parallelcompat.py +++ b/xarray/namedarray/parallelcompat.py @@ -21,10 +21,8 @@ if TYPE_CHECKING: from xarray.namedarray._typing import ( _Chunks, - _DType, _DType_co, _NormalizedChunks, - _ShapeType, duckarray, ) @@ -218,43 +216,6 @@ def is_chunked_array(self, data: duckarray[Any, Any]) -> bool: """ return isinstance(data, self.array_cls) - @abstractmethod - def normalize_chunks( - self, - chunks: _Chunks | _NormalizedChunks, - shape: _ShapeType | None = None, - limit: int | None = None, - dtype: _DType | None = None, - previous_chunks: _NormalizedChunks | None = None, - ) -> _NormalizedChunks: - """ - Normalize given chunking pattern into an explicit tuple of tuples representation. - - Exposed primarily because different chunking backends may want to make different decisions about how to - automatically chunk along dimensions not given explicitly in the input chunks. - - Called internally by xarray.open_dataset. - - Parameters - ---------- - chunks : tuple, int, dict, or string - The chunks to be normalized. - shape : Tuple[int] - The shape of the array - limit : int (optional) - The maximum block size to target in bytes, - if freedom is given to choose - dtype : np.dtype - previous_chunks : Tuple[Tuple[int]], optional - Chunks from a previous array that we should use for inspiration when - rechunking dimensions automatically. - - See Also - -------- - dask.array.core.normalize_chunks - """ - raise NotImplementedError() - @abstractmethod def from_array( self, data: duckarray[Any, Any], chunks: _Chunks, **kwargs: Any diff --git a/xarray/tests/test_parallelcompat.py b/xarray/tests/test_parallelcompat.py index a9d85bee862..57a9eae8730 100644 --- a/xarray/tests/test_parallelcompat.py +++ b/xarray/tests/test_parallelcompat.py @@ -6,7 +6,7 @@ import numpy as np import pytest -from xarray.core.types import T_Chunks, T_DuckArray, T_NormalizedChunks +from xarray.core.types import T_DuckArray, T_NormalizedChunks from xarray.namedarray._typing import _Chunks from xarray.namedarray.core import NamedArray from xarray.namedarray.daskmanager import DaskManager @@ -79,18 +79,6 @@ def __init__(self): def is_chunked_array(self, data: Any) -> bool: return isinstance(data, DummyChunkedArray) - def normalize_chunks( - self, - chunks: T_Chunks | T_NormalizedChunks, - shape: tuple[int, ...] | None = None, - limit: int | None = None, - dtype: np.dtype | None = None, - previous_chunks: T_NormalizedChunks | None = None, - ) -> T_NormalizedChunks: - from dask.array.core import normalize_chunks - - return normalize_chunks(chunks, shape, limit, dtype, previous_chunks) - def from_array( self, data: T_DuckArray | np.typing.ArrayLike, chunks: _Chunks, **kwargs ) -> DummyChunkedArray: From 955c56e0c29fd7d58536f225ebb0640f4e477317 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 15:11:10 -0400 Subject: [PATCH 24/30] vendor dask.array.normalize_chunks and its dependencies --- xarray/namedarray/utils.py | 38 +-- xarray/vendor/dask/array/core.py | 379 ++++++++++++++++++++++++++++++ xarray/vendor/dask/array/utils.py | 23 ++ xarray/vendor/dask/utils.py | 89 +++++++ xarray/vendor/toolz/itertoolz.py | 17 ++ 5 files changed, 519 insertions(+), 27 deletions(-) create mode 100644 xarray/vendor/dask/array/core.py create mode 100644 xarray/vendor/dask/array/utils.py create mode 100644 xarray/vendor/dask/utils.py create mode 100644 xarray/vendor/toolz/itertoolz.py diff --git a/xarray/namedarray/utils.py b/xarray/namedarray/utils.py index 6ba87977d2e..6fa59832535 100644 --- a/xarray/namedarray/utils.py +++ b/xarray/namedarray/utils.py @@ -10,7 +10,7 @@ import numpy as np from packaging.version import Version -from xarray.namedarray._typing import ErrorOptionsWithWarn, _DimsLike +from xarray.namedarray._typing import ErrorOptionsWithWarn, _DimsLike, _NormalizedChunks if TYPE_CHECKING: if sys.version_info >= (3, 10): @@ -27,7 +27,14 @@ DaskArray = NDArray # type: ignore DaskCollection: Any = NDArray # type: ignore - from xarray.namedarray._typing import T_Chunks, _Dim, _Dims, _Shape, duckarray + from xarray.namedarray._typing import ( + T_Chunks, + _Dim, + _Dims, + _DType, + _Shape, + duckarray, + ) K = TypeVar("K") @@ -224,9 +231,6 @@ def __dask_tokenize__(self) -> object: return normalize_token((type(self), self._value)) -from xarray.namedarray._typing import _DType, _NormalizedChunks - - def normalize_chunks_to_tuples( chunks: T_Chunks, dims: _Dims, @@ -263,6 +267,7 @@ def normalize_chunks_to_tuples( } # (everything below here is vendored from dask) + from xarray.vendor.dask.array.utils import validate_axis # validate that chunk lengths are valid choices ndim = len(dims) @@ -282,7 +287,7 @@ def normalize_chunks_to_tuples( ) # TODO vendor the normalize_chunks function and remove it from the ChunkManager - from dask.array.core import normalize_chunks + from xarray.vendor.dask.array.core import normalize_chunks # supports the 'auto' option, using previous_chunks as a fallback return cast( @@ -291,24 +296,3 @@ def normalize_chunks_to_tuples( chunks, shape, dtype=dtype, previous_chunks=_previous_chunks ), ) - - -import numbers - -if Version(np.__version__).release >= (2, 0): - from numpy.exceptions import AxisError -else: - from numpy import AxisError - - -def validate_axis(axis: int, ndim: int) -> int: - """Validate an input to axis= keywords""" - if isinstance(axis, (tuple, list)): - return tuple(validate_axis(ax, ndim) for ax in axis) - if not isinstance(axis, numbers.Integral): - raise TypeError(f"Axis value must be an integer, got {axis}") - if axis < -ndim or axis >= ndim: - raise AxisError(f"Axis {axis} is out of bounds for array of dimension {ndim}") - if axis < 0: - axis += ndim - return axis diff --git a/xarray/vendor/dask/array/core.py b/xarray/vendor/dask/array/core.py new file mode 100644 index 00000000000..062f061ac78 --- /dev/null +++ b/xarray/vendor/dask/array/core.py @@ -0,0 +1,379 @@ +from __future__ import annotations + +import math +from numbers import Number + +import numpy as np + +from xarray.vendor.toolz.itertoolz import frequencies +from xarray.vendor.dask.utils import is_integer, parse_bytes + +unknown_chunk_message = ( + "\n\n" + "A possible solution: " + "https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks\n" + "Summary: to compute chunks sizes, use\n\n" + " x.compute_chunk_sizes() # for Dask Array `x`\n" + " ddf.to_dask_array(lengths=True) # for Dask DataFrame `ddf`" +) + +def blockdims_from_blockshape(shape, chunks): + """ + + >>> blockdims_from_blockshape((10, 10), (4, 3)) + ((4, 4, 2), (3, 3, 3, 1)) + >>> blockdims_from_blockshape((10, 0), (4, 0)) + ((4, 4, 2), (0,)) + """ + if chunks is None: + raise TypeError("Must supply chunks= keyword argument") + if shape is None: + raise TypeError("Must supply shape= keyword argument") + if np.isnan(sum(shape)) or np.isnan(sum(chunks)): + raise ValueError( + "Array chunk sizes are unknown. shape: %s, chunks: %s%s" + % (shape, chunks, unknown_chunk_message) + ) + if not all(map(is_integer, chunks)): + raise ValueError("chunks can only contain integers.") + if not all(map(is_integer, shape)): + raise ValueError("shape can only contain integers.") + shape = tuple(map(int, shape)) + chunks = tuple(map(int, chunks)) + return tuple( + ((bd,) * (d // bd) + ((d % bd,) if d % bd else ()) if d else (0,)) + for d, bd in zip(shape, chunks) + ) + + +CHUNKS_NONE_ERROR_MESSAGE = """ +You must specify a chunks= keyword argument. +This specifies the chunksize of your array blocks. + +See the following documentation page for details: + https://docs.dask.org/en/latest/array-creation.html#chunks +""".strip() + + +def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None): + """Normalize chunks to tuple of tuples + + This takes in a variety of input types and information and produces a full + tuple-of-tuples result for chunks, suitable to be passed to Array or + rechunk or any other operation that creates a Dask array. + + Parameters + ---------- + chunks: tuple, int, dict, or string + The chunks to be normalized. See examples below for more details + shape: Tuple[int] + The shape of the array + limit: int (optional) + The maximum block size to target in bytes, + if freedom is given to choose + dtype: np.dtype + previous_chunks: Tuple[Tuple[int]] optional + Chunks from a previous array that we should use for inspiration when + rechunking auto dimensions. If not provided but auto-chunking exists + then auto-dimensions will prefer square-like chunk shapes. + + Examples + -------- + Specify uniform chunk sizes + + >>> from dask.array.core import normalize_chunks + >>> normalize_chunks((2, 2), shape=(5, 6)) + ((2, 2, 1), (2, 2, 2)) + + Also passes through fully explicit tuple-of-tuples + + >>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6)) + ((2, 2, 1), (2, 2, 2)) + + Cleans up lists to tuples + + >>> normalize_chunks([[2, 2], [3, 3]]) + ((2, 2), (3, 3)) + + Expands integer inputs 10 -> (10, 10) + + >>> normalize_chunks(10, shape=(30, 5)) + ((10, 10, 10), (5,)) + + Expands dict inputs + + >>> normalize_chunks({0: 2, 1: 3}, shape=(6, 6)) + ((2, 2, 2), (3, 3)) + + The values -1 and None get mapped to full size + + >>> normalize_chunks((5, -1), shape=(10, 10)) + ((5, 5), (10,)) + + Use the value "auto" to automatically determine chunk sizes along certain + dimensions. This uses the ``limit=`` and ``dtype=`` keywords to + determine how large to make the chunks. The term "auto" can be used + anywhere an integer can be used. See array chunking documentation for more + information. + + >>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8') + ((5, 5, 5, 5),) + + You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of + "auto" to ask for a particular size + + >>> normalize_chunks("1kiB", shape=(2000,), dtype='float32') + ((256, 256, 256, 256, 256, 256, 256, 208),) + + Respects null dimensions + + >>> normalize_chunks((), shape=(0, 0)) + ((0,), (0,)) + """ + if dtype and not isinstance(dtype, np.dtype): + dtype = np.dtype(dtype) + if chunks is None: + raise ValueError(CHUNKS_NONE_ERROR_MESSAGE) + if isinstance(chunks, list): + chunks = tuple(chunks) + if isinstance(chunks, (Number, str)): + chunks = (chunks,) * len(shape) + if isinstance(chunks, dict): + chunks = tuple(chunks.get(i, None) for i in range(len(shape))) + if isinstance(chunks, np.ndarray): + chunks = chunks.tolist() + if not chunks and shape and all(s == 0 for s in shape): + chunks = ((0,),) * len(shape) + + if ( + shape + and len(shape) == 1 + and len(chunks) > 1 + and all(isinstance(c, (Number, str)) for c in chunks) + ): + chunks = (chunks,) + + if shape and len(chunks) != len(shape): + raise ValueError( + "Chunks and shape must be of the same length/dimension. " + "Got chunks=%s, shape=%s" % (chunks, shape) + ) + if -1 in chunks or None in chunks: + chunks = tuple(s if c == -1 or c is None else c for c, s in zip(chunks, shape)) + + # If specifying chunk size in bytes, use that value to set the limit. + # Verify there is only one consistent value of limit or chunk-bytes used. + for c in chunks: + if isinstance(c, str) and c != "auto": + parsed = parse_bytes(c) + if limit is None: + limit = parsed + elif parsed != limit: + raise ValueError( + "Only one consistent value of limit or chunk is allowed." + "Used %s != %s" % (parsed, limit) + ) + # Substitute byte limits with 'auto' now that limit is set. + chunks = tuple("auto" if isinstance(c, str) and c != "auto" else c for c in chunks) + + if any(c == "auto" for c in chunks): + chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks) + + if shape is not None: + chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape)) + + if chunks and shape is not None: + chunks = sum( + ( + blockdims_from_blockshape((s,), (c,)) + if not isinstance(c, (tuple, list)) + else (c,) + for s, c in zip(shape, chunks) + ), + (), + ) + for c in chunks: + if not c: + raise ValueError( + "Empty tuples are not allowed in chunks. Express " + "zero length dimensions with 0(s) in chunks" + ) + + if shape is not None: + if len(chunks) != len(shape): + raise ValueError( + "Input array has %d dimensions but the supplied " + "chunks has only %d dimensions" % (len(shape), len(chunks)) + ) + if not all( + c == s or (math.isnan(c) or math.isnan(s)) + for c, s in zip(map(sum, chunks), shape) + ): + raise ValueError( + "Chunks do not add up to shape. " + "Got chunks=%s, shape=%s" % (chunks, shape) + ) + + return tuple( + tuple(int(x) if not math.isnan(x) else np.nan for x in c) for c in chunks + ) + + +def _compute_multiplier(limit: int, dtype, largest_block: int, result): + """ + Utility function for auto_chunk, to fin how much larger or smaller the ideal + chunk size is relative to what we have now. + """ + return ( + limit + / dtype.itemsize + / largest_block + / math.prod(r for r in result.values() if r) + ) + + +def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None): + """Determine automatic chunks + + This takes in a chunks value that contains ``"auto"`` values in certain + dimensions and replaces those values with concrete dimension sizes that try + to get chunks to be of a certain size in bytes, provided by the ``limit=`` + keyword. If multiple dimensions are marked as ``"auto"`` then they will + all respond to meet the desired byte limit, trying to respect the aspect + ratio of their dimensions in ``previous_chunks=``, if given. + + Parameters + ---------- + chunks: Tuple + A tuple of either dimensions or tuples of explicit chunk dimensions + Some entries should be "auto" + shape: Tuple[int] + limit: int, str + The maximum allowable size of a chunk in bytes + previous_chunks: Tuple[Tuple[int]] + + See also + -------- + normalize_chunks: for full docstring and parameters + """ + if previous_chunks is not None: + previous_chunks = tuple( + c if isinstance(c, tuple) else (c,) for c in previous_chunks + ) + chunks = list(chunks) + + autos = {i for i, c in enumerate(chunks) if c == "auto"} + if not autos: + return tuple(chunks) + + if limit is None: + limit = "128MiB" # config.get("array.chunk-size") + if isinstance(limit, str): + limit = parse_bytes(limit) + + if dtype is None: + raise TypeError("dtype must be known for auto-chunking") + + if dtype.hasobject: + raise NotImplementedError( + "Can not use auto rechunking with object dtype. " + "We are unable to estimate the size in bytes of object data" + ) + + for x in tuple(chunks) + tuple(shape): + if ( + isinstance(x, Number) + and np.isnan(x) + or isinstance(x, tuple) + and np.isnan(x).any() + ): + raise ValueError( + "Can not perform automatic rechunking with unknown " + "(nan) chunk sizes.%s" % unknown_chunk_message + ) + + limit = max(1, limit) + + largest_block = math.prod( + cs if isinstance(cs, Number) else max(cs) for cs in chunks if cs != "auto" + ) + + if previous_chunks: + # Base ideal ratio on the median chunk size of the previous chunks + result = {a: np.median(previous_chunks[a]) for a in autos} + + ideal_shape = [] + for i, s in enumerate(shape): + chunk_frequencies = frequencies(previous_chunks[i]) + mode, count = max(chunk_frequencies.items(), key=lambda kv: kv[1]) + if mode > 1 and count >= len(previous_chunks[i]) / 2: + ideal_shape.append(mode) + else: + ideal_shape.append(s) + + # How much larger or smaller the ideal chunk size is relative to what we have now + multiplier = _compute_multiplier(limit, dtype, largest_block, result) + + last_multiplier = 0 + last_autos = set() + while ( + multiplier != last_multiplier or autos != last_autos + ): # while things change + last_multiplier = multiplier # record previous values + last_autos = set(autos) # record previous values + + # Expand or contract each of the dimensions appropriately + for a in sorted(autos): + if ideal_shape[a] == 0: + result[a] = 0 + continue + proposed = result[a] * multiplier ** (1 / len(autos)) + if proposed > shape[a]: # we've hit the shape boundary + autos.remove(a) + largest_block *= shape[a] + chunks[a] = shape[a] + del result[a] + else: + result[a] = round_to(proposed, ideal_shape[a]) + + # recompute how much multiplier we have left, repeat + multiplier = _compute_multiplier(limit, dtype, largest_block, result) + + for k, v in result.items(): + chunks[k] = v + return tuple(chunks) + + else: + # Check if dtype.itemsize is greater than 0 + if dtype.itemsize == 0: + raise ValueError( + "auto-chunking with dtype.itemsize == 0 is not supported, please pass in `chunks` explicitly" + ) + size = (limit / dtype.itemsize / largest_block) ** (1 / len(autos)) + small = [i for i in autos if shape[i] < size] + if small: + for i in small: + chunks[i] = (shape[i],) + return auto_chunks(chunks, shape, limit, dtype) + + for i in autos: + chunks[i] = round_to(size, shape[i]) + + return tuple(chunks) + + +def round_to(c, s): + """Return a chunk dimension that is close to an even multiple or factor + + We want values for c that are nicely aligned with s. + + If c is smaller than s we use the original chunk size and accept an + uneven chunk at the end. + + If c is larger than s then we want the largest multiple of s that is still + smaller than c. + """ + if c <= s: + return max(1, int(c)) + else: + return c // s * s diff --git a/xarray/vendor/dask/array/utils.py b/xarray/vendor/dask/array/utils.py new file mode 100644 index 00000000000..1c94633f9fa --- /dev/null +++ b/xarray/vendor/dask/array/utils.py @@ -0,0 +1,23 @@ +import numbers +import numpy + +from packaging.version import Version + + +if Version(numpy.__version__).release >= (2, 0): + from numpy.exceptions import AxisError +else: + from numpy import AxisError # type: ignore[attr-defined, no-redef] + + +def validate_axis(axis: int, ndim: int) -> int: + """Validate an input to axis= keywords""" + if isinstance(axis, (tuple, list)): + return tuple(validate_axis(ax, ndim) for ax in axis) + if not isinstance(axis, numbers.Integral): + raise TypeError(f"Axis value must be an integer, got {axis}") + if axis < -ndim or axis >= ndim: + raise AxisError(f"Axis {axis} is out of bounds for array of dimension {ndim}") + if axis < 0: + axis += ndim + return axis diff --git a/xarray/vendor/dask/utils.py b/xarray/vendor/dask/utils.py new file mode 100644 index 00000000000..b04965acf68 --- /dev/null +++ b/xarray/vendor/dask/utils.py @@ -0,0 +1,89 @@ +from numbers import Integral + + +def is_integer(i) -> bool: + """ + >>> is_integer(6) + True + >>> is_integer(42.0) + True + >>> is_integer('abc') + False + """ + return isinstance(i, Integral) or (isinstance(i, float) and i.is_integer()) + + +def parse_bytes(s: float | str) -> int: + """Parse byte string to numbers + + >>> from dask.utils import parse_bytes + >>> parse_bytes('100') + 100 + >>> parse_bytes('100 MB') + 100000000 + >>> parse_bytes('100M') + 100000000 + >>> parse_bytes('5kB') + 5000 + >>> parse_bytes('5.4 kB') + 5400 + >>> parse_bytes('1kiB') + 1024 + >>> parse_bytes('1e6') + 1000000 + >>> parse_bytes('1e6 kB') + 1000000000 + >>> parse_bytes('MB') + 1000000 + >>> parse_bytes(123) + 123 + >>> parse_bytes('5 foos') + Traceback (most recent call last): + ... + ValueError: Could not interpret 'foos' as a byte unit + """ + if isinstance(s, (int, float)): + return int(s) + s = s.replace(" ", "") + if not any(char.isdigit() for char in s): + s = "1" + s + + for i in range(len(s) - 1, -1, -1): + if not s[i].isalpha(): + break + index = i + 1 + + prefix = s[:index] + suffix = s[index:] + + try: + n = float(prefix) + except ValueError as e: + raise ValueError("Could not interpret '%s' as a number" % prefix) from e + + try: + multiplier = byte_sizes[suffix.lower()] + except KeyError as e: + raise ValueError("Could not interpret '%s' as a byte unit" % suffix) from e + + result = n * multiplier + return int(result) + + +byte_sizes = { + "kB": 10**3, + "MB": 10**6, + "GB": 10**9, + "TB": 10**12, + "PB": 10**15, + "KiB": 2**10, + "MiB": 2**20, + "GiB": 2**30, + "TiB": 2**40, + "PiB": 2**50, + "B": 1, + "": 1, +} +byte_sizes = {k.lower(): v for k, v in byte_sizes.items()} +byte_sizes.update({k[0]: v for k, v in byte_sizes.items() if k and "i" not in k}) +byte_sizes.update({k[:-1]: v for k, v in byte_sizes.items() if k and "i" in k}) diff --git a/xarray/vendor/toolz/itertoolz.py b/xarray/vendor/toolz/itertoolz.py new file mode 100644 index 00000000000..57d6cb3f51f --- /dev/null +++ b/xarray/vendor/toolz/itertoolz.py @@ -0,0 +1,17 @@ +import collections + + +def frequencies(seq): + """ Find number of occurrences of each value in seq + + >>> frequencies(['cat', 'cat', 'ox', 'pig', 'pig', 'cat']) #doctest: +SKIP + {'cat': 3, 'ox': 1, 'pig': 2} + + See Also: + countby + groupby + """ + d = collections.defaultdict(int) + for item in seq: + d[item] += 1 + return dict(d) From aa3afff22828acde54d12d479bc37e521db8a337 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 12 Aug 2024 19:14:35 +0000 Subject: [PATCH 25/30] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/vendor/dask/array/core.py | 17 ++++++++++------- xarray/vendor/dask/array/utils.py | 3 +-- xarray/vendor/dask/utils.py | 22 +++++++++++----------- xarray/vendor/toolz/itertoolz.py | 4 ++-- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/xarray/vendor/dask/array/core.py b/xarray/vendor/dask/array/core.py index 062f061ac78..a47b6f51f35 100644 --- a/xarray/vendor/dask/array/core.py +++ b/xarray/vendor/dask/array/core.py @@ -5,8 +5,8 @@ import numpy as np -from xarray.vendor.toolz.itertoolz import frequencies from xarray.vendor.dask.utils import is_integer, parse_bytes +from xarray.vendor.toolz.itertoolz import frequencies unknown_chunk_message = ( "\n\n" @@ -17,6 +17,7 @@ " ddf.to_dask_array(lengths=True) # for Dask DataFrame `ddf`" ) + def blockdims_from_blockshape(shape, chunks): """ @@ -116,13 +117,13 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks anywhere an integer can be used. See array chunking documentation for more information. - >>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8') + >>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype="uint8") ((5, 5, 5, 5),) You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of "auto" to ask for a particular size - >>> normalize_chunks("1kiB", shape=(2000,), dtype='float32') + >>> normalize_chunks("1kiB", shape=(2000,), dtype="float32") ((256, 256, 256, 256, 256, 256, 256, 208),) Respects null dimensions @@ -185,9 +186,11 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks if chunks and shape is not None: chunks = sum( ( - blockdims_from_blockshape((s,), (c,)) - if not isinstance(c, (tuple, list)) - else (c,) + ( + blockdims_from_blockshape((s,), (c,)) + if not isinstance(c, (tuple, list)) + else (c,) + ) for s, c in zip(shape, chunks) ), (), @@ -267,7 +270,7 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None): return tuple(chunks) if limit is None: - limit = "128MiB" # config.get("array.chunk-size") + limit = "128MiB" # config.get("array.chunk-size") if isinstance(limit, str): limit = parse_bytes(limit) diff --git a/xarray/vendor/dask/array/utils.py b/xarray/vendor/dask/array/utils.py index 1c94633f9fa..e8f65b6dd59 100644 --- a/xarray/vendor/dask/array/utils.py +++ b/xarray/vendor/dask/array/utils.py @@ -1,9 +1,8 @@ import numbers -import numpy +import numpy from packaging.version import Version - if Version(numpy.__version__).release >= (2, 0): from numpy.exceptions import AxisError else: diff --git a/xarray/vendor/dask/utils.py b/xarray/vendor/dask/utils.py index b04965acf68..7dd52200674 100644 --- a/xarray/vendor/dask/utils.py +++ b/xarray/vendor/dask/utils.py @@ -7,7 +7,7 @@ def is_integer(i) -> bool: True >>> is_integer(42.0) True - >>> is_integer('abc') + >>> is_integer("abc") False """ return isinstance(i, Integral) or (isinstance(i, float) and i.is_integer()) @@ -17,27 +17,27 @@ def parse_bytes(s: float | str) -> int: """Parse byte string to numbers >>> from dask.utils import parse_bytes - >>> parse_bytes('100') + >>> parse_bytes("100") 100 - >>> parse_bytes('100 MB') + >>> parse_bytes("100 MB") 100000000 - >>> parse_bytes('100M') + >>> parse_bytes("100M") 100000000 - >>> parse_bytes('5kB') + >>> parse_bytes("5kB") 5000 - >>> parse_bytes('5.4 kB') + >>> parse_bytes("5.4 kB") 5400 - >>> parse_bytes('1kiB') + >>> parse_bytes("1kiB") 1024 - >>> parse_bytes('1e6') + >>> parse_bytes("1e6") 1000000 - >>> parse_bytes('1e6 kB') + >>> parse_bytes("1e6 kB") 1000000000 - >>> parse_bytes('MB') + >>> parse_bytes("MB") 1000000 >>> parse_bytes(123) 123 - >>> parse_bytes('5 foos') + >>> parse_bytes("5 foos") Traceback (most recent call last): ... ValueError: Could not interpret 'foos' as a byte unit diff --git a/xarray/vendor/toolz/itertoolz.py b/xarray/vendor/toolz/itertoolz.py index 57d6cb3f51f..f49e60b6dce 100644 --- a/xarray/vendor/toolz/itertoolz.py +++ b/xarray/vendor/toolz/itertoolz.py @@ -2,9 +2,9 @@ def frequencies(seq): - """ Find number of occurrences of each value in seq + """Find number of occurrences of each value in seq - >>> frequencies(['cat', 'cat', 'ox', 'pig', 'pig', 'cat']) #doctest: +SKIP + >>> frequencies(["cat", "cat", "ox", "pig", "pig", "cat"]) # doctest: +SKIP {'cat': 3, 'ox': 1, 'pig': 2} See Also: From bd29f79186df7cb0bb5f35a7a1c6ff17697f1682 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 15:23:24 -0400 Subject: [PATCH 26/30] use Union for python <3.10 --- xarray/vendor/dask/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xarray/vendor/dask/utils.py b/xarray/vendor/dask/utils.py index 7dd52200674..94d0e8062f3 100644 --- a/xarray/vendor/dask/utils.py +++ b/xarray/vendor/dask/utils.py @@ -1,4 +1,5 @@ from numbers import Integral +from typing import Union def is_integer(i) -> bool: @@ -13,7 +14,7 @@ def is_integer(i) -> bool: return isinstance(i, Integral) or (isinstance(i, float) and i.is_integer()) -def parse_bytes(s: float | str) -> int: +def parse_bytes(s: Union[float, str]) -> int: """Parse byte string to numbers >>> from dask.utils import parse_bytes From 7ad5f07e9bc2daf99d228abc3d5d7c932af5231b Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 15:34:37 -0400 Subject: [PATCH 27/30] some pre-commit fixes --- xarray/vendor/dask/array/core.py | 12 +++++------- xarray/vendor/dask/utils.py | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/xarray/vendor/dask/array/core.py b/xarray/vendor/dask/array/core.py index a47b6f51f35..9e5fcc05917 100644 --- a/xarray/vendor/dask/array/core.py +++ b/xarray/vendor/dask/array/core.py @@ -32,8 +32,7 @@ def blockdims_from_blockshape(shape, chunks): raise TypeError("Must supply shape= keyword argument") if np.isnan(sum(shape)) or np.isnan(sum(chunks)): raise ValueError( - "Array chunk sizes are unknown. shape: %s, chunks: %s%s" - % (shape, chunks, unknown_chunk_message) + f"Array chunk sizes are unknown. shape: {shape}, chunks: {chunks}{unknown_chunk_message}" ) if not all(map(is_integer, chunks)): raise ValueError("chunks can only contain integers.") @@ -157,7 +156,7 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks if shape and len(chunks) != len(shape): raise ValueError( "Chunks and shape must be of the same length/dimension. " - "Got chunks=%s, shape=%s" % (chunks, shape) + f"Got chunks={chunks}, shape={shape}" ) if -1 in chunks or None in chunks: chunks = tuple(s if c == -1 or c is None else c for c, s in zip(chunks, shape)) @@ -172,7 +171,7 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks elif parsed != limit: raise ValueError( "Only one consistent value of limit or chunk is allowed." - "Used %s != %s" % (parsed, limit) + f"Used {parsed} != {limit}" ) # Substitute byte limits with 'auto' now that limit is set. chunks = tuple("auto" if isinstance(c, str) and c != "auto" else c for c in chunks) @@ -213,8 +212,7 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks for c, s in zip(map(sum, chunks), shape) ): raise ValueError( - "Chunks do not add up to shape. " - "Got chunks=%s, shape=%s" % (chunks, shape) + "Chunks do not add up to shape. " f"Got chunks={chunks}, shape={shape}" ) return tuple( @@ -292,7 +290,7 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None): ): raise ValueError( "Can not perform automatic rechunking with unknown " - "(nan) chunk sizes.%s" % unknown_chunk_message + f"(nan) chunk sizes.{unknown_chunk_message}" ) limit = max(1, limit) diff --git a/xarray/vendor/dask/utils.py b/xarray/vendor/dask/utils.py index 94d0e8062f3..1e528df1639 100644 --- a/xarray/vendor/dask/utils.py +++ b/xarray/vendor/dask/utils.py @@ -60,12 +60,12 @@ def parse_bytes(s: Union[float, str]) -> int: try: n = float(prefix) except ValueError as e: - raise ValueError("Could not interpret '%s' as a number" % prefix) from e + raise ValueError(f"Could not interpret '{prefix}' as a number") from e try: multiplier = byte_sizes[suffix.lower()] except KeyError as e: - raise ValueError("Could not interpret '%s' as a byte unit" % suffix) from e + raise ValueError(f"Could not interpret '{suffix}' as a byte unit") from e result = n * multiplier return int(result) From d14b705733d14caf0fe76500516366bcf8c99f22 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 15:56:21 -0400 Subject: [PATCH 28/30] add __init__.py's to avoid import problems --- xarray/vendor/dask/__init__.py | 0 xarray/vendor/dask/array/__init__.py | 0 xarray/vendor/toolz/__init__.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 xarray/vendor/dask/__init__.py create mode 100644 xarray/vendor/dask/array/__init__.py create mode 100644 xarray/vendor/toolz/__init__.py diff --git a/xarray/vendor/dask/__init__.py b/xarray/vendor/dask/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/xarray/vendor/dask/array/__init__.py b/xarray/vendor/dask/array/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/xarray/vendor/toolz/__init__.py b/xarray/vendor/toolz/__init__.py new file mode 100644 index 00000000000..e69de29bb2d From aac1566aacedf2c27c226ca10e671481d7dca7dc Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 16:08:58 -0400 Subject: [PATCH 29/30] add vendor/__init__.py --- xarray/vendor/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 xarray/vendor/__init__.py diff --git a/xarray/vendor/__init__.py b/xarray/vendor/__init__.py new file mode 100644 index 00000000000..e69de29bb2d From a0d1e84c5b2d8314d6fcc877482b64337547b366 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 12 Aug 2024 16:31:20 -0400 Subject: [PATCH 30/30] try to shut mypy up --- xarray/namedarray/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/xarray/namedarray/core.py b/xarray/namedarray/core.py index 918e0de5bc8..e97b8d19e1a 100644 --- a/xarray/namedarray/core.py +++ b/xarray/namedarray/core.py @@ -811,9 +811,10 @@ def chunk( data_old = self._data if is_chunked_array(data_old): + cast(_chunkedarray, data_old) old_chunks = data_old.chunks - normalized_chunks = normalize_chunks_to_tuples( + normalized_chunks = normalize_chunks_to_tuples( # type: ignore[arg-type] chunks, self.dims, data_old.shape, @@ -840,7 +841,7 @@ def chunk( ndata = ImplicitToExplicitIndexingAdapter(data_old, OuterIndexer) # type: ignore[assignment] # will fallback to one chunk per axis as previous_chunks is not supplied - normalized_chunks = normalize_chunks_to_tuples( + normalized_chunks = normalize_chunks_to_tuples( # type: ignore[arg-type] chunks, self.dims, ndata.shape, ndata.dtype )