From b67cbe1e1d8f2ce7d33747e00e8c5acff001071d Mon Sep 17 00:00:00 2001 From: scharlottej13 Date: Fri, 13 Sep 2024 15:01:13 -0700 Subject: [PATCH 1/9] First pass at updates to Dask page in Xarray docs --- doc/conf.py | 2 +- doc/user-guide/dask.rst | 425 +++++++++++++++------------------------- doc/user-guide/io.rst | 2 + 3 files changed, 162 insertions(+), 267 deletions(-) diff --git a/doc/conf.py b/doc/conf.py index e418045207c..78bcb2bf996 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -58,7 +58,7 @@ ] ) -nbsphinx_allow_errors = False +nbsphinx_allow_errors = True nbsphinx_requirejs_path = "" # -- General configuration ------------------------------------------------ diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index 5c421aa51d8..1287f34a9cc 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -2,195 +2,151 @@ .. _dask: -Parallel computing with Dask +Parallel Computing with Dask ============================ -Xarray integrates with `Dask `__ to support parallel -computations and streaming computation on datasets that don't fit into memory. -Currently, Dask is an entirely optional feature for xarray. However, the -benefits of using Dask are sufficiently strong that Dask may become a required -dependency in a future version of xarray. +`Dask `__, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. -For a full example of how to use xarray's Dask integration, read the -`blog post introducing xarray and Dask`_. More up-to-date examples -may be found at the `Pangeo project's gallery `_ -and at the `Dask examples website `_. +If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask: -.. _blog post introducing xarray and Dask: https://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/ - -What is a Dask array? ---------------------- +.. code-block:: python -.. image:: ../_static/dask_array.png - :width: 40 % - :align: right - :alt: A Dask array + import xarray -Dask divides arrays into many small pieces, called *chunks*, each of which is -presumed to be small enough to fit into memory. + ds = xr.open_zarr("/path/to/data.zarr") + timeseries = ds["temp"].mean(dim=["x", "y"]).compute() # Compute result -Unlike NumPy, which has eager evaluation, operations on Dask arrays are lazy. -Operations queue up a series of tasks mapped over blocks, and no computation is -performed until you actually ask values to be computed (e.g., to print results -to your screen or write to disk). At that point, data is loaded into memory -and computation proceeds in a streaming fashion, block-by-block. +Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes. -The actual computation is controlled by a multi-processing or thread pool, -which allows Dask to take full advantage of multiple processors available on -most modern computers. +Examples +-------- -For more details, read the `Dask documentation `__. -Note that xarray only makes use of ``dask.array`` and ``dask.delayed``. 
+- `Zonal averaging with the NOAA National Water Model `_ +- `CMIP6 Precipitation Frequency Analysis `_ +- `Using Dask + Cloud Optimized GeoTIFFs `_ +- `Xarray + Dask Tutorial `_ -.. _dask.io: -Reading and writing data ------------------------- +Using Dask with Xarray +---------------------- -The usual way to create a ``Dataset`` filled with Dask arrays is to load the -data from a netCDF file or files. You can do this by supplying a ``chunks`` -argument to :py:func:`~xarray.open_dataset` or using the -:py:func:`~xarray.open_mfdataset` function. +.. image:: https://docs.dask.org/en/stable/_images/dask-array.svg + :width: 50 % + :align: right + :alt: A Dask array -.. ipython:: python - :suppress: +Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph. - import os +When a result is requested (e.g., for plotting, saving, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently. - import numpy as np - import pandas as pd - import xarray as xr +Nearly all Xarray methods have been extended to work automatically with Dask Arrays. This includes things like indexing, concatenating, rechunking, grouped operations, etc. Common operations are covered in more detail in each of the sections below. - np.random.seed(123456) - np.set_printoptions(precision=3, linewidth=100, threshold=100, edgeitems=3) - - ds = xr.Dataset( - { - "temperature": ( - ("time", "latitude", "longitude"), - np.random.randn(30, 180, 180), - ), - "time": pd.date_range("2015-01-01", periods=30), - "longitude": np.arange(180), - "latitude": np.arange(89.5, -90.5, -1), - } - ) - ds.to_netcdf("example-data.nc") +.. _dask.io: -.. ipython:: python +Reading and writing data +~~~~~~~~~~~~~~~~~~~~~~~~ - ds = xr.open_dataset("example-data.nc", chunks={"time": 10}) - ds +When reading data, Dask automatically divides your dataset into smaller chunks. These chunks are essentially small, manageable blocks of the larger dataset. You can specify the size of chunks with the ``chunks`` argument (see the `Dask Array docs on chunks `_). -In this example ``latitude`` and ``longitude`` do not appear in the ``chunks`` -dict, so only one chunk will be used along those dimensions. It is also -entirely equivalent to opening a dataset using :py:func:`~xarray.open_dataset` -and then chunking the data using the ``chunk`` method, e.g., -``xr.open_dataset('example-data.nc').chunk({'time': 10})``. +.. tab:: Zarr -To open multiple files simultaneously in parallel using Dask delayed, -use :py:func:`~xarray.open_mfdataset`:: + The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default:: - xr.open_mfdataset('my/files/*.nc', parallel=True) + ds = xr.open_zarr("path/to/directory.zarr") + + Save data to a local Zarr dataset:: -This function will automatically concatenate and merge datasets into one in -the simple cases that it understands (see :py:func:`~xarray.combine_by_coords` -for the full disclaimer). By default, :py:func:`~xarray.open_mfdataset` will chunk each -netCDF file into a single Dask array; again, supply the ``chunks`` argument to -control the size of the resulting Dask arrays. 
In more complex cases, you can -open each file individually using :py:func:`~xarray.open_dataset` and merge the result, as -described in :ref:`combining data`. Passing the keyword argument ``parallel=True`` to -:py:func:`~xarray.open_mfdataset` will speed up the reading of large multi-file datasets by -executing those read tasks in parallel using ``dask.delayed``. + ds.to_zarr("path/to/directory.zarr") + + Or to an S3 bucket:: + + ds.to_zarr("s3://my-bucket/data.zarr") + + See :ref:`io.zarr` for more details. -.. warning:: +.. tab:: NetCDF - :py:func:`~xarray.open_mfdataset` called without ``chunks`` argument will return - dask arrays with chunk sizes equal to the individual files. Re-chunking - the dataset after creation with ``ds.chunk()`` will lead to an ineffective use of - memory and is not recommended. + Open a single netCDF file with :py:func:`~xarray.open_dataset` and supplying a ``chunks`` argument:: -You'll notice that printing a dataset still shows a preview of array values, -even if they are actually Dask arrays. We can do this quickly with Dask because -we only need to compute the first few values (typically from the first block). -To reveal the true nature of an array, print a DataArray: + ds = xr.open_dataset("example-data.nc", chunks={"time": 10}) + + Or open multiple files with py:func:`~xarray.open_mfdataset`:: -.. ipython:: python + xr.open_mfdataset('my/files/*.nc', parallel=True) - ds.temperature + .. tip:: + + When reading in many netCDF files with py:func:`~xarray.open_mfdataset`, using ``engine=h5netcdf`` can + be faster than the default which uses the netCDF4 package. + + Saving a larger-than-memory netCDF file:: -Once you've manipulated a Dask array, you can still write a dataset too big to -fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the -usual way. + delayed_write = ds.to_netcdf("my-big-file.nc", compute=False) + delayed_write.compute() + + .. note:: -.. ipython:: python + When using Dask’s distributed scheduler to write NETCDF4 files, it may be necessary to set the environment variable ``HDF5_USE_FILE_LOCKING=FALSE`` to avoid competing locks within the HDF5 SWMR file locking scheme. Note that writing netCDF files with Dask’s distributed scheduler is only supported for the netcdf4 backend. - ds.to_netcdf("manipulated-example-data.nc") + See :ref:`io.netcdf` for more details. + +.. tab:: HDF5 -By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf` -will return a ``dask.delayed`` object that can be computed later. + Open HDF5 files with :py:func:`~xarray.open_dataset`:: -.. ipython:: python + xr.open_dataset("/path/to/my/file.h5", chunks='auto') - from dask.diagnostics import ProgressBar + See :ref:`io.hdf5` for more details. - # or distributed.progress when using the distributed scheduler - delayed_obj = ds.to_netcdf("manipulated-example-data.nc", compute=False) - with ProgressBar(): - results = delayed_obj.compute() +.. tab:: GeoTIFF -.. ipython:: python - :suppress: + Open large geoTIFF files with rioxarray:: - os.remove("manipulated-example-data.nc") # Was not opened. + xds = rioxarray.open_rasterio("my-satellite-image.tif", chunks='auto') -.. note:: + See :ref:`io.rasterio` for more details. - When using Dask's distributed scheduler to write NETCDF4 files, - it may be necessary to set the environment variable `HDF5_USE_FILE_LOCKING=FALSE` - to avoid competing locks within the HDF5 SWMR file locking scheme. 
Note that - writing netCDF files with Dask's distributed scheduler is only supported for - the `netcdf4` backend. -A dataset can also be converted to a Dask DataFrame using :py:meth:`~xarray.Dataset.to_dask_dataframe`. +Loading Dask Arrays +~~~~~~~~~~~~~~~~~~~ .. ipython:: python - :okwarning: - - df = ds.to_dask_dataframe() - df + :suppress: -Dask DataFrames do not support multi-indexes so the coordinate variables from the dataset are included as columns in the Dask DataFrame. + import os + import numpy as np + import pandas as pd + import xarray as xr -Using Dask with xarray ----------------------- + np.random.seed(123456) + np.set_printoptions(precision=3, linewidth=100, threshold=100, edgeitems=3) -Nearly all existing xarray methods (including those for indexing, computation, -concatenating and grouped operations) have been extended to work automatically -with Dask arrays. When you load data as a Dask array in an xarray data -structure, almost all xarray operations will keep it as a Dask array; when this -is not possible, they will raise an exception rather than unexpectedly loading -data into memory. Converting a Dask array into memory generally requires an -explicit conversion step. One notable exception is indexing operations: to -enable label based indexing, xarray will automatically load coordinate labels -into memory. + ds = xr.Dataset( + { + "temperature": ( + ("time", "latitude", "longitude"), + np.random.randn(30, 180, 180), + ), + "time": pd.date_range("2015-01-01", periods=30), + "longitude": np.arange(180), + "latitude": np.arange(89.5, -90.5, -1), + } + ) + ds.to_netcdf("example-data.nc") -.. tip:: +There are a couple common cases where you may want to convert lazy Dask arrays into eager, in-memory Xarray data structures: - By default, dask uses its multi-threaded scheduler, which distributes work across - multiple cores and allows for processing some datasets that do not fit into memory. - For running across a cluster, `setup the distributed scheduler `_. +- You want to inspect smaller intermediate results when working interactively or debugging +- You've reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory -The easiest way to convert an xarray data structure from lazy Dask arrays into -*eager*, in-memory NumPy arrays is to use the :py:meth:`~xarray.Dataset.load` method: +To do this, you can use :py:meth:`~xarray.Dataset.load`, which is similar to :py:meth:`~xarray.Dataset.compute`, but instead changes results in-place: .. ipython:: python ds.load() -You can also access :py:attr:`~xarray.DataArray.values`, which will always be a -NumPy array: +You can also access :py:attr:`~xarray.DataArray.values`, which will always be a NumPy array: .. ipython:: :verbatim: @@ -202,125 +158,73 @@ NumPy array: ... # truncated for brevity -Explicit conversion by wrapping a DataArray with ``np.asarray`` also works: - -.. ipython:: - :verbatim: - - In [5]: np.asarray(ds.temperature) - Out[5]: - array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01, 3.814e-01], - [ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00], - ... - -Alternatively you can load the data into memory but keep the arrays as -Dask arrays using the :py:meth:`~xarray.Dataset.persist` method: +NumPy ufuncs like ``np.sin`` transparently work on all xarray objects, including those +that store lazy Dask arrays: .. 
ipython:: python - persisted = ds.persist() - -:py:meth:`~xarray.Dataset.persist` is particularly useful when using a -distributed cluster because the data will be loaded into distributed memory -across your machines and be much faster to use than reading repeatedly from -disk. + import numpy as np -.. warning:: + np.sin(ds) - On a single machine :py:meth:`~xarray.Dataset.persist` will try to load all of - your data into memory. You should make sure that your dataset is not larger than - available memory. +To access Dask arrays directly, use the +:py:attr:`DataArray.data ` attribute. This attribute exposes +array data either as a Dask array or as a NumPy array, depending on whether it has been +loaded into Dask or not. .. note:: - For more on the differences between :py:meth:`~xarray.Dataset.persist` and - :py:meth:`~xarray.Dataset.compute` see this `Stack Overflow answer on the differences between client persist and client compute `_ and the `Dask documentation `_. - -For performance you may wish to consider chunk sizes. The correct choice of -chunk size depends both on your data and on the operations you want to perform. -With xarray, both converting data to a Dask arrays and converting the chunk -sizes of Dask arrays is done with the :py:meth:`~xarray.Dataset.chunk` method: - -.. ipython:: python - - rechunked = ds.chunk({"latitude": 100, "longitude": 100}) - -.. warning:: - - Rechunking an existing dask array created with :py:func:`~xarray.open_mfdataset` - is not recommended (see above). - -You can view the size of existing chunks on an array by viewing the -:py:attr:`~xarray.Dataset.chunks` attribute: - -.. ipython:: python + ``.data`` is also used to expose other "computable" array backends beyond Dask and + NumPy (e.g. sparse and pint arrays). - rechunked.chunks +If you're using a Dask cluster, you can also use :py:meth:`~xarray.Dataset.persist` for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It's a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data. -If there are not consistent chunksizes between all the arrays in a dataset -along a particular dimension, an exception is raised when you try to access -``.chunks``. +.. code-block:: python -.. note:: + ds = ds.persist() - In the future, we would like to enable automatic alignment of Dask - chunksizes (but not the other way around). We might also require that all - arrays in a dataset share the same chunking alignment. Neither of these - are currently done. +.. _dask.chunks: -NumPy ufuncs like ``np.sin`` transparently work on all xarray objects, including those -that store lazy Dask arrays: - -.. ipython:: python - - import numpy as np +Chunking and performance +~~~~~~~~~~~~~~~~~~~~~~~~ - np.sin(rechunked) +The way a dataset is chunked can be critical to performance when working with large datasets. You'll want have chunk sizes large enough to reduce the number of chunks that Dask has to think about (to reduce overhead from the task graph) but also small enough so that many of them can fit in memory at once. -To access Dask arrays directly, use the -:py:attr:`DataArray.data ` attribute. This attribute exposes -array data either as a Dask array or as a NumPy array, depending on whether it has been -loaded into Dask or not: +.. 
tip:: -.. ipython:: python + A good rule of thumb is to create arrays with a minimum chunk size of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), you may need larger chunks. See `Choosing good chunk sizes in Dask `_. - ds.temperature.data +It can be helpful to choose chunk sizes based on your downstream analyses and to chunk as early as possible. Datasets with smaller chunks along the time axis, for example, can make time domain problems easier to parallelize since Dask can perform the same operation on each time chunk. If you're working with a large dataset with chunks that make downstream analyses challenging, you may need to rechunk your data. This is an expensive operation though, so is only recommended when needed. -.. note:: +You can rechunk a dataset by: - ``.data`` is also used to expose other "computable" array backends beyond Dask and - NumPy (e.g. sparse and pint arrays). +- Specifying ``chunks={}`` when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. +- Using :py:meth:`Dataset.chunk` after you've already read in your dataset. For time domain problems, for example, you can use ``ds.chunk(time=TimeResampler())`` to rechunk according to a specified unit of time. ``ds.chunk(time=TimeResampler("MS"))``, for example, will set the chunks so that a month of data is contained in one chunk. .. _dask.automatic-parallelization: -Automatic parallelization with ``apply_ufunc`` and ``map_blocks`` ------------------------------------------------------------------ - -.. tip:: - - Some problems can become embarrassingly parallel and thus easy to parallelize - automatically by rechunking to a frequency, e.g. ``ds.chunk(time=TimeResampler("YE"))``. - See :py:meth:`Dataset.chunk` for more. +Parallelize custom functions with ``apply_ufunc`` and ``map_blocks`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Almost all of xarray's built-in operations work on Dask arrays. If you want to -use a function that isn't wrapped by xarray, and have it applied in parallel on +Almost all of Xarray's built-in operations work on Dask arrays. If you want to +use a function that isn't wrapped by Xarray, and have it applied in parallel on each block of your xarray object, you have three options: -1. Extract Dask arrays from xarray objects (``.data``) and use Dask directly. +1. Extract Dask Arrays from xarray objects with ``.data`` and use Dask directly. 2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays. 3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks` to apply functions that consume and return xarray objects. ``apply_ufunc`` -~~~~~~~~~~~~~~~ +############### :py:func:`~xarray.apply_ufunc` automates `embarrassingly parallel `__ "map" type operations where a function written for processing NumPy arrays should be repeatedly -applied to xarray objects containing Dask arrays. It works similarly to +applied to Xarray objects containing Dask Arrays. It works similarly to :py:func:`dask.array.map_blocks` and :py:func:`dask.array.blockwise`, but without -requiring an intermediate layer of abstraction. +requiring an intermediate layer of abstraction. See the `Dask documentation `_ for more details. 
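+As a minimal sketch (the ``magnitude`` function and the ``u``/``v`` arrays below are
+illustrative only, not taken from these docs), applying a plain NumPy function in
+parallel over the chunks of Dask-backed DataArrays might look like this:
+
+.. code-block:: python
+
+    import numpy as np
+    import xarray as xr
+
+
+    def magnitude(a, b):
+        # A plain NumPy function -- it knows nothing about Xarray or Dask
+        return np.sqrt(a**2 + b**2)
+
+
+    # Two chunked (Dask-backed) DataArrays with matching dimensions
+    u = xr.DataArray(np.random.randn(100, 100), dims=["x", "y"]).chunk({"x": 25})
+    v = xr.DataArray(np.random.randn(100, 100), dims=["x", "y"]).chunk({"x": 25})
+
+    result = xr.apply_ufunc(
+        magnitude,
+        u,
+        v,
+        dask="parallelized",  # apply ``magnitude`` blockwise over the Dask chunks
+        output_dtypes=[u.dtype],  # dtype of the result, declared up front
+    )
+    result.compute()  # evaluate the lazy Dask result
+
+Each pair of corresponding chunks is processed independently, and no data is loaded
+into memory until ``.compute()`` is called.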
For the best performance when using Dask's multi-threaded scheduler, wrap a function that already releases the global interpreter lock, which fortunately @@ -415,9 +319,7 @@ application. .. tip:: - For the majority of NumPy functions that are already wrapped by Dask, it's - usually a better idea to use the pre-existing ``dask.array`` function, by - using either a pre-existing xarray methods or + When possible, it's recommended to use pre-existing ``dask.array`` functions, either with existing xarray methods or :py:func:`~xarray.apply_ufunc()` with ``dask='allowed'``. Dask can often have a more efficient implementation that makes use of the specialized structure of a problem, unlike the generic speedups offered by @@ -425,10 +327,10 @@ application. ``map_blocks`` -~~~~~~~~~~~~~~ +############## -Functions that consume and return xarray objects can be easily applied in parallel using :py:func:`map_blocks`. -Your function will receive an xarray Dataset or DataArray subset to one chunk +Functions that consume and return Xarray objects can be easily applied in parallel using :py:func:`map_blocks`. +Your function will receive an Xarray Dataset or DataArray subset to one chunk along each chunked dimension. .. ipython:: python @@ -455,7 +357,7 @@ Notice that the :py:meth:`map_blocks` call printed ``func`` is received 0-sized blocks! :py:meth:`map_blocks` needs to know what the final result looks like in terms of dimensions, shapes etc. It does so by running the provided function on 0-shaped inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not -work for your function, provide the ``template`` kwarg (see below). +work for your function, provide the ``template`` kwarg (see `below `). In this case, automatic inference has worked so let's check that the result is as expected. @@ -469,7 +371,6 @@ This executes the Dask graph in `serial` using a for loop, but allows for printi debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and the returned result is identical to ``ds.time`` as expected. - Here is a common example where automated inference will not work. .. ipython:: python @@ -489,6 +390,8 @@ what the function returns) with dimensions, shapes, chunk sizes, attributes, coo variables that look exactly like the expected result. The variables should be dask-backed and hence not incur much memory cost. +.. _template-note: + .. note:: Note that when ``template`` is provided, ``attrs`` from ``template`` are copied over to the result. Any @@ -533,61 +436,51 @@ Notice that the 0-shaped sizes were not printed to screen. Since ``template`` ha As :py:func:`map_blocks` loads each block into memory, reduce as much as possible objects consumed by user functions. For example, drop useless variables before calling ``func`` with :py:func:`map_blocks`. +Deploying Dask +-------------- +By default, Dask distributes work across multiple cores on a single machine, which lets you process as much data as you can download and fit onto your hard drive. However, this has two limitations: -Chunking and performance ------------------------- +- You are limited by the size of your hard drive +- Downloading data can be slow and expensive -The ``chunks`` parameter has critical performance implications when using Dask -arrays. If your chunks are too small, queueing up operations will be extremely -slow, because Dask will translate each operation into a huge number of -operations mapped across chunks. 
Computation on Dask arrays with small chunks -can also be slow, because each operation on a chunk has some fixed overhead from -the Python interpreter and the Dask task executor. +Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask documentation `_ for more details. -Conversely, if your chunks are too big, some of your computation may be wasted, -because Dask only computes results one chunk at a time. -A good rule of thumb is to create arrays with a minimum chunksize of at least -one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the -cost of queueing up Dask operations can be noticeable, and you may need even -larger chunksizes. +Best Practices +-------------- -.. tip:: +Dask is pretty easy to use but there are some gotchas, many of which are under active development. Here are some tips we have found through experience. We also recommend checking out the `Dask best practices `_. - Check out the `dask documentation on chunks `_. +.. code-block:: python -.. tip:: + from flox.xarray import xarray_reduce + import xarray - Many time domain problems become amenable to an embarrassingly parallel or blockwise solution - (e.g. using :py:func:`xarray.map_blocks`, :py:func:`dask.array.map_blocks`, or - :py:func:`dask.array.blockwise`) by rechunking to a frequency along the time dimension. - Provide :py:class:`xarray.groupers.TimeResampler` objects to :py:meth:`Dataset.chunk` to do so. - For example ``ds.chunk(time=TimeResampler("MS"))`` will set the chunks so that a month of - data is contained in one chunk. The resulting chunk sizes need not be uniform, depending on - the frequency of the data, and the calendar. + ds = xr.open_zarr( # Since we're doing a spatial reduction, increase chunk size in x, y + "my-data.zarr", + chunks={'x': 100, 'y': 100} + ) + time_subset = ds.sea_temperature.sel( + time=slice("2020-01-01", "2020-12-31") # Filter early + ) + + zonal_mean = xarray_reduce( # Faster groupby with flox + time_subset, + chunked_zones, + func="mean", + expected_groups=(zone_labels,), + ) -Optimization Tips ------------------ + zonal_mean.load() # Pull smaller results into memory after reducing the dataset -With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance -can become very slow or memory hungry in certain cases. Here are some optimization tips we have found -through experience: -1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). +1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). 2. More generally, ``groupby()`` is a costly operation and will perform a lot better if the ``flox`` package is installed. See the `flox documentation `_ for more. By default Xarray will use ``flox`` if installed. 3. 
Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_) -4. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load subsets of data which span multiple chunks. On individual files, prefer to subset before chunking (suggestion 1). - -5. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid redundant file reads. - -6. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset` can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package. - -7. Find `best practices specific to Dask arrays in the documentation `_. - -8. The `dask diagnostics `_ can be useful in identifying performance bottlenecks. +4. Use the `Dask dashboard `_ to identify performance bottlenecks. diff --git a/doc/user-guide/io.rst b/doc/user-guide/io.rst index fabff1000d7..d40f2bd4a34 100644 --- a/doc/user-guide/io.rst +++ b/doc/user-guide/io.rst @@ -768,6 +768,8 @@ To read back a zarr dataset that has been created this way, we use the ds_zarr = xr.open_zarr("path/to/directory.zarr") ds_zarr +.. _io.zarr.cloud: + Cloud Storage Buckets ~~~~~~~~~~~~~~~~~~~~~ From fb51f80b94f7fef7f64496050fce8251e6a828f4 Mon Sep 17 00:00:00 2001 From: scharlottej13 Date: Fri, 13 Sep 2024 18:09:01 -0700 Subject: [PATCH 2/9] cleanup --- doc/conf.py | 2 +- doc/user-guide/dask.rst | 2 +- doc/user-guide/io.rst | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/doc/conf.py b/doc/conf.py index 78bcb2bf996..e418045207c 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -58,7 +58,7 @@ ] ) -nbsphinx_allow_errors = True +nbsphinx_allow_errors = False nbsphinx_requirejs_path = "" # -- General configuration ------------------------------------------------ diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index 1287f34a9cc..15989eb5a7e 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -5,7 +5,7 @@ Parallel Computing with Dask ============================ -`Dask `__, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. +`Dask `_, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask: diff --git a/doc/user-guide/io.rst b/doc/user-guide/io.rst index d40f2bd4a34..fabff1000d7 100644 --- a/doc/user-guide/io.rst +++ b/doc/user-guide/io.rst @@ -768,8 +768,6 @@ To read back a zarr dataset that has been created this way, we use the ds_zarr = xr.open_zarr("path/to/directory.zarr") ds_zarr -.. 
_io.zarr.cloud: - Cloud Storage Buckets ~~~~~~~~~~~~~~~~~~~~~ From 4727192a0ddc1d809c67c93341a58778289ccd8d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 14 Sep 2024 01:17:58 +0000 Subject: [PATCH 3/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/user-guide/dask.rst | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index 15989eb5a7e..a7cb0b5676c 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -14,7 +14,7 @@ If you’ve been using Xarray to read in large datasets or split up data across import xarray ds = xr.open_zarr("/path/to/data.zarr") - timeseries = ds["temp"].mean(dim=["x", "y"]).compute() # Compute result + timeseries = ds["temp"].mean(dim=["x", "y"]).compute() # Compute result Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes. @@ -53,15 +53,15 @@ When reading data, Dask automatically divides your dataset into smaller chunks. The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default:: ds = xr.open_zarr("path/to/directory.zarr") - + Save data to a local Zarr dataset:: ds.to_zarr("path/to/directory.zarr") - + Or to an S3 bucket:: - + ds.to_zarr("s3://my-bucket/data.zarr") - + See :ref:`io.zarr` for more details. .. tab:: NetCDF @@ -69,27 +69,27 @@ When reading data, Dask automatically divides your dataset into smaller chunks. Open a single netCDF file with :py:func:`~xarray.open_dataset` and supplying a ``chunks`` argument:: ds = xr.open_dataset("example-data.nc", chunks={"time": 10}) - + Or open multiple files with py:func:`~xarray.open_mfdataset`:: xr.open_mfdataset('my/files/*.nc', parallel=True) .. tip:: - + When reading in many netCDF files with py:func:`~xarray.open_mfdataset`, using ``engine=h5netcdf`` can be faster than the default which uses the netCDF4 package. - + Saving a larger-than-memory netCDF file:: delayed_write = ds.to_netcdf("my-big-file.nc", compute=False) delayed_write.compute() - + .. note:: When using Dask’s distributed scheduler to write NETCDF4 files, it may be necessary to set the environment variable ``HDF5_USE_FILE_LOCKING=FALSE`` to avoid competing locks within the HDF5 SWMR file locking scheme. Note that writing netCDF files with Dask’s distributed scheduler is only supported for the netcdf4 backend. See :ref:`io.netcdf` for more details. - + .. 
tab:: HDF5 Open HDF5 files with :py:func:`~xarray.open_dataset`:: @@ -457,23 +457,22 @@ Dask is pretty easy to use but there are some gotchas, many of which are under a from flox.xarray import xarray_reduce import xarray - ds = xr.open_zarr( # Since we're doing a spatial reduction, increase chunk size in x, y - "my-data.zarr", - chunks={'x': 100, 'y': 100} + ds = xr.open_zarr( # Since we're doing a spatial reduction, increase chunk size in x, y + "my-data.zarr", chunks={"x": 100, "y": 100} ) time_subset = ds.sea_temperature.sel( time=slice("2020-01-01", "2020-12-31") # Filter early ) - zonal_mean = xarray_reduce( # Faster groupby with flox + zonal_mean = xarray_reduce( # Faster groupby with flox time_subset, chunked_zones, func="mean", expected_groups=(zone_labels,), ) - zonal_mean.load() # Pull smaller results into memory after reducing the dataset + zonal_mean.load() # Pull smaller results into memory after reducing the dataset 1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). From 0305f22095320a6fb41b5a08249e40b93e35394d Mon Sep 17 00:00:00 2001 From: scharlottej13 Date: Fri, 13 Sep 2024 18:37:37 -0700 Subject: [PATCH 4/9] Fix internal references --- doc/user-guide/dask.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index 15989eb5a7e..8b028d454c2 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -5,7 +5,7 @@ Parallel Computing with Dask ============================ -`Dask `_, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. +`Dask `__, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask: @@ -224,7 +224,7 @@ each block of your xarray object, you have three options: where a function written for processing NumPy arrays should be repeatedly applied to Xarray objects containing Dask Arrays. It works similarly to :py:func:`dask.array.map_blocks` and :py:func:`dask.array.blockwise`, but without -requiring an intermediate layer of abstraction. See the `Dask documentation `_ for more details. +requiring an intermediate layer of abstraction. See the `Dask documentation `__ for more details. For the best performance when using Dask's multi-threaded scheduler, wrap a function that already releases the global interpreter lock, which fortunately @@ -357,7 +357,7 @@ Notice that the :py:meth:`map_blocks` call printed ``func`` is received 0-sized blocks! :py:meth:`map_blocks` needs to know what the final result looks like in terms of dimensions, shapes etc. It does so by running the provided function on 0-shaped inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not -work for your function, provide the ``template`` kwarg (see `below `). +work for your function, provide the ``template`` kwarg (see :ref:`below `). In this case, automatic inference has worked so let's check that the result is as expected. 
@@ -444,7 +444,7 @@ By default, Dask distributes work across multiple cores on a single machine, whi - You are limited by the size of your hard drive - Downloading data can be slow and expensive -Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask documentation `_ for more details. +Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask documentation `__ for more details. Best Practices From f3f85e88b34df6c0d32b863b8843f6c7f99930d9 Mon Sep 17 00:00:00 2001 From: Sarah Charlotte Johnson Date: Mon, 16 Sep 2024 13:51:00 -0700 Subject: [PATCH 5/9] Apply suggestions from code review Co-authored-by: Tom Nicholas Co-authored-by: Stephan Hoyer --- doc/user-guide/dask.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index 9db44588902..d71e27e3235 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -11,12 +11,12 @@ If you’ve been using Xarray to read in large datasets or split up data across .. code-block:: python - import xarray + import xarray as xr ds = xr.open_zarr("/path/to/data.zarr") timeseries = ds["temp"].mean(dim=["x", "y"]).compute() # Compute result -Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes. +Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you usually don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes. 
Examples
--------

From 289ee75fb576cdc6c52e812e065c653ae7e39e5d Mon Sep 17 00:00:00 2001
From: scharlottej13
Date: Mon, 16 Sep 2024 16:01:20 -0700
Subject: [PATCH 6/9] Update Dask Array image

---
 doc/_static/dask-array.svg | 349 +++++++++++++++++++++++++++++++++++++
 doc/_static/dask_array.png | Bin 13488 -> 0 bytes
 2 files changed, 349 insertions(+)
 create mode 100644 doc/_static/dask-array.svg
 delete mode 100644 doc/_static/dask_array.png

diff --git a/doc/_static/dask-array.svg b/doc/_static/dask-array.svg
new file mode 100644
index 00000000000..bdf33c0ac70
--- /dev/null
+++ b/doc/_static/dask-array.svg
@@ -0,0 +1,349 @@
+[349 lines of SVG markup for the new Dask array figure omitted]

diff --git a/doc/_static/dask_array.png b/doc/_static/dask_array.png
deleted file mode 100644
index 7ddb6e400ef3c8e0a84adfa3263122f9a170df9d..0000000000000000000000000000000000000000
GIT binary patch
[13488 bytes of binary image data for the old dask_array.png omitted]

Date: Mon, 16 Sep 2024 16:01:38 -0700
Subject: [PATCH 7/9] Updates after review

---
 doc/user-guide/dask.rst | 82 +++++++++++++++++++++++------------------
 1 file changed, 47 insertions(+), 35 deletions(-)

diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index d71e27e3235..b57ca28b0fe 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -5,7 +5,7 @@ Parallel Computing with Dask ============================ -`Dask `__, a general purpose library for parallel computing, integrates with Xarray to handle larger-than-memory computations. +Xarray integrates with `Dask `__, a general purpose library for parallel computing, to handle larger-than-memory computations. If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask: @@ -21,20 +21,25 @@ Using Dask with Xarray feels similar to working with NumPy arrays, but on much l Examples -------- +If you're new to using Xarray with Dask, we recommend the `Xarray + Dask Tutorial `_. + +Here are some examples for using Xarray with Dask at scale: + - `Zonal averaging with the NOAA National Water Model `_ - `CMIP6 Precipitation Frequency Analysis `_ - `Using Dask + Cloud Optimized GeoTIFFs `_ -- `Xarray + Dask Tutorial `_ Using Dask with Xarray ---------------------- -.. image:: https://docs.dask.org/en/stable/_images/dask-array.svg +.. image:: ../_static/dask-array.svg :width: 50 % :align: right :alt: A Dask array +Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset, that Dask is able to process in parallel (see the `Dask Array docs on chunks `_). + Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph. When a result is requested (e.g., for plotting, saving, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently. @@ -46,22 +51,16 @@ Nearly all Xarray methods have been extended to work automatically with Dask Arr Reading and writing data ~~~~~~~~~~~~~~~~~~~~~~~~ -When reading data, Dask automatically divides your dataset into smaller chunks. These chunks are essentially small, manageable blocks of the larger dataset. You can specify the size of chunks with the ``chunks`` argument (see the `Dask Array docs on chunks `_). +When reading data, Dask divides your dataset into smaller chunks. You can specify the size of chunks with the ``chunks`` argument. .. tab:: Zarr - The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default:: + The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. You can also use Zarr to read/write directly from cloud storage buckets (see the `Dask documentation on connecting to remote data `__) + + When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default (if Dask is installed):: ds = xr.open_zarr("path/to/directory.zarr") - Save data to a local Zarr dataset:: - - ds.to_zarr("path/to/directory.zarr") - - Or to an S3 bucket:: - - ds.to_zarr("s3://my-bucket/data.zarr") - See :ref:`io.zarr` for more details. .. tab:: NetCDF @@ -70,7 +69,7 @@ When reading data, Dask automatically divides your dataset into smaller chunks. 
ds = xr.open_dataset("example-data.nc", chunks={"time": 10}) - Or open multiple files with py:func:`~xarray.open_mfdataset`:: + Or open multiple files in parallel with py:func:`~xarray.open_mfdataset`:: xr.open_mfdataset('my/files/*.nc', parallel=True) @@ -79,7 +78,11 @@ When reading data, Dask automatically divides your dataset into smaller chunks. When reading in many netCDF files with py:func:`~xarray.open_mfdataset`, using ``engine=h5netcdf`` can be faster than the default which uses the netCDF4 package. - Saving a larger-than-memory netCDF file:: + Save larger-than-memory netCDF files:: + + ds.to_netcdf("my-big-file.nc") + + Or set ``compute=False`` to return a dask.delayed object that can be computed later:: delayed_write = ds.to_netcdf("my-big-file.nc", compute=False) delayed_write.compute() @@ -135,16 +138,21 @@ Loading Dask Arrays ) ds.to_netcdf("example-data.nc") -There are a couple common cases where you may want to convert lazy Dask arrays into eager, in-memory Xarray data structures: +There are a few common cases where you may want to convert lazy Dask arrays into eager, in-memory Xarray data structures: - You want to inspect smaller intermediate results when working interactively or debugging - You've reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory +- You need to compute intermediate results since Dask is unable (or struggles) to perform a certain computation. The canonical example of this is normalizing a dataset, e.g., ``ds - ds.mean()``, when ``ds`` is larger than memory. Typically, you should either save ``ds`` to disk or compute ``ds.mean()`` eagerly. -To do this, you can use :py:meth:`~xarray.Dataset.load`, which is similar to :py:meth:`~xarray.Dataset.compute`, but instead changes results in-place: +To do this, you can use :py:meth:`~xarray.Dataset.compute`: .. ipython:: python - ds.load() + ds.compute() + +.. note:: + + Using :py:meth:`~xarray.Dataset.compute` is preferred to :py:meth:`~xarray.Dataset.load`, which changes the results in-place. You can also access :py:attr:`~xarray.DataArray.values`, which will always be a NumPy array: @@ -188,7 +196,7 @@ If you're using a Dask cluster, you can also use :py:meth:`~xarray.Dataset.persi Chunking and performance ~~~~~~~~~~~~~~~~~~~~~~~~ -The way a dataset is chunked can be critical to performance when working with large datasets. You'll want have chunk sizes large enough to reduce the number of chunks that Dask has to think about (to reduce overhead from the task graph) but also small enough so that many of them can fit in memory at once. +The way a dataset is chunked can be critical to performance when working with large datasets. You'll want chunk sizes large enough to reduce the number of chunks that Dask has to think about (to reduce overhead from the task graph) but also small enough so that many of them can fit in memory at once. .. tip:: @@ -198,9 +206,12 @@ It can be helpful to choose chunk sizes based on your downstream analyses and to You can rechunk a dataset by: -- Specifying ``chunks={}`` when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. +- Specifying ``chunks={}`` when reading in your dataset. 
If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note that this will only work, though, for chunks that are similar to how the data is chunked on disk. Otherwise, it will be very slow and require a lot of network bandwidth. - Using :py:meth:`Dataset.chunk` after you've already read in your dataset. For time domain problems, for example, you can use ``ds.chunk(time=TimeResampler())`` to rechunk according to a specified unit of time. ``ds.chunk(time=TimeResampler("MS"))``, for example, will set the chunks so that a month of data is contained in one chunk. + +For large-scale rechunking tasks (e.g., converting a simulation dataset stored with chunking only along time to a dataset with chunking only across space), consider writing another copy of your data on disk and/or using dedicated tools such as `Rechunker `_. + .. _dask.automatic-parallelization: Parallelize custom functions with ``apply_ufunc`` and ``map_blocks`` @@ -210,10 +221,10 @@ Almost all of Xarray's built-in operations work on Dask arrays. If you want to use a function that isn't wrapped by Xarray, and have it applied in parallel on each block of your xarray object, you have three options: -1. Extract Dask Arrays from xarray objects with ``.data`` and use Dask directly. -2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays. -3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks` +1. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays. +2. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks` to apply functions that consume and return xarray objects. +3. Extract Dask Arrays from xarray objects with ``.data`` and use Dask directly. ``apply_ufunc`` @@ -439,19 +450,29 @@ Notice that the 0-shaped sizes were not printed to screen. Since ``template`` ha Deploying Dask -------------- -By default, Dask distributes work across multiple cores on a single machine, which lets you process as much data as you can download and fit onto your hard drive. However, this has two limitations: +By default, Dask uses the multi-threaded scheduler, which distributes work across multiple cores on a single machine and allows for processing some datasets that do not fit into memory. However, this has two limitations: - You are limited by the size of your hard drive - Downloading data can be slow and expensive -Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask documentation `__ for more details. - +Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask deployment documentation `__ for more details. Best Practices -------------- Dask is pretty easy to use but there are some gotchas, many of which are under active development. Here are some tips we have found through experience. We also recommend checking out the `Dask best practices `_. +1. Do your spatial and temporal indexing (e.g. 
``.sel()`` or ``.isel()``) early, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). + +2. More generally, ``groupby()`` is a costly operation and will perform a lot better if the ``flox`` package is installed. + See the `flox documentation `_ for more. By default Xarray will use ``flox`` if installed. + +3. Save intermediate results to disk as netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting the temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_) + +4. Use the `Dask dashboard `_ to identify performance bottlenecks. + +Here's an example of a simplified workflow putting some of these tips together: + .. code-block:: python from flox.xarray import xarray_reduce @@ -474,12 +495,3 @@ Dask is pretty easy to use but there are some gotchas, many of which are under a zonal_mean.load() # Pull smaller results into memory after reducing the dataset - -1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_). - -2. More generally, ``groupby()`` is a costly operation and will perform a lot better if the ``flox`` package is installed. - See the `flox documentation `_ for more. By default Xarray will use ``flox`` if installed. - -3. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_) - -4. Use the `Dask dashboard `_ to identify performance bottlenecks. From b6902484e1ba2e9a6af299aa4168d746141416f2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 16 Sep 2024 23:02:24 +0000 Subject: [PATCH 8/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/user-guide/dask.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index b57ca28b0fe..e972b79d047 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -56,7 +56,7 @@ When reading data, Dask divides your dataset into smaller chunks. You can specif .. tab:: Zarr The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask.
You can also use Zarr to read/write directly from cloud storage buckets (see the `Dask documentation on connecting to remote data `__) - + When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default (if Dask is installed):: ds = xr.open_zarr("path/to/directory.zarr") @@ -81,7 +81,7 @@ When reading data, Dask divides your dataset into smaller chunks. You can specif Save larger-than-memory netCDF files:: ds.to_netcdf("my-big-file.nc") - + Or set ``compute=False`` to return a dask.delayed object that can be computed later:: delayed_write = ds.to_netcdf("my-big-file.nc", compute=False) @@ -494,4 +494,3 @@ Here's an example of a simplified workflow putting some of these tips together: ) zonal_mean.load() # Pull smaller results into memory after reducing the dataset - From a7c03e5289077ea57c84f70e2655990057e09efa Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 8 Nov 2024 17:44:17 -0700 Subject: [PATCH 9/9] Some edits --- doc/user-guide/dask.rst | 56 ++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst index e972b79d047..d7fb7cbd41e 100644 --- a/doc/user-guide/dask.rst +++ b/doc/user-guide/dask.rst @@ -29,6 +29,8 @@ Here are some examples for using Xarray with Dask at scale: - `CMIP6 Precipitation Frequency Analysis `_ - `Using Dask + Cloud Optimized GeoTIFFs `_ +Find more examples at the `Project Pythia cookbook gallery `_. + Using Dask with Xarray ---------------------- @@ -38,11 +40,11 @@ Using Dask with Xarray :align: right :alt: A Dask array -Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset, that Dask is able to process in parallel (see the `Dask Array docs on chunks `_). +Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset that Dask is able to process in parallel (see the `Dask Array docs on chunks `_). Commonly, chunks are set when reading data, but you can also set the chunksize manually at any point in your workflow using :py:meth:`Dataset.chunk` and :py:meth:`DataArray.chunk`. See :ref:`dask.chunks` for more. Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph. -When a result is requested (e.g., for plotting, saving, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently. +When a result is requested (e.g., for plotting, writing to disk, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently. Nearly all Xarray methods have been extended to work automatically with Dask Arrays. This includes things like indexing, concatenating, rechunking, grouped operations, etc. Common operations are covered in more detail in each of the sections below. @@ -51,7 +53,7 @@ Nearly all Xarray methods have been extended to work automatically with Dask Arr Reading and writing data ~~~~~~~~~~~~~~~~~~~~~~~~ -When reading data, Dask divides your dataset into smaller chunks.
You can specify the size of chunks with the ``chunks`` argument. Specifying ``chunks="auto"`` will set the dask chunk sizes to be a multiple of the on-disk chunk sizes. This can be a good idea, but usually the appropriate dask chunk size will depend on your workflow. .. tab:: Zarr @@ -75,7 +77,7 @@ When reading data, Dask divides your dataset into smaller chunks. You can specif .. tip:: - When reading in many netCDF files with py:func:`~xarray.open_mfdataset`, using ``engine=h5netcdf`` can + When reading in many netCDF files with py:func:`~xarray.open_mfdataset`, using ``engine="h5netcdf"`` can be faster than the default which uses the netCDF4 package. Save larger-than-memory netCDF files:: @@ -144,7 +146,7 @@ There are a few common cases where you may want to convert lazy Dask arrays into - You've reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory - You need to compute intermediate results since Dask is unable (or struggles) to perform a certain computation. The canonical example of this is normalizing a dataset, e.g., ``ds - ds.mean()``, when ``ds`` is larger than memory. Typically, you should either save ``ds`` to disk or compute ``ds.mean()`` eagerly. -To do this, you can use :py:meth:`~xarray.Dataset.compute`: +To do this, you can use :py:meth:`Dataset.compute` or :py:meth:`DataArray.compute`: .. ipython:: python @@ -152,9 +154,9 @@ To do this, you can use :py:meth:`~xarray.Dataset.compute`: .. note:: - Using :py:meth:`~xarray.Dataset.compute` is preferred to :py:meth:`~xarray.Dataset.load`, which changes the results in-place. + Using :py:meth:`Dataset.compute` is preferred to :py:meth:`Dataset.load`, which changes the results in-place. -You can also access :py:attr:`~xarray.DataArray.values`, which will always be a NumPy array: +You can also access :py:attr:`DataArray.values`, which will always be a NumPy array: .. ipython:: :verbatim: @@ -166,7 +168,7 @@ You can also access :py:attr:`~xarray.DataArray.values`, which will always be a ... # truncated for brevity -NumPy ufuncs like ``np.sin`` transparently work on all xarray objects, including those +NumPy ufuncs like :py:func:`numpy.sin` transparently work on all xarray objects, including those that store lazy Dask arrays: .. ipython:: python @@ -175,22 +177,18 @@ that store lazy Dask arrays: np.sin(ds) -To access Dask arrays directly, use the -:py:attr:`DataArray.data ` attribute. This attribute exposes -array data either as a Dask array or as a NumPy array, depending on whether it has been -loaded into Dask or not. - -.. note:: - - ``.data`` is also used to expose other "computable" array backends beyond Dask and - NumPy (e.g. sparse and pint arrays). +To access Dask arrays directly, use the :py:attr:`DataArray.data` attribute which exposes the DataArray's underlying array type. -If you're using a Dask cluster, you can also use :py:meth:`~xarray.Dataset.persist` for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It's a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data. +If you're using a Dask cluster, you can also use :py:meth:`Dataset.persist` for quickly accessing intermediate outputs. 
This is most helpful after expensive operations like rechunking or setting an index. It's a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data. .. code-block:: python ds = ds.persist() +.. tip:: + + Remember to save the dataset returned by persist! This is a common mistake. + .. _dask.chunks: Chunking and performance @@ -204,9 +202,10 @@ The way a dataset is chunked can be critical to performance when working with la It can be helpful to choose chunk sizes based on your downstream analyses and to chunk as early as possible. Datasets with smaller chunks along the time axis, for example, can make time domain problems easier to parallelize since Dask can perform the same operation on each time chunk. If you're working with a large dataset with chunks that make downstream analyses challenging, you may need to rechunk your data. This is an expensive operation though, so is only recommended when needed. -You can rechunk a dataset by: +You can chunk or rechunk a dataset by: -- Specifying ``chunks={}`` when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note that this will only work, though, for chunks that are similar to how the data is chunked on disk. Otherwise, it will be very slow and require a lot of network bandwidth. +- Specifying the ``chunks`` kwarg when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note that this will only work, though, for chunks that are similar to how the data is chunked on disk. Otherwise, it will be very slow and require a lot of network bandwidth. +- Many array file formats are chunked on disk. You can specify ``chunks={}`` to have a single dask chunk map to a single on-disk chunk, and ``chunks="auto"`` to have a single dask chunk be a automatically chosen multiple of the on-disk chunks. - Using :py:meth:`Dataset.chunk` after you've already read in your dataset. For time domain problems, for example, you can use ``ds.chunk(time=TimeResampler())`` to rechunk according to a specified unit of time. ``ds.chunk(time=TimeResampler("MS"))``, for example, will set the chunks so that a month of data is contained in one chunk. @@ -224,7 +223,11 @@ each block of your xarray object, you have three options: 1. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays. 2. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks` to apply functions that consume and return xarray objects. -3. Extract Dask Arrays from xarray objects with ``.data`` and use Dask directly. +3. Extract Dask Arrays from xarray objects with :py:attr:`DataArray.data` and use Dask directly. + +.. tip:: + + See the extensive Xarray tutorial on `apply_ufunc `_. ``apply_ufunc`` @@ -475,7 +478,6 @@ Here's an example of a simplified workflow putting some of these tips together: .. 
code-block:: python - from flox.xarray import xarray_reduce import xarray as xr ds = xr.open_zarr( # Since we're doing a spatial reduction, increase chunk size in x, y "era5-2mt-2019-03-uk.grib", chunks={"latitude": -1, "longitude": -1, "time": 1}, ) time_subset = ds.sel( time=slice("2020-01-01", "2020-12-31") # Filter early ) - zonal_mean = xarray_reduce( # Faster groupby with flox - time_subset, - chunked_zones, - func="mean", - expected_groups=(zone_labels,), - ) + # Faster resampling when flox is installed + daily = time_subset.resample(time="D").mean() - zonal_mean.load() # Pull smaller results into memory after reducing the dataset + daily.load() # Pull smaller results into memory after reducing the dataset
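A minimal sketch of best practice 3 above (writing a small intermediate result to disk and reading it back before the larger computation). The Zarr store path, the ``temp`` variable, and the output file names here are illustrative assumptions, not part of the example dataset used elsewhere on this page:

.. code-block:: python

    import xarray as xr

    # Hypothetical chunked, Dask-backed dataset
    ds = xr.open_zarr("path/to/directory.zarr")

    # Compute the small intermediate result and write it to disk first ...
    ds["temp"].mean("time").to_netcdf("temporal-mean.nc")

    # ... then read it back before subtracting it from the full dataset,
    # so the scheduler never has to hold every chunk of the mean in memory
    temporal_mean = xr.open_dataset("temporal-mean.nc", chunks={})
    anomaly = ds["temp"] - temporal_mean["temp"]
    anomaly.to_netcdf("temp-anomaly.nc")  # written out chunk by chunk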