From 507c34c077bf3d145784308965c85c6ab980ba09 Mon Sep 17 00:00:00 2001 From: Martin Yeo <40734014+trexfeathers@users.noreply.github.com> Date: Thu, 23 Nov 2023 15:12:55 +0000 Subject: [PATCH] Mergeback of `FEATURE_chunk_control` branch (#5588) * Merge chunk control code into latest iris (#5565) * Dask chunking control for netcdf loading. * renamed loader * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix indentation error, perhaps also docstring error * fixed result error in loader, and set tests to treat as big files * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * trial and error, solve non iterable tuple 1.0 * trial and error, solve non iterable tuple 2.0 (used if var is none: instead of if var: ) * commented out docstring * fixed mock 'no name' failure * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixed precommit issues * corrected docstrings as per review comments * Removed unnecessary line Co-authored-by: Martin Yeo <40734014+trexfeathers@users.noreply.github.com> --------- Co-authored-by: Patrick Peglar Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Martin Yeo <40734014+trexfeathers@users.noreply.github.com> * Chunk control modes (#5575) * added modes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * added as_dask mode * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * cleaned up enum and as_dask, as per review comments * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * corrected to in final place * unindented lazy_param assignment one indent --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * Corrected required type of dimension_chunksizes. (#5581) * Chunk Control Tests (#5583) * converted tests to pytest, added neg_one, and incomplete from_file and as_dask tests * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * added from_file test * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * added mocking tests * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * trial and error with mocks and patches, may or may not work * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * converted Mock to patch in as_dask test * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * review comment changes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * pre commit fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * review comments, and added test in test__get_cf_var_data() * added in another test * added tests and fixed review comments * added AuxCoord test --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * Chunk control minor fixes (#5593) * Disallow chunks=None in optimum_chunksize. * Clearer docstrings. * Corrected docstring. * Chunk Control documentation (#5597) * init PR, skeleton TP * whoops, missed the TP. * fixed doctests in rst file * correct triple chevron to elipses * updated set doctest to better show functionality * removed in-progress doctest code * Review comments, part 1 * Review comments, part 2 * changed numpy docs dict * wait, this way is better * fixed linkcheck failures (maybe) * fixed :meth: * fixed a couple doc bits * hopefully fixed doctests * newest review comments * fixed rendering, and wording in docstring * fixed docstring numpyness * What's New Entry (#5601) * written whatsnew entry * added ref * moved label to before title --------- Co-authored-by: Elias <110238618+ESadek-MO@users.noreply.github.com> Co-authored-by: Patrick Peglar Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- docs/src/techpapers/index.rst | 1 + docs/src/techpapers/netcdf_io.rst | 140 ++++++++++ docs/src/whatsnew/latest.rst | 8 + lib/iris/_lazy_data.py | 203 ++++++++++----- lib/iris/fileformats/netcdf/loader.py | 243 +++++++++++++++++- .../netcdf/loader/test__chunk_control.py | 216 ++++++++++++++++ .../netcdf/loader/test__get_cf_var_data.py | 13 +- .../tests/unit/lazy_data/test_as_lazy_data.py | 24 +- 8 files changed, 768 insertions(+), 80 deletions(-) create mode 100644 docs/src/techpapers/netcdf_io.rst create mode 100644 lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py diff --git a/docs/src/techpapers/index.rst b/docs/src/techpapers/index.rst index 773c8f7059..e97a87f39c 100644 --- a/docs/src/techpapers/index.rst +++ b/docs/src/techpapers/index.rst @@ -11,3 +11,4 @@ Extra information on specific technical issues. um_files_loading.rst missing_data_handling.rst + netcdf_io.rst diff --git a/docs/src/techpapers/netcdf_io.rst b/docs/src/techpapers/netcdf_io.rst new file mode 100644 index 0000000000..e151b2b7c1 --- /dev/null +++ b/docs/src/techpapers/netcdf_io.rst @@ -0,0 +1,140 @@ +.. testsetup:: chunk_control + + import iris + from iris.fileformats.netcdf.loader import CHUNK_CONTROL + + from pathlib import Path + import dask + import shutil + import tempfile + + tmp_dir = Path(tempfile.mkdtemp()) + tmp_filepath = tmp_dir / "tmp.nc" + + cube = iris.load(iris.sample_data_path("E1_north_america.nc"))[0] + iris.save(cube, tmp_filepath, chunksizes=(120, 37, 49)) + old_dask = dask.config.get("array.chunk-size") + dask.config.set({'array.chunk-size': '500KiB'}) + + +.. testcleanup:: chunk_control + + dask.config.set({'array.chunk-size': old_dask}) + shutil.rmtree(tmp_dir) + +.. _netcdf_io: + +============================= +NetCDF I/O Handling in Iris +============================= + +This document provides a basic account of how Iris loads and saves NetCDF files. + +.. admonition:: Under Construction + + This document is still a work in progress, so might include blank or unfinished sections, + watch this space! + + +Chunk Control +-------------- + +Default Chunking +^^^^^^^^^^^^^^^^ + +Chunks are, by default, optimised by Iris on load. This will automatically +decide the best chunksize for your data without any user input. This is +calculated based on a number of factors, including: + +- File Variable Chunking +- Full Variable Shape +- Dask Default Chunksize +- Dimension Order: Earlier (outer) dimensions will be prioritised to be split over later (inner) dimensions. + +.. doctest:: chunk_control + + >>> cube = iris.load_cube(tmp_filepath) + >>> + >>> print(cube.shape) + (240, 37, 49) + >>> print(cube.core_data().chunksize) + (60, 37, 49) + +For more user control, functionality was updated in :pull:`5588`, with the +creation of the :data:`iris.fileformats.netcdf.loader.CHUNK_CONTROL` class. + +Custom Chunking: Set +^^^^^^^^^^^^^^^^^^^^ + +There are three context manangers within :data:`~iris.fileformats.netcdf.loader.CHUNK_CONTROL`. The most basic is +:meth:`~iris.fileformats.netcdf.loader.ChunkControl.set`. This allows you to specify the chunksize for each dimension, +and to specify a ``var_name`` specifically to change. + +Using ``-1`` in place of a chunksize will ensure the chunksize stays the same +as the shape, i.e. no optimisation occurs on that dimension. + +.. doctest:: chunk_control + + >>> with CHUNK_CONTROL.set("air_temperature", time=180, latitude=-1, longitude=25): + ... cube = iris.load_cube(tmp_filepath) + >>> + >>> print(cube.core_data().chunksize) + (180, 37, 25) + +Note that ``var_name`` is optional, and that you don't need to specify every dimension. If you +specify only one dimension, the rest will be optimised using Iris' default behaviour. + +.. doctest:: chunk_control + + >>> with CHUNK_CONTROL.set(longitude=25): + ... cube = iris.load_cube(tmp_filepath) + >>> + >>> print(cube.core_data().chunksize) + (120, 37, 25) + +Custom Chunking: From File +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The second context manager is :meth:`~iris.fileformats.netcdf.loader.ChunkControl.from_file`. +This takes chunksizes as defined in the NetCDF file. Any dimensions without specified chunks +will default to Iris optimisation. + +.. doctest:: chunk_control + + >>> with CHUNK_CONTROL.from_file(): + ... cube = iris.load_cube(tmp_filepath) + >>> + >>> print(cube.core_data().chunksize) + (120, 37, 49) + +Custom Chunking: As Dask +^^^^^^^^^^^^^^^^^^^^^^^^ + +The final context manager, :meth:`~iris.fileformats.netcdf.loader.ChunkControl.as_dask`, bypasses +Iris' optimisation all together, and will take its chunksizes from Dask's behaviour. + +.. doctest:: chunk_control + + >>> with CHUNK_CONTROL.as_dask(): + ... cube = iris.load_cube(tmp_filepath) + >>> + >>> print(cube.core_data().chunksize) + (70, 37, 49) + + +Split Attributes +----------------- + +TBC + + +Deferred Saving +---------------- + +TBC + + +Guess Axis +----------- + +TBC diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index 93919216c7..3f2f9a1fd9 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -59,6 +59,10 @@ This document explains the changes made to Iris for this release intervention preventing :func:`~iris.util.guess_coord_axis` from acting on a coordinate. (:pull:`5551`) +#. `@pp-mo`_, `@trexfeathers`_ and `@ESadek-MO`_ added more control over + NetCDF chunking with the use of the :data:`iris.fileformats.netcdf.loader.CHUNK_CONTROL` + context manager. (:pull:`5588`) + 🐛 Bugs Fixed ============= @@ -118,6 +122,10 @@ This document explains the changes made to Iris for this release #. `@ESadek-MO`_ added a phrasebook for synonymous terms used in similar packages. (:pull:`5564`) +#. `@ESadek-MO`_ and `@trexfeathers`_ created a technical paper for NetCDF + saving and loading, :ref:`netcdf_io` with a section on chunking, and placeholders + for further topics. (:pull:`5588`) + 💼 Internal =========== diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index fb29f411d3..11477a2fa6 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -61,6 +61,7 @@ def _optimum_chunksize_internals( shape, limit=None, dtype=np.dtype("f4"), + dims_fixed=None, dask_array_chunksize=dask.config.get("array.chunk-size"), ): """ @@ -70,8 +71,8 @@ def _optimum_chunksize_internals( Args: - * chunks (tuple of int, or None): - Pre-existing chunk shape of the target data : None if unknown. + * chunks (tuple of int): + Pre-existing chunk shape of the target data. * shape (tuple of int): The full array shape of the target data. * limit (int): @@ -79,6 +80,11 @@ def _optimum_chunksize_internals( :mod:`dask.config`. * dtype (np.dtype): Numpy dtype of target data. + * dims_fixed (list of bool): + If set, a list of values equal in length to 'chunks' or 'shape'. + 'True' values indicate a dimension that can not be changed, i.e. that + element of the result must equal the corresponding value in 'chunks' or + data.shape. Returns: * chunk (tuple of int): @@ -99,6 +105,7 @@ def _optimum_chunksize_internals( "chunks = [c[0] for c in normalise_chunks('auto', ...)]". """ + # Set the chunksize limit. if limit is None: # Fetch the default 'optimal' chunksize from the dask config. @@ -108,58 +115,90 @@ def _optimum_chunksize_internals( point_size_limit = limit / dtype.itemsize - # Create result chunks, starting with a copy of the input. - result = list(chunks) - - if np.prod(result) < point_size_limit: - # If size is less than maximum, expand the chunks, multiplying later - # (i.e. inner) dims first. - i_expand = len(shape) - 1 - while np.prod(result) < point_size_limit and i_expand >= 0: - factor = np.floor(point_size_limit * 1.0 / np.prod(result)) - new_dim = result[i_expand] * int(factor) - if new_dim >= shape[i_expand]: - # Clip to dim size : chunk dims must not exceed the full shape. - new_dim = shape[i_expand] - else: - # 'new_dim' is less than the relevant dim of 'shape' -- but it - # is also the largest possible multiple of the input-chunks, - # within the size limit. - # So : 'i_expand' is the outer (last) dimension over which we - # will multiply the input chunks, and 'new_dim' is a value that - # ensures the fewest possible chunks within that dim. - - # Now replace 'new_dim' with the value **closest to equal-size - # chunks**, for the same (minimum) number of chunks. - # More-equal chunks are practically better. - # E.G. : "divide 8 into multiples of 2, with a limit of 7", - # produces new_dim=6, which would mean chunks of sizes (6, 2). - # But (4, 4) is clearly better for memory and time cost. - - # Calculate how many (expanded) chunks fit into this dimension. - dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim) - # Get "ideal" (equal) size for that many chunks. - ideal_equal_chunk_size = shape[i_expand] / dim_chunks - # Use the nearest whole multiple of input chunks >= ideal. - new_dim = int( - result[i_expand] - * np.ceil(ideal_equal_chunk_size / result[i_expand]) - ) - - result[i_expand] = new_dim - i_expand -= 1 + if dims_fixed is not None: + if not np.any(dims_fixed): + dims_fixed = None + + if dims_fixed is None: + # Get initial result chunks, starting with a copy of the input. + working = list(chunks) + else: + # Adjust the operation to ignore the 'fixed' dims. + # (We reconstruct the original later, before return). + chunks = np.array(chunks) + dims_fixed_arr = np.array(dims_fixed) + # Reduce the target size by the fixed size of all the 'fixed' dims. + point_size_limit = point_size_limit // np.prod(chunks[dims_fixed_arr]) + # Work on only the 'free' dims. + original_shape = tuple(shape) + shape = tuple(np.array(shape)[~dims_fixed_arr]) + working = list(chunks[~dims_fixed_arr]) + + if len(working) >= 1: + if np.prod(working) < point_size_limit: + # If size is less than maximum, expand the chunks, multiplying + # later (i.e. inner) dims first. + i_expand = len(shape) - 1 + while np.prod(working) < point_size_limit and i_expand >= 0: + factor = np.floor(point_size_limit * 1.0 / np.prod(working)) + new_dim = working[i_expand] * int(factor) + if new_dim >= shape[i_expand]: + # Clip to dim size : must not exceed the full shape. + new_dim = shape[i_expand] + else: + # 'new_dim' is less than the relevant dim of 'shape' -- but + # it is also the largest possible multiple of the + # input-chunks, within the size limit. + # So : 'i_expand' is the outer (last) dimension over which + # we will multiply the input chunks, and 'new_dim' is a + # value giving the fewest possible chunks within that dim. + + # Now replace 'new_dim' with the value **closest to + # equal-size chunks**, for the same (minimum) number of + # chunks. More-equal chunks are practically better. + # E.G. : "divide 8 into multiples of 2, with a limit of 7", + # produces new_dim=6, meaning chunks of sizes (6, 2). + # But (4, 4) is clearly better for memory and time cost. + + # Calculate how many (expanded) chunks fit in this dim. + dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim) + # Get "ideal" (equal) size for that many chunks. + ideal_equal_chunk_size = shape[i_expand] / dim_chunks + # Use the nearest whole multiple of input chunks >= ideal. + new_dim = int( + working[i_expand] + * np.ceil(ideal_equal_chunk_size / working[i_expand]) + ) + + working[i_expand] = new_dim + i_expand -= 1 + else: + # Similarly, reduce if too big, reducing earlier (outer) dims first. + i_reduce = 0 + while np.prod(working) > point_size_limit: + factor = np.ceil(np.prod(working) / point_size_limit) + new_dim = int(working[i_reduce] / factor) + if new_dim < 1: + new_dim = 1 + working[i_reduce] = new_dim + i_reduce += 1 + + working = tuple(working) + + if dims_fixed is None: + result = working else: - # Similarly, reduce if too big, reducing earlier (outer) dims first. - i_reduce = 0 - while np.prod(result) > point_size_limit: - factor = np.ceil(np.prod(result) / point_size_limit) - new_dim = int(result[i_reduce] / factor) - if new_dim < 1: - new_dim = 1 - result[i_reduce] = new_dim - i_reduce += 1 + # Reconstruct the original form + result = [] + for i_dim in range(len(original_shape)): + if dims_fixed[i_dim]: + dim = chunks[i_dim] + else: + dim = working[0] + working = working[1:] + result.append(dim) - return tuple(result) + return result @wraps(_optimum_chunksize_internals) @@ -168,6 +207,7 @@ def _optimum_chunksize( shape, limit=None, dtype=np.dtype("f4"), + dims_fixed=None, ): # By providing dask_array_chunksize as an argument, we make it so that the # output of _optimum_chunksize_internals depends only on its arguments (and @@ -177,11 +217,14 @@ def _optimum_chunksize( tuple(shape), limit=limit, dtype=dtype, + dims_fixed=dims_fixed, dask_array_chunksize=dask.config.get("array.chunk-size"), ) -def as_lazy_data(data, chunks=None, asarray=False): +def as_lazy_data( + data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False +): """ Convert the input array `data` to a :class:`dask.array.Array`. @@ -200,6 +243,16 @@ def as_lazy_data(data, chunks=None, asarray=False): If True, then chunks will be converted to instances of `ndarray`. Set to False (default) to pass passed chunks through unchanged. + * dims_fixed (list of bool): + If set, a list of values equal in length to 'chunks' or data.ndim. + 'True' values indicate a dimension which can not be changed, i.e. the + result for that index must equal the value in 'chunks' or data.shape. + + * dask_chunking (bool): + If True, Iris chunking optimisation will be bypassed, and dask's default + chunking will be used instead. Including a value for chunks while dask_chunking + is set to True will result in a failure. + Returns: The input array converted to a :class:`dask.array.Array`. @@ -211,24 +264,38 @@ def as_lazy_data(data, chunks=None, asarray=False): but reduced by a factor if that exceeds the dask default chunksize. """ - if chunks is None: - # No existing chunks : Make a chunk the shape of the entire input array - # (but we will subdivide it if too big). - chunks = list(data.shape) - - # Adjust chunk size for better dask performance, - # NOTE: but only if no shape dimension is zero, so that we can handle the - # PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0). - if all(elem > 0 for elem in data.shape): - # Expand or reduce the basic chunk shape to an optimum size. - chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype) - + if dask_chunking: + if chunks is not None: + raise ValueError( + f"Dask chunking chosen, but chunks already assigned value {chunks}" + ) + lazy_params = {"asarray": asarray, "meta": np.ndarray} + else: + if chunks is None: + # No existing chunks : Make a chunk the shape of the entire input array + # (but we will subdivide it if too big). + chunks = list(data.shape) + + # Adjust chunk size for better dask performance, + # NOTE: but only if no shape dimension is zero, so that we can handle the + # PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0). + if all(elem > 0 for elem in data.shape): + # Expand or reduce the basic chunk shape to an optimum size. + chunks = _optimum_chunksize( + chunks, + shape=data.shape, + dtype=data.dtype, + dims_fixed=dims_fixed, + ) + lazy_params = { + "chunks": chunks, + "asarray": asarray, + "meta": np.ndarray, + } if isinstance(data, ma.core.MaskedConstant): data = ma.masked_array(data.data, mask=data.mask) if not is_lazy_data(data): - data = da.from_array( - data, chunks=chunks, asarray=asarray, meta=np.ndarray - ) + data = da.from_array(data, **lazy_params) return data diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index f0ed111687..623d1eb6c7 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -12,7 +12,12 @@ Also : `CF Conventions `_. """ -from collections.abc import Iterable +from collections.abc import Iterable, Mapping +from contextlib import contextmanager +from copy import deepcopy +from enum import Enum, auto +import threading +from typing import Union import warnings import numpy as np @@ -204,6 +209,7 @@ def _get_cf_var_data(cf_var, filename): unnecessarily slow + wasteful of memory. """ + global CHUNK_CONTROL if hasattr(cf_var, "_data_array"): # The variable is not an actual netCDF4 file variable, but an emulating # object with an attached data array (either numpy or dask), which can be @@ -220,6 +226,8 @@ def _get_cf_var_data(cf_var, filename): else: # Get lazy chunked data out of a cf variable. + # Creates Dask wrappers around data arrays for any cube components which + # can have lazy values, e.g. Cube, Coord, CellMeasure, AuxiliaryVariable. dtype = _get_actual_dtype(cf_var) # Make a data-proxy that mimics array access and can fetch from the file. @@ -233,21 +241,59 @@ def _get_cf_var_data(cf_var, filename): ) # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". - chunks = cf_var.cf_data.chunking() - # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. - if chunks == "contiguous": - chunks = None - - # Return a dask array providing deferred access. - result = as_lazy_data(proxy, chunks=chunks) - + if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: + result = as_lazy_data(proxy, chunks=None, dask_chunking=True) + else: + chunks = cf_var.cf_data.chunking() + # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. + if chunks == "contiguous": + if ( + CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE + and isinstance( + cf_var, iris.fileformats.cf.CFDataVariable + ) + ): + raise KeyError( + f"{cf_var.cf_name} does not contain pre-existing chunk specifications." + f" Instead, you might wish to use CHUNK_CONTROL.set(), or just use default" + f" behaviour outside of a context manager. " + ) + # Equivalent to chunks=None, but value required by chunking control + chunks = list(cf_var.shape) + + # Modify the chunking in the context of an active chunking control. + # N.B. settings specific to this named var override global ('*') ones. + dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get( + cf_var.cf_name + ) or CHUNK_CONTROL.var_dim_chunksizes.get("*") + dims = cf_var.cf_data.dimensions + if CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE: + dims_fixed = np.ones(len(dims), dtype=bool) + elif not dim_chunks: + dims_fixed = None + else: + # Modify the chunks argument, and pass in a list of 'fixed' dims, for + # any of our dims which are controlled. + dims_fixed = np.zeros(len(dims), dtype=bool) + for i_dim, dim_name in enumerate(dims): + dim_chunksize = dim_chunks.get(dim_name) + if dim_chunksize: + if dim_chunksize == -1: + chunks[i_dim] = cf_var.shape[i_dim] + else: + chunks[i_dim] = dim_chunksize + dims_fixed[i_dim] = True + if dims_fixed is None: + dims_fixed = [dims_fixed] + result = as_lazy_data( + proxy, chunks=chunks, dims_fixed=tuple(dims_fixed) + ) return result class _OrderedAddableList(list): """ A custom container object for actions recording. - Used purely in actions debugging, to accumulate a record of which actions were activated. @@ -270,6 +316,18 @@ def add(self, msg): def _load_cube(engine, cf, cf_var, filename): + global CHUNK_CONTROL + + # Translate dimension chunk-settings specific to this cube (i.e. named by + # it's data-var) into global ones, for the duration of this load. + # Thus, by default, we will create any AuxCoords, CellMeasures et al with + # any per-dimension chunksizes specified for the cube. + these_settings = CHUNK_CONTROL.var_dim_chunksizes.get(cf_var.cf_name, {}) + with CHUNK_CONTROL.set(**these_settings): + return _load_cube_inner(engine, cf, cf_var, filename) + + +def _load_cube_inner(engine, cf, cf_var, filename): from iris.cube import Cube """Create the cube associated with the CF-netCDF data variable.""" @@ -614,3 +672,168 @@ def load_cubes(file_sources, callback=None, constraints=None): continue yield cube + + +class ChunkControl(threading.local): + class Modes(Enum): + DEFAULT = auto() + FROM_FILE = auto() + AS_DASK = auto() + + def __init__(self, var_dim_chunksizes=None): + """ + Provide user control of Dask chunking. + + The NetCDF loader is controlled by the single instance of this: the + :data:`~iris.fileformats.netcdf.loader.CHUNK_CONTROL` object. + + A chunk size can be set for a specific (named) file dimension, when + loading specific (named) variables, or for all variables. + + When a selected variable is a CF data-variable, which loads as a + :class:`~iris.cube.Cube`, then the given dimension chunk size is *also* + fixed for all variables which are components of that :class:`~iris.cube.Cube`, + i.e. any :class:`~iris.coords.Coord`, :class:`~iris.coords.CellMeasure`, + :class:`~iris.coords.AncillaryVariable` etc. + This can be overridden, if required, by variable-specific settings. + + For this purpose, :class:`~iris.experimental.ugrid.mesh.MeshCoord` and + :class:`~iris.experimental.ugrid.mesh.Connectivity` are not + :class:`~iris.cube.Cube` components, and chunk control on a + :class:`~iris.cube.Cube` data-variable will not affect them. + + """ + self.var_dim_chunksizes = var_dim_chunksizes or {} + self.mode = self.Modes.DEFAULT + + @contextmanager + def set( + self, + var_names: Union[str, Iterable[str]] = None, + **dimension_chunksizes: Mapping[str, int], + ) -> None: + """ + Control the Dask chunk sizes applied to NetCDF variables during loading. + + Parameters + ---------- + var_names : str or list of str, default=None + apply the `dimension_chunksizes` controls only to these variables, + or when building :class:`~iris.cube.Cube`\\ s from these data variables. + If ``None``, settings apply to all loaded variables. + dimension_chunksizes : dict of {str: int} + Kwargs specifying chunksizes for dimensions of file variables. + Each key-value pair defines a chunk size for a named file + dimension, e.g. ``{'time': 10, 'model_levels':1}``. + Values of ``-1`` will lock the chunk size to the full size of that + dimension. + + Notes + ----- + This function acts as a context manager, for use in a ``with`` block. + + >>> import iris + >>> from iris.fileformats.netcdf.loader import CHUNK_CONTROL + >>> with CHUNK_CONTROL.set("air_temperature", time=180, latitude=-1): + ... cube = iris.load(iris.sample_data_path("E1_north_america.nc"))[0] + + When `var_names` is present, the chunk size adjustments are applied + only to the selected variables. However, for a CF data variable, this + extends to all components of the (raw) :class:`~iris.cube.Cube` created + from it. + + **Un**-adjusted dimensions have chunk sizes set in the 'usual' way. + That is, according to the normal behaviour of + :func:`iris._lazy_data.as_lazy_data`, which is: chunk size is based on + the file variable chunking, or full variable shape; this is scaled up + or down by integer factors to best match the Dask default chunk size, + i.e. the setting configured by + ``dask.config.set({'array.chunk-size': '250MiB'})``. + + """ + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) + if var_names is None: + var_names = ["*"] + elif isinstance(var_names, str): + var_names = [var_names] + try: + for var_name in var_names: + # Note: here we simply treat '*' as another name. + # A specific name match should override a '*' setting, but + # that is implemented elsewhere. + if not isinstance(var_name, str): + msg = ( + "'var_names' should be an iterable of strings, " + f"not {var_names!r}." + ) + raise ValueError(msg) + dim_chunks = self.var_dim_chunksizes.setdefault(var_name, {}) + for dim_name, chunksize in dimension_chunksizes.items(): + if not ( + isinstance(dim_name, str) + and isinstance(chunksize, int) + ): + msg = ( + "'dimension_chunksizes' kwargs should be a dict " + f"of `str: int` pairs, not {dimension_chunksizes!r}." + ) + raise ValueError(msg) + dim_chunks[dim_name] = chunksize + yield + finally: + self.var_dim_chunksizes = old_var_dim_chunksizes + self.mode = old_mode + + @contextmanager + def from_file(self) -> None: + """ + Ensures the chunk sizes are loaded in from NetCDF file variables. + + Raises + ------ + KeyError + If any NetCDF data variables - those that become + :class:`~iris.cube.Cube`\\ s - do not specify chunk sizes. + + Notes + ----- + This function acts as a context manager, for use in a ``with`` block. + """ + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) + try: + self.mode = self.Modes.FROM_FILE + yield + finally: + self.mode = old_mode + self.var_dim_chunksizes = old_var_dim_chunksizes + + @contextmanager + def as_dask(self) -> None: + """ + Relies on Dask :external+dask:doc:`array` to control chunk sizes. + + Notes + ----- + This function acts as a context manager, for use in a ``with`` block. + """ + old_mode = self.mode + old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) + try: + self.mode = self.Modes.AS_DASK + yield + finally: + self.mode = old_mode + self.var_dim_chunksizes = old_var_dim_chunksizes + + +# Note: the CHUNK_CONTROL object controls chunk sizing in the +# :meth:`_get_cf_var_data` method. +# N.B. :meth:`_load_cube` also modifies this when loading each cube, +# introducing an additional context in which any cube-specific settings are +# 'promoted' into being global ones. + +#: The global :class:`ChunkControl` object providing user-control of Dask chunking +#: when Iris loads NetCDF files. +CHUNK_CONTROL: ChunkControl = ChunkControl() diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py new file mode 100644 index 0000000000..7249c39829 --- /dev/null +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py @@ -0,0 +1,216 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the BSD license. +# See LICENSE in the root of the repository for full licensing details. +"""Unit tests for :class:`iris.fileformats.netcdf.loader.ChunkControl`.""" + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests # isort:skip +from unittest.mock import ANY, patch + +import dask +import numpy as np +import pytest + +import iris +from iris.cube import CubeList +from iris.fileformats.netcdf import loader +from iris.fileformats.netcdf.loader import CHUNK_CONTROL +import iris.tests.stock as istk + + +@pytest.fixture() +def save_cubelist_with_sigma(tmp_filepath): + cube = istk.simple_4d_with_hybrid_height() + cube_varname = "my_var" + sigma_varname = "my_sigma" + cube.var_name = cube_varname + cube.coord("sigma").var_name = sigma_varname + cube.coord("sigma").guess_bounds() + iris.save(cube, tmp_filepath) + return cube_varname, sigma_varname + + +@pytest.fixture +def save_cube_with_chunksize(tmp_filepath): + cube = istk.simple_3d() + # adding an aux coord allows us to test that + # iris.fileformats.netcdf.loader._get_cf_var_data() + # will only throw an error if from_file mode is + # True when the entire cube has no specified chunking + aux = iris.coords.AuxCoord( + points=np.zeros((3, 4)), + long_name="random", + units="1", + ) + cube.add_aux_coord(aux, [1, 2]) + iris.save(cube, tmp_filepath, chunksizes=(1, 3, 4)) + + +@pytest.fixture(scope="session") +def tmp_filepath(tmp_path_factory): + tmp_dir = tmp_path_factory.mktemp("data") + tmp_path = tmp_dir / "tmp.nc" + return str(tmp_path) + + +@pytest.fixture(autouse=True) +def remove_min_bytes(): + old_min_bytes = loader._LAZYVAR_MIN_BYTES + loader._LAZYVAR_MIN_BYTES = 0 + yield + loader._LAZYVAR_MIN_BYTES = old_min_bytes + + +def test_default(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + assert cube.lazy_data().chunksize == (3, 4, 5, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (4,) + assert sigma.lazy_bounds().chunksize == (4, 2) + + +def test_control_global(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + with CHUNK_CONTROL.set(model_level_number=2): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + assert cube.lazy_data().chunksize == (3, 2, 5, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (2,) + assert sigma.lazy_bounds().chunksize == (2, 2) + + +def test_control_sigma_only(tmp_filepath, save_cubelist_with_sigma): + cube_varname, sigma_varname = save_cubelist_with_sigma + with CHUNK_CONTROL.set(sigma_varname, model_level_number=2): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + assert cube.lazy_data().chunksize == (3, 4, 5, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (2,) + # N.B. this does not apply to bounds array + assert sigma.lazy_bounds().chunksize == (4, 2) + + +def test_control_cube_var(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + with CHUNK_CONTROL.set(cube_varname, model_level_number=2): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + assert cube.lazy_data().chunksize == (3, 2, 5, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (2,) + assert sigma.lazy_bounds().chunksize == (2, 2) + + +def test_invalid_chunksize(tmp_filepath, save_cubelist_with_sigma): + with pytest.raises(ValueError): + with CHUNK_CONTROL.set(model_level_numer="2"): + CubeList(loader.load_cubes(tmp_filepath)) + + +def test_invalid_var_name(tmp_filepath, save_cubelist_with_sigma): + with pytest.raises(ValueError): + with CHUNK_CONTROL.set([1, 2], model_level_numer="2"): + CubeList(loader.load_cubes(tmp_filepath)) + + +def test_control_multiple(tmp_filepath, save_cubelist_with_sigma): + cube_varname, sigma_varname = save_cubelist_with_sigma + with CHUNK_CONTROL.set( + cube_varname, model_level_number=2 + ), CHUNK_CONTROL.set(sigma_varname, model_level_number=3): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + assert cube.lazy_data().chunksize == (3, 2, 5, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (3,) + assert sigma.lazy_bounds().chunksize == (2, 2) + + +def test_neg_one(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + with dask.config.set({"array.chunk-size": "50B"}): + with CHUNK_CONTROL.set(model_level_number=-1): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + # uses known good output + assert cube.lazy_data().chunksize == (1, 4, 1, 1) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (4,) + assert sigma.lazy_bounds().chunksize == (4, 1) + + +def test_from_file(tmp_filepath, save_cube_with_chunksize): + with CHUNK_CONTROL.from_file(): + cube = next(loader.load_cubes(tmp_filepath)) + assert cube.shape == (2, 3, 4) + assert cube.lazy_data().chunksize == (1, 3, 4) + + +def test_no_chunks_from_file(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + with pytest.raises(KeyError): + with CHUNK_CONTROL.from_file(): + CubeList(loader.load_cubes(tmp_filepath)) + + +def test_as_dask(tmp_filepath, save_cubelist_with_sigma): + """ + This does not test return values, as we can't be sure + dask chunking behaviour won't change, or that it will differ + from our own chunking behaviour. + """ + message = "Mock called, rest of test unneeded" + with patch("iris.fileformats.netcdf.loader.as_lazy_data") as as_lazy_data: + as_lazy_data.side_effect = RuntimeError(message) + with CHUNK_CONTROL.as_dask(): + try: + CubeList(loader.load_cubes(tmp_filepath)) + except RuntimeError as e: + if str(e) != message: + raise e + as_lazy_data.assert_called_with(ANY, chunks=None, dask_chunking=True) + + +def test_pinned_optimisation(tmp_filepath, save_cubelist_with_sigma): + cube_varname, _ = save_cubelist_with_sigma + with dask.config.set({"array.chunk-size": "250B"}): + with CHUNK_CONTROL.set(model_level_number=2): + cubes = CubeList(loader.load_cubes(tmp_filepath)) + cube = cubes.extract_cube(cube_varname) + assert cube.shape == (3, 4, 5, 6) + # uses known good output + # known good output WITHOUT pinning: (1, 1, 5, 6) + assert cube.lazy_data().chunksize == (1, 2, 2, 6) + + sigma = cube.coord("sigma") + assert sigma.shape == (4,) + assert sigma.lazy_points().chunksize == (2,) + assert sigma.lazy_bounds().chunksize == (2, 2) + + +if __name__ == "__main__": + tests.main() diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py index 3c3cbff7f4..caece8b6bc 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py @@ -14,7 +14,7 @@ from iris._lazy_data import _optimum_chunksize import iris.fileformats.cf -from iris.fileformats.netcdf.loader import _get_cf_var_data +from iris.fileformats.netcdf.loader import CHUNK_CONTROL, _get_cf_var_data class Test__get_cf_var_data(tests.IrisTest): @@ -29,6 +29,7 @@ def _make( cf_data = mock.MagicMock( _FillValue=None, __getitem__="", + dimensions=["dim_" + str(x) for x in range(len(shape or "1"))], ) cf_data.chunking = mock.MagicMock(return_value=chunksizes) if shape is None: @@ -60,6 +61,16 @@ def test_cf_data_chunks(self): expected_chunks = _optimum_chunksize(chunks, self.shape) self.assertArrayEqual(lazy_data_chunks, expected_chunks) + def test_cf_data_chunk_control(self): + # more thorough testing can be found at `test__chunk_control` + chunks = [2500, 240, 200] + cf_var = self._make(shape=(2500, 240, 200), chunksizes=chunks) + with CHUNK_CONTROL.set(dim_0=25, dim_1=24, dim_2=20): + lazy_data = _get_cf_var_data(cf_var, self.filename) + lazy_data_chunks = [c[0] for c in lazy_data.chunks] + expected_chunks = (25, 24, 20) + self.assertArrayEqual(lazy_data_chunks, expected_chunks) + def test_cf_data_no_chunks(self): # No chunks means chunks are calculated from the array's shape by # `iris._lazy_data._optimum_chunksize()`. diff --git a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py index 0acb085830..2222d185c3 100644 --- a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py +++ b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py @@ -41,6 +41,25 @@ def test_non_default_chunks(self): (result,) = np.unique(lazy_data.chunks) self.assertEqual(result, 24) + def test_dask_chunking(self): + data = np.arange(24) + chunks = (12,) + optimum = self.patch("iris._lazy_data._optimum_chunksize") + optimum.return_value = chunks + _ = as_lazy_data(data, chunks=None, dask_chunking=True) + self.assertFalse(optimum.called) + + def test_dask_chunking_error(self): + data = np.arange(24) + chunks = (12,) + optimum = self.patch("iris._lazy_data._optimum_chunksize") + optimum.return_value = chunks + with self.assertRaisesRegex( + ValueError, + r"Dask chunking chosen, but chunks already assigned value", + ): + as_lazy_data(data, chunks=chunks, dask_chunking=True) + def test_with_masked_constant(self): masked_data = ma.masked_array([8], mask=True) masked_constant = masked_data[0] @@ -151,7 +170,10 @@ def test_default_chunks_limiting(self): limitcall_patch.call_args_list, [ mock.call( - list(test_shape), shape=test_shape, dtype=np.dtype("f4") + list(test_shape), + shape=test_shape, + dtype=np.dtype("f4"), + dims_fixed=None, ) ], )