diff --git a/docs/examples/parallel-computing-with-dask.ipynb b/docs/examples/parallel-computing-with-dask.ipynb index 2165f6c8..92312a55 100644 --- a/docs/examples/parallel-computing-with-dask.ipynb +++ b/docs/examples/parallel-computing-with-dask.ipynb @@ -7,9 +7,7 @@ "source": [ "# General Guide for Parallelizing xCDAT Operations with Dask\n", "\n", - "Author: [Tom Vo](https://github.com/tomvothecoder)\n", - "\n", - "Date: 02/27/24\n" + "Author: [Tom Vo](https://github.com/tomvothecoder)\n" ] }, { @@ -28,7 +26,23 @@ "- How to use Dask with xCDAT, including real-world examples and performance metrics\n", "- Dask Schedulers and using a local distributed scheduler for more resource-intensive needs\n", "\n", - "_The data used in the code examples can be found through the [Earth System Grid Federation (ESGF) search portal](https://aims2.llnl.gov/search)._\n" + "_The data used in the code examples can be found through the [Earth System Grid Federation (ESGF) search portal](https://aims2.llnl.gov/search)._\n", + "\n", + "### More Resources\n", + "\n", + "To learn more in-depth about Dask and Xarray, please check these resources out:\n", + "\n", + "- [Official Xarray Parallel Computing with Dask Guide](https://docs.xarray.dev/en/stable/user-guide/dask.html)\n", + "- [Official Xarray Parallel Computing with Dask Jupyter Notebook Tutorial](https://tutorial.xarray.dev/intermediate/xarray_and_dask.html)\n", + "- [Official Dask guide for Xarray with Dask Arrays](https://examples.dask.org/xarray.html)\n", + "- [Project Pythia: Dask Arrays with Xarray](https://foundations.projectpythia.org/core/xarray/dask-arrays-xarray.html)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n" ] }, { @@ -38,16 +52,19 @@ "source": [ "## Notebook Setup\n", "\n", - "Create an Anaconda environment for this notebook using the command below:\n", + "Create an Anaconda environment for this notebook using the command below. 
You can\n",
    "substitute `conda` with `mamba` if you are using Mamba instead.\n",
    "\n",
    "```bash\n",
    "conda create -n xcdat_dask_guide -c conda-forge python xarray dask xcdat flox matplotlib nc-time-axis jupyter jupyter-server-proxy\n",
    "```\n",
    "\n",
-    "- [`flox`](https://flox.readthedocs.io/en/latest/) is a package that is used to improve the xarray `groupby()` performance by\n",
-    "  making it parallelizable.\n",
-    "- `matplotlib` is an optional dependency required for plotting with xarray\n",
-    "- [`nc-time-axis`](https://nc-time-axis.readthedocs.io/en/latest/) is an optional dependency required for `matplotlib` to plot `cftime` coordinates\n"
+    "- [`flox`](https://flox.readthedocs.io/en/latest/) is a package that improves xarray's `groupby()` performance by making it parallelizable.\n",
+    "- [`matplotlib`](https://matplotlib.org/) is an optional dependency required for plotting with xarray.\n",
+    "- [`nc-time-axis`](https://nc-time-axis.readthedocs.io/en/latest/) is an optional dependency required for `matplotlib` to plot `cftime` coordinates.\n",
+    "- [`jupyter-server-proxy`](https://github.com/jupyterhub/jupyter-server-proxy) is a package that\n",
+    "  proxies the Dask dashboard through the Jupyter server, so the dashboard can be reached\n",
+    "  on the same host and port as Jupyter.\n"
   ]
  },
  {
@@ -55,7 +72,32 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "## Basics of Dask Arrays\n",
+    "## Dask Best Practices\n",
+    "\n",
+    "- **Use NumPy**\n",
+    "  - If your data fits comfortably in RAM and you are not performance bound, then using NumPy might be the right choice.\n",
+    "  - Dask adds another layer of complexity which may get in the way.\n",
+    "  - If you are just looking for speedups rather than scalability, consider a project like [Numba](https://numba.pydata.org/).\n",
+    "- **Select a good chunk size**\n",
+    "  - A common performance problem among Dask Array users is choosing a chunk size that is either too small (leading to lots of overhead) or poorly aligned with the data (leading to inefficient reading).\n",
+    "- **Orient your chunks**\n",
+    "  - When reading data you should align your chunks with your storage format.\n",
+    "- **Avoid Oversubscribing Threads**\n",
+    "  - By default Dask will run as many concurrent tasks as you have logical cores. It assumes that each task will consume about one core. However, many array-computing libraries are themselves multi-threaded, which can cause contention and low performance.\n",
+    "- **Consider Xarray**\n",
+    "  - The Xarray package wraps around Dask Array, and so offers the same scalability, but also adds convenience when dealing with complex datasets.\n",
+    "- **Build your own Operations**\n",
+    "  - Often we want to perform computations for which there is no exact function in Dask Array. In these cases we may be able to use some of the more generic functions to build our own.\n",
+    "\n",
+    "Source: https://docs.dask.org/en/stable/array-best-practices.html#best-practices\n"
   ]
  },
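+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a minimal illustration of the chunk size guidance above (the array sizes here are\n",
+    "arbitrary, chosen only for the demo): each 1000x1000 chunk below holds one million\n",
+    "`float64` elements (~8 MB), and nothing is computed until `compute()` is called.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import dask.array as da\n",
+    "\n",
+    "# A lazy 10,000 x 10,000 array split into 100 chunks of 1000 x 1000.\n",
+    "x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))\n",
+    "\n",
+    "# Operations are lazy -- this line only builds a task graph.\n",
+    "y = (x - x.mean(axis=0)).sum()\n",
+    "\n",
+    "# compute() executes the graph in parallel, chunk by chunk.\n",
+    "result = y.compute()"
+   ]
+  },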
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## The basics of Dask Arrays\n",
    "\n",
    "- **Dask divides arrays** into many small pieces, called **\"chunks\"** (each presumed to be small enough to fit into memory)\n",
    "- Dask Array **operations are lazy**\n",
@@ -76,39 +118,11 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "## General Dask Best Practices\n",
-    "\n",
-    "- [Use NumPy](https://docs.dask.org/en/stable/array-best-practices.html#use-numpy)\n",
-    "  - If your **data fits comfortably in RAM and you are not performance bound**, then using **NumPy might be the right choice**. Dask adds another layer of complexity which may get in the way.\n",
-    "  - If you are just looking for **speedups rather than scalability** then you may want to **consider a project like [Numba](https://numba.pydata.org/)**\n",
-    "- [Select a good chunk size](https://docs.dask.org/en/stable/array-best-practices.html#select-a-good-chunk-size)\n",
-    "  - A **common performance problem among Dask Array users** is that they have chosen a **chunk size** that is either **too small** (leading to lots of overhead) or **poorly aligned with their data** (leading to inefficient reading).\n",
-    "- [Orient your chunks](https://docs.dask.org/en/stable/array-best-practices.html#orient-your-chunks)\n",
-    "  - When reading data you should **align your chunks with your storage format**.\n",
-    "- [Avoid Oversubscribing Threads](https://docs.dask.org/en/stable/array-best-practices.html#avoid-oversubscribing-threads)\n",
-    "  - **By default Dask will run as many concurrent tasks as you have logical cores.** It assumes that **each task** will consume about **one core**. However, many array-computing libraries are themselves multi-threaded, which can cause contention and low performance.\n",
-    "- [Consider Xarray](hhttps://docs.dask.org/en/stable/array-best-practices.html#consider-xarray)\n",
-    "  - The **Xarray package wraps** around **Dask Array**, and so offers the **same scalability**, but also adds **convenience when dealing with complex datasets**.\n",
-    "- [Build your own Operations](https://docs.dask.org/en/stable/array-best-practices.html#build-your-own-operations)\n",
-    "  - Often we want to perform computations for which there is no exact function in Dask Array. In these cases we may be able to use some of the more generic functions to build our own.\n",
-    "\n",
-    "Source: https://docs.dask.org/en/stable/array-best-practices.html#best-practices\n"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## How to use Dask with Xarray\n"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "### Quick Overview\n"
+    "## Dask integration with Xarray\n",
+    "\n",
+    "_(figure: xarray's integration with Dask arrays)_\n"
   ]
  },
  {
@@ -116,48 +130,46 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "- Why does Xarray integrate with Dask?\n",
-    "  > Xarray integrates with Dask to support parallel computations and streaming computation\n",
-    "  > on datasets that don’t fit into memory. Currently, Dask is an entirely optional feature\n",
-    "  > for xarray. However, the benefits of using Dask are sufficiently strong that Dask may\n",
-    "  > become a required dependency in a future version of xarray.\n",
-    "  >\n",
-    "  > — https://docs.xarray.dev/en/stable/use\n",
-    "- Which Xarray features support Dask?\n",
-    "\n",
-    "  > Nearly all existing xarray methods (including those for indexing, computation,\n",
-    "  > concatenating and grouped operations) have been extended to work automatically with\n",
-    "  > Dask arrays. When you load data as a Dask array in an xarray data structure, almost\n",
-    "  > all xarray operations will keep it as a Dask array; when this is not possible, they\n",
-    "  > will raise an exception rather than unexpectedly loading data into memory.\n",
-    "  >\n",
-    "  > — https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray\n",
-    "\n",
-    "- What is the default Dask behavior for distributing work on compute hardware?\n",
-    "\n",
-    "  > By default, dask uses its multi-threaded scheduler, which distributes work across\n",
-    "  > multiple cores and allows for processing some datasets that do not fit into memory.\n",
-    "  > For running across a cluster, [setup the distributed scheduler](https://docs.dask.org/en/latest/setup.html).\n",
-    "  >\n",
-    "  > — https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray\n",
-    "\n",
-    "- How do I use Dask arrays in an `xarray.Dataset`?\n",
-    "\n",
-    "  > The usual way to create a Dataset filled with Dask arrays is to load the data from a\n",
-    "  > netCDF file or files. You can do this by supplying a `chunks` argument to [open_dataset()](https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html#xarray.open_dataset)\n",
-    "  > or using the [open_mfdataset()](https://docs.xarray.dev/en/stable/generated/xarray.open_mfdataset.html#xarray.open_mfdataset) function.\n",
-    "\n",
-    "- What happens if I don't specify `chunks` with `open_mfdataset()`?\n",
-    "\n",
-    "  > `open_mfdataset()` called without `chunks` argument will return dask arrays with\n",
-    "  > chunk sizes equal to the individual files. Re-chunking the dataset after creation\n",
-    "  > with `ds.chunk()` will lead to an ineffective use of memory and is not recommended.\n",
-    "  >\n",
-    "  > — https://docs.xarray.dev/en/stable/user-guide/dask.html#reading-and-writing-data\n",
-    "\n",
-    "- Are there any optimizations tips for working with Dask and Xarray?\n",
-    "\n",
-    "  - We HIGHLY recommend checking out the [Optimization Tips](https://docs.xarray.dev/en/stable/user-guide/dask.html#optimization-tips) section if you are using Dask with Xarray.\n"
+    "**Why does Xarray integrate with Dask?**\n",
+    "\n",
+    "> Xarray integrates with Dask to support parallel computations and streaming computation\n",
+    "> on datasets that don’t fit into memory. Currently, Dask is an entirely optional feature\n",
+    "> for xarray. However, the benefits of using Dask are sufficiently strong that Dask may\n",
+    "> become a required dependency in a future version of xarray.\n",
+    ">\n",
+    "> — https://docs.xarray.dev/en/stable/user-guide/dask.html\n",
+    "\n",
+    "**Which Xarray features support Dask?**\n",
+    "\n",
+    "> Nearly all existing xarray methods (including those for indexing, computation,\n",
+    "> concatenating and grouped operations) have been extended to work automatically with\n",
+    "> Dask arrays. When you load data as a Dask array in an xarray data structure, almost\n",
+    "> all xarray operations will keep it as a Dask array; when this is not possible, they\n",
+    "> will raise an exception rather than unexpectedly loading data into memory.\n",
+    ">\n",
+    "> — https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray\n",
+    "\n",
+    "**What is the default Dask behavior for distributing work on compute hardware?**\n",
+    "\n",
+    "> By default, dask uses its multi-threaded scheduler, which distributes work across\n",
+    "> multiple cores and allows for processing some datasets that do not fit into memory.\n",
+    "> For running across a cluster, [setup the distributed scheduler](https://docs.dask.org/en/latest/setup.html).\n",
+    ">\n",
+    "> — https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray\n",
+    "\n",
+    "**How do I use Dask arrays in an `xarray.Dataset`?**\n",
+    "\n",
+    "> The usual way to create a Dataset filled with Dask arrays is to load the data from a\n",
+    "> netCDF file or files. You can do this by supplying a `chunks` argument to [open_dataset()](https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html#xarray.open_dataset)\n",
+    "> or using the [open_mfdataset()](https://docs.xarray.dev/en/stable/generated/xarray.open_mfdataset.html#xarray.open_mfdataset) function.\n",
+    "\n",
+    "**What happens if I don't specify `chunks` with `open_mfdataset()`?**\n",
+    "\n",
+    "> `open_mfdataset()` called without `chunks` argument will return dask arrays with\n",
+    "> chunk sizes equal to the individual files. Re-chunking the dataset after creation\n",
+    "> with `ds.chunk()` will lead to an ineffective use of memory and is not recommended.\n",
+    ">\n",
+    "> — https://docs.xarray.dev/en/stable/user-guide/dask.html#reading-and-writing-data\n"
   ]
  },
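+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For example, a minimal sketch of supplying `chunks` to `open_dataset()` (the file name\n",
+    "below is a hypothetical placeholder -- substitute your own netCDF file):\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import xarray as xr\n",
+    "\n",
+    "# `chunks` makes xarray load each variable as a Dask array.\n",
+    "ds = xr.open_dataset(\"tas_Amon_example.nc\", chunks={\"time\": 120})\n",
+    "\n",
+    "tas_mean = ds[\"tas\"].mean(\"time\")  # lazy -- nothing is computed yet\n",
+    "tas_mean.load()  # triggers the parallel computation"
+   ]
+  },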
  {
@@ -165,7 +177,7 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "### Chunking Best Practices\n"
+    "### First, let's learn about chunking arrays\n"
   ]
  },
  {
@@ -196,11 +208,10 @@
   ]
  },
  {
-   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "### Chunking and Performance\n"
+    "### Chunking with Xarray\n"
   ]
  },
  {
@@ -208,22 +219,27 @@
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "> The chunks parameter has critical performance implications when using Dask arrays. If\n",
-    "> your chunks are too small, queueing up operations will be extremely slow, because Dask\n",
-    "> will translate each operation into a huge number of operations mapped across chunks.\n",
-    "> Computation on Dask arrays with small chunks can also be slow, because each operation\n",
-    "> on a chunk has some fixed overhead from the Python interpreter and the Dask task\n",
-    "> executor.\n",
-    ">\n",
-    "> Conversely, if your chunks are too big, some of your computation may be wasted, because\n",
-    "> Dask only computes results one chunk at a time.\n",
-    ">\n",
-    "> A good rule of thumb is to create arrays with a minimum chunksize of at least one\n",
-    "> million elements (e.g., a 1000x1000 > matrix). With large arrays (10+ GB), the cost of\n",
-    "> queueing up Dask operations can be noticeable, and you may need even > larger\n",
-    "> chunksizes.\n",
-    ">\n",
-    "> — https://docs.xarray.dev/en/stable/user-guide/dask.html#chunking-and-performance\n"
+    "The `chunks` parameter has critical performance implications when using Dask arrays.\n",
+    "\n",
+    "- **If your chunks are too small**, queueing up operations will be extremely slow.\n",
+    "\n",
+    "  - Dask will translate each operation into a huge number of operations mapped across chunks.\n",
+    "  - Computation on Dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the Dask task executor.\n",
+    "\n",
+    "- **If your chunks are too big**, some of your computation may be wasted, because Dask only computes results one chunk at a time.\n",
+    "\n",
+    "Source: https://docs.xarray.dev/en/stable/user-guide/dask.html#chunking-and-performance\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Good rule of thumb\n",
+    "\n",
+    "**Create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix).**\n",
+    "\n",
+    "**With large arrays (10+ GB)**, the cost of queueing up Dask operations can be noticeable, and **you may need even larger chunksizes**.\n"
   ]
  },
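+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To check a dataset against this rule of thumb, you can count the elements in a single\n",
+    "chunk. This is an illustrative sketch that assumes a Dask-backed dataset `ds`, such as\n",
+    "one opened with a `chunks` argument as shown earlier:\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "\n",
+    "# For a Dask-backed DataArray, `.chunks` is a tuple of block sizes per dimension.\n",
+    "first_chunk_shape = [sizes[0] for sizes in ds[\"tas\"].chunks]\n",
+    "\n",
+    "# Aim for at least ~1 million elements per chunk.\n",
+    "print(np.prod(first_chunk_shape))"
+   ]
+  },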
  {
@@ -231,22 +247,28 @@
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "#### User Tip: You can let Dask figure out the optimal chunk size\n",
+    "#### Or let Dask try to figure out chunking for you\n",
    "\n",
    "Dask Arrays can look for a `.chunks` attribute and use that to provide a good chunking.\n",
-    "This helps prevent users from specifying \"too many chunks\" and \"too few chunks\" which\n",
+    "This can help prevent users from specifying \"too many chunks\" or \"too few chunks\", which\n",
    "can lead to performance issues.\n",
    "\n",
-    "To do this in `xarray.open_dataset()` and `xarray.open_mfdataset()`, specify:\n",
+    "To do this in `open_dataset()`/`open_mfdataset()`, specify `chunks` on a specific dimension(s) or on all dimensions, as shown below:\n",
    "\n",
-    "1. `chunks={\"time\": \"auto\"}`: auto-scale the specified dimension to get to accommodate ideal chunk sizes\n",
+    "1. `chunks={\"time\": \"auto\"}` - auto-scale the specified dimension(s) to accommodate ideal chunk sizes\n",
    "   - replace `\"time\"` and/or add additional dims to the dictionary for auto-scaling\n",
-    "2. `chunks=\"auto\"`: allow chunking _all_ dimensions to accommodate ideal chunk sizes\n",
-    "\n",
-    "_Disclaimer: Dask's automatic chunking scheme might not be optimal for some datasets\n",
-    "and/or computational operations._\n",
+    "2. `chunks=\"auto\"` - allow chunking _all_ dimensions to accommodate ideal chunk sizes\n",
    "\n",
-    "> — https://docs.dask.org/en/latest/array-chunks.html#automatic-chunking\n"
+    "Source: https://docs.dask.org/en/latest/array-chunks.html#automatic-chunking\n"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "> DISCLAIMER: Dask's automatic chunking scheme might not always optimally align the chunks\n",
+    "> to the data and/or the type of computation being performed. In these cases, chunking the\n",
+    "> data manually is recommended for more precise chunk alignment.\n"
+   ]
+  },
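+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For example, manual chunking might look like the sketch below (the file name and chunk\n",
+    "sizes are illustrative assumptions): chunk along `time` only and keep each horizontal\n",
+    "slice whole, which aligns well with reductions over `time`:\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import xarray as xr\n",
+    "\n",
+    "# -1 keeps a dimension in a single chunk; the path is a hypothetical placeholder.\n",
+    "ds = xr.open_dataset(\n",
+    "    \"tas_Amon_example.nc\",\n",
+    "    chunks={\"time\": 120, \"lat\": -1, \"lon\": -1},\n",
+    ")\n",
+    "\n",
+    "# Inspect the resulting chunk sizes per dimension.\n",
+    "print(ds[\"tas\"].chunksizes)"
+   ]
+  },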
  {
@@ -254,16 +276,19 @@
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "### Chunking and Parallelism with Xarray\n",
+    "### Code example: chunking and parallelism with xCDAT + Dask\n",
+    "\n",
+    "The code example below demonstrates chunking a dataset in Xarray and grouping the data\n",
+    "in parallel across chunks.\n",
    "\n",
-    "The code example below demonstrates chunking a dataset in Xarray. By default, dask uses its multi-threaded scheduler, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory.\n",
+    "**By default, dask uses its multi-threaded scheduler**, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory.\n",
    "\n",
-    "If you are interested in using a distributed scheduler (local or cluster) for more resource-intensive computational operations, refer to the last cell of this notebook.\n"
+    "If you are interested in using a distributed scheduler (local or cluster) for more resource-intensive computational operations, refer to the section on Dask schedulers later in this notebook.\n"
   ]
  },
  {
   "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
@@ -277,7 +302,7 @@
    "# Disclaimer: The dataset used in the example is only a few hundred MBs to make\n",
    "# downloading the file quick. A file this small should normally **NOT** be\n",
    "# chunked since computational performance will most likely suffer.\n",
-    "filepath = \"http://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/r10i1p1f1/Amon/tas/gn/v20200605/tas_Amon_ACCESS-ESM1-5_historical_r10i1p1f1_gn_185001-201412.nc\"\n"
+    "filepath = \"http://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/r10i1p1f1/Amon/tas/gn/v20200605/tas_Amon_ACCESS-ESM1-5_historical_r10i1p1f1_gn_185001-201412.nc\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-    "Chunk the time dimension by 10. 
Alternatively, let Dask auto-scale all dimensions to get a good chunk size using `chunks=\"auto\"`, which references the `.chunks` attribute.\n" + "We're letting Dask auto-scale all dimensions to get a good chunk size using `chunks=\"auto\"`, which references the `.chunks` attribute.\n" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 13, "metadata": {}, "outputs": [], "source": [ - "ds = xr.open_dataset(filepath, chunks={\"time\": \"10\"})" + "ds = xr.open_dataset(filepath, chunks=\"auto\")" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 14, "metadata": {}, "outputs": [ { @@ -677,10 +702,10 @@ " height float64 8B ...\n", "Dimensions without coordinates: bnds\n", "Data variables:\n", - " time_bnds (time, bnds) datetime64[ns] 32kB dask.array<chunksize=(1, 2), meta=np.ndarray>\n", + " time_bnds (time, bnds) datetime64[ns] 32kB dask.array<chunksize=(1980, 2), meta=np.ndarray>\n", " lat_bnds (lat, bnds) float64 2kB dask.array<chunksize=(145, 2), meta=np.ndarray>\n", " lon_bnds (lon, bnds) float64 3kB dask.array<chunksize=(192, 2), meta=np.ndarray>\n", - " tas (time, lat, lon) float32 220MB dask.array<chunksize=(1, 145, 192), meta=np.ndarray>\n", + " tas (time, lat, lon) float32 220MB dask.array<chunksize=(1695, 122, 162), meta=np.ndarray>\n", "Attributes: (12/48)\n", " Conventions: CF-1.7 CMIP-6.2\n", " activity_id: CMIP\n", @@ -694,10 +719,10 @@ " license: CMIP6 model data produced by CSIRO is li...\n", " cmor_version: 3.4.0\n", " tracking_id: hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...\n", - " DODS_EXTRA.Unlimited_Dimension: time