From bde0decce08252d8eb6875d32379c70e992f2c0b Mon Sep 17 00:00:00 2001 From: AJ Schmidt Date: Thu, 4 Nov 2021 10:11:15 -0400 Subject: [PATCH 01/21] DOC v22.02 Updates --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48a2b5e5..ebd5f2a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# dask-cuda 22.02.00 (Date TBD) + +Please see https://github.com/rapidsai/dask-cuda/releases/tag/v22.02.00a for the latest changes to this development branch. + # dask-cuda 21.12.00 (Date TBD) Please see https://github.com/rapidsai/dask-cuda/releases/tag/v21.12.00a for the latest changes to this development branch. From ec539c7c8c364cfe1d894d15fb302e54f755ff50 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 18 Nov 2021 15:25:40 +0100 Subject: [PATCH 02/21] Disk IO interface (#791) This PR standardize the API for spilling to disk and avoid a in-memory copy before writing to file. This is part of the work to support spilling using GPUDirect Storage (GDS). Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/791 --- dask_cuda/disk_io.py | 54 ++++++++++++++++++++++++++++++++++ dask_cuda/proxify_host_file.py | 44 +++++++++++++-------------- dask_cuda/proxy_object.py | 29 +++++++++--------- 3 files changed, 89 insertions(+), 38 deletions(-) create mode 100644 dask_cuda/disk_io.py diff --git a/dask_cuda/disk_io.py b/dask_cuda/disk_io.py new file mode 100644 index 00000000..76bb443a --- /dev/null +++ b/dask_cuda/disk_io.py @@ -0,0 +1,54 @@ +from typing import Iterable, Mapping + +from distributed.utils import nbytes + + +def disk_write(path: str, frames: Iterable, shared_filesystem: bool) -> dict: + """Write frames to disk + + Parameters + ---------- + path: str + File path + frames: Iterable + The frames to write to disk + shared_filesystem: bool + Whether the target filesystem is shared between all workers or not. + If True, the filesystem must support the `os.link()` operation. + + Returns + ------- + header: dict + A dict of metadata + """ + + with open(path, "wb") as f: + for frame in frames: + f.write(frame) + return { + "method": "stdio", + "path": path, + "frame-lengths": tuple(map(nbytes, frames)), + "shared-filesystem": shared_filesystem, + } + + +def disk_read(header: Mapping) -> list: + """Read frames from disk + + Parameters + ---------- + header: Mapping + The metadata of the frames to read + + Returns + ------- + frames: list + List of read frames + """ + + ret = [] + with open(header["path"], "rb") as f: + for length in header["frame-lengths"]: + ret.append(f.read(length)) + return ret diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index e9bbd0c4..7aa910f7 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -35,8 +35,8 @@ register_serialization_family, serialize_and_split, ) -from distributed.protocol.utils import pack_frames, unpack_frames +from dask_cuda.disk_io import disk_read, disk_write from dask_cuda.get_device_memory_objects import DeviceMemoryId, get_device_memory_ids from .proxify_device_objects import proxify_device_objects, unproxify_device_objects @@ -667,35 +667,33 @@ def register_disk_spilling( cls._spill_shared_filesystem = shared_filesystem def disk_dumps(x): - header, frames = serialize_and_split(x, on_error="raise") + serialize_header, frames = serialize_and_split(x, on_error="raise") if frames: compression, frames = zip(*map(maybe_compress, frames)) else: compression = [] - header["compression"] = compression - header["count"] = len(frames) - - path = cls.gen_file_path() - with open(path, "wb") as f: - f.write(pack_frames(frames)) + serialize_header["compression"] = compression + serialize_header["count"] = len(frames) return ( { "serializer": "disk", - "path": path, - "shared-filesystem": cls._spill_shared_filesystem, - "disk-sub-header": header, + "disk-io-header": disk_write( + path=cls.gen_file_path(), + frames=frames, + shared_filesystem=cls._spill_shared_filesystem, + ), + "serialize-header": serialize_header, }, [], ) def disk_loads(header, frames): assert frames == [] - with open(header["path"], "rb") as f: - frames = unpack_frames(f.read()) - os.remove(header["path"]) - if "compression" in header["disk-sub-header"]: - frames = decompress(header["disk-sub-header"], frames) - return merge_and_deserialize(header["disk-sub-header"], frames) + frames = disk_read(header["disk-io-header"]) + os.remove(header["disk-io-header"]["path"]) + if "compression" in header["serialize-header"]: + frames = decompress(header["serialize-header"], frames) + return merge_and_deserialize(header["serialize-header"], frames) register_serialization_family("disk", disk_dumps, disk_loads) @@ -716,15 +714,15 @@ def serialize_proxy_to_disk_inplace(cls, proxy: ProxyObject): if pxy.is_serialized(): header, frames = pxy.obj if header["serializer"] in ("dask", "pickle"): - path = cls.gen_file_path() - with open(path, "wb") as f: - f.write(pack_frames(frames)) pxy.obj = ( { "serializer": "disk", - "path": path, - "shared-filesystem": cls._spill_shared_filesystem, - "disk-sub-header": header, + "disk-io-header": disk_write( + path=cls.gen_file_path(), + frames=frames, + shared_filesystem=cls._spill_shared_filesystem, + ), + "serialize-header": header, }, [], ) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index e8617f4e..fc936ee4 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -1,4 +1,4 @@ -import copy +import copy as _copy import functools import operator import os @@ -7,7 +7,6 @@ import uuid from collections import OrderedDict from contextlib import nullcontext -from copy import copy as _copy from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple, Type, Union import pandas @@ -21,9 +20,10 @@ import distributed.utils from dask.sizeof import sizeof from distributed.protocol.compression import decompress -from distributed.protocol.utils import unpack_frames from distributed.worker import dumps_function, loads_function +from dask_cuda.disk_io import disk_read + try: from dask.dataframe.backends import concat_pandas except ImportError: @@ -34,7 +34,6 @@ except ImportError: from dask.dataframe.utils import make_meta as make_meta_dispatch - from .is_device_object import is_device_object if TYPE_CHECKING: @@ -364,7 +363,7 @@ def __init__(self, detail: ProxyDetail): def _pxy_get(self, copy=False) -> ProxyDetail: if copy: - return _copy(self._pxy_detail) + return _copy.copy(self._pxy_detail) else: return self._pxy_detail @@ -773,21 +772,21 @@ def handle_disk_serialized(pxy: ProxyDetail): On a non-shared filesystem, we deserialize the proxy to host memory. """ header, frames = pxy.obj - if header["shared-filesystem"]: - old_path = header["path"] + disk_io_header = header["disk-io-header"] + if disk_io_header["shared-filesystem"]: + old_path = disk_io_header["path"] new_path = f"{old_path}-linked-{uuid.uuid4()}" os.link(old_path, new_path) - header = copy.copy(header) - header["path"] = new_path + header = _copy.deepcopy(header) + header["disk-io-header"]["path"] = new_path else: # When not on a shared filesystem, we deserialize to host memory assert frames == [] - with open(header["path"], "rb") as f: - frames = unpack_frames(f.read()) - os.remove(header["path"]) - if "compression" in header["disk-sub-header"]: - frames = decompress(header["disk-sub-header"], frames) - header = header["disk-sub-header"] + frames = disk_read(disk_io_header) + os.remove(disk_io_header["path"]) + if "compression" in header["serialize-header"]: + frames = decompress(header["serialize-header"], frames) + header = header["serialize-header"] pxy.serializer = header["serializer"] return header, frames From d4cb2132fd7bc060210a8235299a42055ca4a17f Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 22 Nov 2021 15:15:16 +0100 Subject: [PATCH 03/21] Fixed `ProxyObject.__del__` to use the new Disk IO API from #791 (#802) Anyone that knows how to make a `pytest` that tests exceptions in class destructors? Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/802 --- dask_cuda/proxy_object.py | 2 +- dask_cuda/tests/test_proxify_host_file.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index fc936ee4..c4187701 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -378,7 +378,7 @@ def __del__(self): pxy.manager.remove(self) if pxy.serializer == "disk": header, _ = pxy.obj - os.remove(header["path"]) + os.remove(header["disk-io-header"]["path"]) def _pxy_serialize( self, serializers: Iterable[str], proxy_detail: ProxyDetail = None, diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 078ede88..953694c3 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -124,8 +124,13 @@ def test_one_dev_item_limit(): assert is_proxies_equal(dhf.manager._host.get_proxies(), [k4]) assert is_proxies_equal(dhf.manager._dev.get_proxies(), [k1]) + # Clean up + del k1, k4 + dhf.clear() + assert len(dhf.manager) == 0 -def test_one_item_host_limit(): + +def test_one_item_host_limit(capsys): memory_limit = sizeof(asproxy(one_item_array(), serializers=("dask", "pickle"))) dhf = ProxifyHostFile( device_memory_limit=one_item_nbytes, memory_limit=memory_limit @@ -154,7 +159,6 @@ def test_one_item_host_limit(): assert is_proxies_equal(dhf.manager._disk.get_proxies(), [k1]) assert is_proxies_equal(dhf.manager._host.get_proxies(), [k2]) assert is_proxies_equal(dhf.manager._dev.get_proxies(), [k3]) - dhf.manager.validate() # Accessing k2 spills k3 and unspill k2 @@ -181,6 +185,11 @@ def test_one_item_host_limit(): assert is_proxies_equal(dhf.manager._host.get_proxies(), [k4]) assert is_proxies_equal(dhf.manager._dev.get_proxies(), [k1]) + # Clean up + del k1, k2, k3, k4 + dhf.clear() + assert len(dhf.manager) == 0 + def test_spill_on_demand(): """ From 43af6e0ebe63a454a0e4adca89a94453b9c6245d Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 23 Nov 2021 13:38:02 +0100 Subject: [PATCH 04/21] Update to UCX-Py 0.24 (#805) Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/dask-cuda/pull/805 --- ci/gpu/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 91ad7bed..9f0b200f 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -56,7 +56,7 @@ conda list --show-channel-urls # indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject gpuci_mamba_retry install "cudatoolkit=$CUDA_REL" \ "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \ - "ucx-py=0.23.*" "ucx-proc=*=gpu" \ + "ucx-py=0.24.*" "ucx-proc=*=gpu" \ "rapids-build-env=$MINOR_VERSION.*" # Pin pytest-asyncio because latest versions modify the default asyncio From c8618fd41e90ab1e3d38b56be4ffaaeea0cdb7fd Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 23 Nov 2021 13:39:02 +0100 Subject: [PATCH 05/21] Standardize Distributed config separator in get_ucx_config (#806) Some of the UCX configurations used `_` whereas others used `-` as a separator. This has been standardized in https://github.com/dask/distributed/pull/5539, and this updates Dask-CUDA to the new standard. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/806 --- dask_cuda/tests/test_utils.py | 29 +++++++++----------- dask_cuda/utils.py | 51 ++++++++++++++++------------------- 2 files changed, 35 insertions(+), 45 deletions(-) diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 4aeac6e4..208978c3 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -4,6 +4,8 @@ import pytest from numba import cuda +from dask.config import canonical_name + from dask_cuda.utils import ( _ucx_111, cuda_visible_devices, @@ -176,30 +178,23 @@ def test_get_ucx_config(enable_tcp_over_ucx, enable_infiniband, net_devices): else: ucx_config = get_ucx_config(**kwargs) - if enable_tcp_over_ucx is True: - assert ucx_config["tcp"] is True - assert ucx_config["cuda_copy"] is True - else: - assert ucx_config["tcp"] is None + assert ucx_config[canonical_name("tcp", ucx_config)] is enable_tcp_over_ucx + assert ucx_config[canonical_name("infiniband", ucx_config)] is enable_infiniband - if enable_infiniband is True: - assert ucx_config["infiniband"] is True - assert ucx_config["cuda_copy"] is True + if enable_tcp_over_ucx is True or enable_infiniband is True: + assert ucx_config[canonical_name("cuda-copy", ucx_config)] is True else: - assert ucx_config["infiniband"] is None + assert ucx_config[canonical_name("cuda-copy", ucx_config)] is None - if enable_tcp_over_ucx is False and enable_infiniband is False: - assert ucx_config["cuda_copy"] is None - - if net_devices == "eth0": - assert ucx_config["net-devices"] == "eth0" - elif net_devices == "auto": + if net_devices == "auto": # Since the actual device is system-dependent, we don't do any # checks at the moment. If any InfiniBand devices are available, # that will be the value of "net-devices", otherwise an empty string. pass - elif net_devices == "": - assert "net-device" not in ucx_config + elif net_devices == "eth0": + assert ucx_config[canonical_name("net-devices", ucx_config)] == "eth0" + else: + assert ucx_config[canonical_name("net-devices", ucx_config)] is None def test_parse_visible_devices(): diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 15794e6e..8729bc5f 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -9,6 +9,9 @@ import pynvml import toolz +import dask +import distributed # noqa: required for dask.config.get("distributed.comm.ucx") +from dask.config import canonical_name from dask.utils import parse_bytes from distributed import Worker, wait @@ -283,10 +286,10 @@ def get_ucx_net_devices( def get_ucx_config( - enable_tcp_over_ucx=False, - enable_infiniband=False, - enable_nvlink=False, - enable_rdmacm=False, + enable_tcp_over_ucx=None, + enable_infiniband=None, + enable_nvlink=None, + enable_rdmacm=None, net_devices="", cuda_device_index=None, ): @@ -296,29 +299,25 @@ def get_ucx_config( "supported when enable_infiniband=True." ) - ucx_config = { - "tcp": None, - "infiniband": None, - "nvlink": None, - "rdmacm": None, - "net-devices": None, - "cuda_copy": None, - "create_cuda_context": None, - "reuse-endpoints": not _ucx_111, - } + ucx_config = dask.config.get("distributed.comm.ucx") + + ucx_config[canonical_name("create-cuda-context", ucx_config)] = True + ucx_config[canonical_name("reuse-endpoints", ucx_config)] = not _ucx_111 + + ucx_config[canonical_name("tcp", ucx_config)] = enable_tcp_over_ucx + ucx_config[canonical_name("infiniband", ucx_config)] = enable_infiniband + ucx_config[canonical_name("nvlink", ucx_config)] = enable_nvlink + ucx_config[canonical_name("rdmacm", ucx_config)] = enable_rdmacm + if enable_tcp_over_ucx or enable_infiniband or enable_nvlink: - ucx_config["cuda_copy"] = True - if enable_tcp_over_ucx: - ucx_config["tcp"] = True - if enable_infiniband: - ucx_config["infiniband"] = True - if enable_nvlink: - ucx_config["nvlink"] = True - if enable_rdmacm: - ucx_config["rdmacm"] = True + ucx_config[canonical_name("cuda-copy", ucx_config)] = True + else: + ucx_config[canonical_name("cuda-copy", ucx_config)] = None if net_devices is not None and net_devices != "": - ucx_config["net-devices"] = get_ucx_net_devices(cuda_device_index, net_devices) + ucx_config[canonical_name("net-devices", ucx_config)] = get_ucx_net_devices( + cuda_device_index, net_devices + ) return ucx_config @@ -640,15 +639,11 @@ class MockWorker(Worker): """ def __init__(self, *args, **kwargs): - import distributed - distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count self._device_get_count = distributed.diagnostics.nvml.device_get_count super().__init__(*args, **kwargs) def __del__(self): - import distributed - distributed.diagnostics.nvml.device_get_count = self._device_get_count @staticmethod From 51ed961a8ea834481670fc0c1d3069211a4c84a2 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 23 Nov 2021 20:25:10 +0100 Subject: [PATCH 06/21] GPUDirect Storage (GDS) support for spilling (#793) This PR enables the use of GDS when spilling to disk. It requires [cuCIM](https://github.com/rapidsai/cucim/) and can be enabled by setting the Dask `"gds-spilling"` option: ```bash DASK_GDS_SPILLING=True ``` For now this uses the [cuCIM](https://github.com/rapidsai/cucim/) binding for GDS but in the future we should use an independent Python package. Also please note, the GDS access hasn't been optimized in any way. The target of this PR is to get it working :) cc. @jakirkham Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/793 --- ci/gpu/build.sh | 7 +-- dask_cuda/disk_io.py | 93 ++++++++++++++++++++++++++++++---- dask_cuda/proxify_host_file.py | 41 +++++++++++++-- dask_cuda/tests/test_gds.py | 37 ++++++++++++++ 4 files changed, 160 insertions(+), 18 deletions(-) create mode 100644 dask_cuda/tests/test_gds.py diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 9f0b200f..357c919d 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -53,11 +53,13 @@ conda config --show-sources conda list --show-channel-urls # Fixing Numpy version to avoid RuntimeWarning: numpy.ufunc size changed, may -# indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject +# indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject. +# Also installing cucim in order to test GDS spilling gpuci_mamba_retry install "cudatoolkit=$CUDA_REL" \ "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \ "ucx-py=0.24.*" "ucx-proc=*=gpu" \ - "rapids-build-env=$MINOR_VERSION.*" + "rapids-build-env=$MINOR_VERSION.*" \ + "cucim" # Pin pytest-asyncio because latest versions modify the default asyncio # `event_loop_policy`. See https://github.com/dask/distributed/pull/4212 . @@ -67,7 +69,6 @@ gpuci_mamba_retry install "pytest-asyncio=<0.14.0" # gpuci_mamba_retry remove -f rapids-build-env # gpuci_mamba_retry install "your-pkg=1.0.0" - conda info conda config --show-sources conda list --show-channel-urls diff --git a/dask_cuda/disk_io.py b/dask_cuda/disk_io.py index 76bb443a..439f6dba 100644 --- a/dask_cuda/disk_io.py +++ b/dask_cuda/disk_io.py @@ -1,9 +1,51 @@ -from typing import Iterable, Mapping +import weakref +from typing import Callable, Iterable, Mapping, Optional -from distributed.utils import nbytes +import numpy as np +from distributed.utils import Any, nbytes -def disk_write(path: str, frames: Iterable, shared_filesystem: bool) -> dict: +_new_cuda_buffer: Optional[Callable[[int], Any]] = None + + +def get_new_cuda_buffer() -> Callable[[int], Any]: + """Return a function to create an empty CUDA buffer""" + global _new_cuda_buffer + if _new_cuda_buffer is not None: + return _new_cuda_buffer + try: + import rmm + + _new_cuda_buffer = lambda n: rmm.DeviceBuffer(size=n) + return _new_cuda_buffer + except ImportError: + pass + + try: + import cupy + + _new_cuda_buffer = lambda n: cupy.empty((n,), dtype="u1") + return _new_cuda_buffer + except ImportError: + pass + + try: + import numba.cuda + + def numba_device_array(n): + a = numba.cuda.device_array((n,), dtype="u1") + weakref.finalize(a, numba.cuda.current_context) + return a + + _new_cuda_buffer = numba_device_array + return _new_cuda_buffer + except ImportError: + pass + + raise RuntimeError("GPUDirect Storage requires RMM, CuPy, or Numba") + + +def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False) -> dict: """Write frames to disk Parameters @@ -15,40 +57,69 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool) -> dict: shared_filesystem: bool Whether the target filesystem is shared between all workers or not. If True, the filesystem must support the `os.link()` operation. + gds: bool + Enable the use of GPUDirect Storage. Notice, the consecutive + `disk_read()` must enable GDS as well. Returns ------- header: dict A dict of metadata """ + cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames) + frame_lengths = tuple(map(nbytes, frames)) + if gds and any(cuda_frames): + import cucim.clara.filesystem as cucim_fs - with open(path, "wb") as f: - for frame in frames: - f.write(frame) + with cucim_fs.open(path, "w") as f: + for frame, length in zip(frames, frame_lengths): + f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0) + + else: + with open(path, "wb") as f: + for frame in frames: + f.write(frame) return { "method": "stdio", "path": path, "frame-lengths": tuple(map(nbytes, frames)), "shared-filesystem": shared_filesystem, + "cuda-frames": cuda_frames, } -def disk_read(header: Mapping) -> list: +def disk_read(header: Mapping, gds=False) -> list: """Read frames from disk Parameters ---------- header: Mapping The metadata of the frames to read + gds: bool + Enable the use of GPUDirect Storage. Notice, this must + match the GDS option set by the prior `disk_write()` call. Returns ------- frames: list List of read frames """ - ret = [] - with open(header["path"], "rb") as f: - for length in header["frame-lengths"]: - ret.append(f.read(length)) + if gds: + import cucim.clara.filesystem as cucim_fs # isort:skip + + with cucim_fs.open(header["path"], "rb") as f: + file_offset = 0 + for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]): + if is_cuda: + buf = get_new_cuda_buffer()(length) + else: + buf = np.empty((length,), dtype="u1") + f.pread(buf=buf, count=length, file_offset=file_offset, buf_offset=0) + file_offset += length + ret.append(buf) + else: + with open(header["path"], "rb") as f: + for length in header["frame-lengths"]: + ret.append(f.read(length)) return ret diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 333546d8..7cbdd6b4 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -469,6 +469,9 @@ class ProxifyHostFile(MutableMapping): Enables spilling when the RMM memory pool goes out of memory. If ``None``, the "spill-on-demand" config value are used, which defaults to True. Notice, enabling this does nothing when RMM isn't availabe or not used. + gds_spilling: bool + Enable GPUDirect Storage spilling. If ``None``, the "gds-spilling" config + value are used, which defaults to ``False``. """ # Notice, we define the following as static variables because they are used by @@ -477,6 +480,8 @@ class ProxifyHostFile(MutableMapping): _spill_shared_filesystem: bool _spill_to_disk_prefix: str = f"spilled-data-{uuid.uuid4()}" _spill_to_disk_counter: int = 0 + _gds_enabled: bool = False + lock = threading.RLock() def __init__( @@ -488,10 +493,11 @@ def __init__( shared_filesystem: bool = None, compatibility_mode: bool = None, spill_on_demand: bool = None, + gds_spilling: bool = None, ): self.store: Dict[Hashable, Any] = {} self.manager = ProxyManager(device_memory_limit, memory_limit) - self.register_disk_spilling(local_directory, shared_filesystem) + self.register_disk_spilling(local_directory, shared_filesystem, gds_spilling) if compatibility_mode is None: self.compatibility_mode = dask.config.get( "jit-unspill-compatibility-mode", default=False @@ -625,7 +631,10 @@ def gen_file_path(cls) -> str: @classmethod def register_disk_spilling( - cls, local_directory: str = None, shared_filesystem: bool = None + cls, + local_directory: str = None, + shared_filesystem: bool = None, + gds: bool = None, ): """Register Dask serializers that writes to disk @@ -647,6 +656,9 @@ def register_disk_spilling( Whether the `local_directory` above is shared between all workers or not. If ``None``, the "jit-unspill-shared-fs" config value are used, which defaults to False. + gds: bool + Enable the use of GPUDirect Storage. If ``None``, the "gds-spilling" + config value are used, which defaults to ``False``. """ path = os.path.join( local_directory or dask.config.get("temporary-directory") or os.getcwd(), @@ -666,8 +678,28 @@ def register_disk_spilling( else: cls._spill_shared_filesystem = shared_filesystem + if gds is None: + gds = dask.config.get("gds-spilling", default=False) + if gds: + try: + import cucim.clara.filesystem as cucim_fs # noqa F401 + except ImportError: + raise ImportError("GPUDirect Storage requires the cucim Python package") + else: + cls._gds_enabled = bool(cucim_fs.is_gds_available()) + else: + cls._gds_enabled = False + def disk_dumps(x): - serialize_header, frames = serialize_and_split(x, on_error="raise") + # When using GDS, we prepend "cuda" to serializers to keep the CUDA + # objects on the GPU. Otherwise the "dask" or "pickle" serializer will + # copy everything to host memory. + serializers = ["dask", "pickle"] + if cls._gds_enabled: + serializers = ["cuda"] + serializers + serialize_header, frames = serialize_and_split( + x, serializers=serializers, on_error="raise" + ) if frames: compression, frames = zip(*map(maybe_compress, frames)) else: @@ -681,6 +713,7 @@ def disk_dumps(x): path=cls.gen_file_path(), frames=frames, shared_filesystem=cls._spill_shared_filesystem, + gds=cls._gds_enabled, ), "serialize-header": serialize_header, }, @@ -689,7 +722,7 @@ def disk_dumps(x): def disk_loads(header, frames): assert frames == [] - frames = disk_read(header["disk-io-header"]) + frames = disk_read(header["disk-io-header"], gds=cls._gds_enabled) os.remove(header["disk-io-header"]["path"]) if "compression" in header["serialize-header"]: frames = decompress(header["serialize-header"], frames) diff --git a/dask_cuda/tests/test_gds.py b/dask_cuda/tests/test_gds.py new file mode 100644 index 00000000..b68f132e --- /dev/null +++ b/dask_cuda/tests/test_gds.py @@ -0,0 +1,37 @@ +import pytest + +from distributed.protocol.serialize import deserialize, serialize + +from dask_cuda.proxify_host_file import ProxifyHostFile + + +@pytest.mark.parametrize("cuda_lib", ["cupy", "cudf", "numba.cuda"]) +@pytest.mark.parametrize("gds_enabled", [True, False]) +def test_gds(gds_enabled, cuda_lib): + lib = pytest.importorskip(cuda_lib) + if cuda_lib == "cupy": + data_create = lambda: lib.arange(10) + data_compare = lambda x, y: all(x == y) + elif cuda_lib == "cudf": + data_create = lambda: lib.Series(range(10)) + data_compare = lambda x, y: all((x == y).values_host) + elif cuda_lib == "numba.cuda": + data_create = lambda: lib.to_device(range(10)) + data_compare = lambda x, y: all(x.copy_to_host() == y.copy_to_host()) + + try: + ProxifyHostFile.register_disk_spilling(gds=gds_enabled) + if gds_enabled and not ProxifyHostFile._gds_enabled: + pytest.importorskip("cucim.clara.filesystem") + # In this case, we know that cucim is available but GDS + # isn't installed on the system. For testing, we enable + # the use of the cucim anyways. + ProxifyHostFile._gds_enabled = True + + a = data_create() + header, frames = serialize(a, serializers=("disk",)) + b = deserialize(header, frames) + assert type(a) == type(b) + assert data_compare(a, b) + finally: + ProxifyHostFile.register_disk_spilling() # Reset disk spilling options From 796dc1bb2495eb644a4411ae6ed9dc138dba2b15 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 23 Nov 2021 20:25:50 +0100 Subject: [PATCH 07/21] Unpin Dask and Distributed versions (#810) Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/810 --- ci/gpu/build.sh | 2 +- conda/recipes/dask-cuda/meta.yaml | 4 ++-- requirements.txt | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 357c919d..16a4f887 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -33,7 +33,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 # Install dask and distributed from master branch. Usually needed during # development time and disabled before a new dask-cuda release. -export INSTALL_DASK_MASTER=0 +export INSTALL_DASK_MASTER=1 ################################################################################ # SETUP - Check environment diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 12d5ca64..fac93448 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -27,8 +27,8 @@ requirements: - setuptools run: - python - - dask>=2021.11.1,<=2021.11.2 - - distributed>=2021.11.1,<=2021.11.2 + - dask>=2021.11.1 + - distributed>=2021.11.1 - pynvml >=8.0.3 - numpy >=1.16.0 - numba >=0.53.1 diff --git a/requirements.txt b/requirements.txt index 169b81b2..f0d36eac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -dask>=2021.11.1,<=2021.11.2 -distributed>=2021.11.1,<=2021.11.2 +dask>=2021.11.1 +distributed>=2021.11.1 pynvml>=11.0.0 numpy>=1.16.0 numba>=0.53.1 From f1b0e278fadff4c6b8e89a2f21d3f658503d2aeb Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 23 Nov 2021 23:10:36 +0100 Subject: [PATCH 08/21] Clarify `LocalCUDACluster`'s `n_workers` docstrings (#812) Current docstrings for `LocalCUDACluster`'s `n_workers` is not really clear on the behavior when `CUDA_VISIBLE_DEVICES` is also specified, this change elaborates on it. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/dask-cuda/pull/812 --- dask_cuda/local_cuda_cluster.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 9ccf6b1d..02c8a2e5 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -59,8 +59,10 @@ class LocalCUDACluster(LocalCluster): ``[0, 1, 2, 3]``), or ``None`` to use all available GPUs. n_workers : int or None, default None Number of workers. Can be an integer or ``None`` to fall back on the GPUs - specified by ``CUDA_VISIBLE_DEVICES``. Will override the value of - ``CUDA_VISIBLE_DEVICES`` if specified. + specified by ``CUDA_VISIBLE_DEVICES``. The value of ``n_workers`` must be + smaller or equal to the number of GPUs specified in ``CUDA_VISIBLE_DEVICES`` + when the latter is specified, and if smaller, only the first ``n_workers`` GPUs + will be used. threads_per_worker : int, default 1 Number of threads to be used for each Dask worker process. memory_limit : int, float, str, or None, default "auto" From e4a7754f3013daa9077e23cf6cb0073db92f54f8 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 29 Nov 2021 22:18:00 +0100 Subject: [PATCH 09/21] Simplify UCX configs, permitting UCX_TLS=all (#792) Up until now, we require users to specify what transports should be used by UCX, pushing the configuration burden onto the user, being also error-prone. We can now reduce this configuration burden with just one configuration being added in https://github.com/dask/distributed/pull/5526: `DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT`/`distributed.comm.ucx.create_cuda_context`, which creates the CUDA context _before_ UCX is initialized. This is an example of how to setup a cluster with `dask-cuda-worker` after this change: ``` # Scheduler UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True dask-scheduler --protocol ucx --interface ib0 # Workers UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda dask-cuda-worker ucx://${SCHEDULER_IB0_IP}:8786 --interface ib0 --rmm-pool-size 29GiB # Client UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True python client.py ``` Similarly, one can setup: `LocalCUDACluster(protocol="ucx", interface="ib0")`. Note above how `ib0` is intentionally specified. That is mandatory to use RDMACM, as it is necessary to have listeners bind to an InfiniBand interface, but can be left unspecified when using systems without InfiniBand or if RDMACM isn't required (discouraged on systems that have InfiniBand connectivity). The `UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda` option is specified for optimal InfiniBand performance with CUDA and will be default in UCX 1.12, when specifying it won't be necessary anymore. Changes introduced here are backwards-compatible, meaning the old options such as `--enable-nvlink`/`enable_nvlink=True` are still valid. However, if any of those options is specified, the user is responsible to enable/disable all desired transports, which can also be useful for benchmarking specific transports. Finally, creating a CUDA context may not be necessary by UCX in the future, at a point where it will be possible to remove `DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True` from scheduler/client processes entirely. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/792 --- dask_cuda/benchmarks/utils.py | 35 ++++------ dask_cuda/cli/dask_cuda_worker.py | 12 ++-- dask_cuda/cuda_worker.py | 8 +-- dask_cuda/initialize.py | 20 +++--- dask_cuda/local_cuda_cluster.py | 20 +++--- dask_cuda/tests/test_dgx.py | 72 ++++++++++++------- dask_cuda/tests/test_initialize.py | 43 +++++++++++- dask_cuda/tests/test_utils.py | 13 ++++ dask_cuda/utils.py | 28 ++++---- docs/source/examples/ucx.rst | 107 +++++++++++++++++++++++++++-- docs/source/ucx.rst | 14 +++- 11 files changed, 273 insertions(+), 99 deletions(-) diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 684ac0df..29945757 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -66,24 +66,28 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] ) parser.add_argument( "--enable-tcp-over-ucx", + default=None, action="store_true", dest="enable_tcp_over_ucx", help="Enable TCP over UCX.", ) parser.add_argument( "--enable-infiniband", + default=None, action="store_true", dest="enable_infiniband", help="Enable InfiniBand over UCX.", ) parser.add_argument( "--enable-nvlink", + default=None, action="store_true", dest="enable_nvlink", help="Enable NVLink over UCX.", ) parser.add_argument( "--enable-rdmacm", + default=None, action="store_true", dest="enable_rdmacm", help="Enable RDMACM with UCX.", @@ -181,20 +185,8 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] name = [name] parser.add_argument(*name, **args) - parser.set_defaults( - enable_tcp_over_ucx=True, - enable_infiniband=True, - enable_nvlink=True, - enable_rdmacm=False, - ) args = parser.parse_args() - if args.protocol == "tcp": - args.enable_tcp_over_ucx = False - args.enable_infiniband = False - args.enable_nvlink = False - args.enable_rdmacm = False - if args.multi_node and len(args.hosts.split(",")) < 2: raise ValueError("--multi-node requires at least 2 hosts") @@ -202,6 +194,14 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] def get_cluster_options(args): + ucx_options = { + "ucx_net_devices": args.ucx_net_devices, + "enable_tcp_over_ucx": args.enable_tcp_over_ucx, + "enable_infiniband": args.enable_infiniband, + "enable_nvlink": args.enable_nvlink, + "enable_rdmacm": args.enable_rdmacm, + } + if args.multi_node is True: Cluster = SSHCluster cluster_args = [args.hosts.split(",")] @@ -214,11 +214,6 @@ def get_cluster_options(args): "worker_options": { "protocol": args.protocol, "nthreads": args.threads_per_worker, - "net_devices": args.ucx_net_devices, - "enable_tcp_over_ucx": args.enable_tcp_over_ucx, - "enable_infiniband": args.enable_infiniband, - "enable_nvlink": args.enable_nvlink, - "enable_rdmacm": args.enable_rdmacm, "interface": args.interface, "device_memory_limit": args.device_memory_limit, }, @@ -234,13 +229,9 @@ def get_cluster_options(args): "n_workers": len(args.devs.split(",")), "threads_per_worker": args.threads_per_worker, "CUDA_VISIBLE_DEVICES": args.devs, - "ucx_net_devices": args.ucx_net_devices, - "enable_tcp_over_ucx": args.enable_tcp_over_ucx, - "enable_infiniband": args.enable_infiniband, - "enable_nvlink": args.enable_nvlink, - "enable_rdmacm": args.enable_rdmacm, "interface": args.interface, "device_memory_limit": args.device_memory_limit, + **ucx_options, } if args.no_silence_logs: cluster_kwargs["silence_logs"] = False diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 35bb703e..4f1ac6c1 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -210,28 +210,28 @@ ) @click.option( "--enable-tcp-over-ucx/--disable-tcp-over-ucx", - default=False, + default=None, show_default=True, help="""Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.""", ) @click.option( "--enable-infiniband/--disable-infiniband", - default=False, + default=None, show_default=True, help="""Set environment variables to enable UCX over InfiniBand, implies - ``--enable-tcp-over-ucx``.""", + ``--enable-tcp-over-ucx`` when enabled.""", ) @click.option( "--enable-nvlink/--disable-nvlink", - default=False, + default=None, show_default=True, help="""Set environment variables to enable UCX over NVLink, implies - ``--enable-tcp-over-ucx``.""", + ``--enable-tcp-over-ucx`` when enabled.""", ) @click.option( "--enable-rdmacm/--disable-rdmacm", - default=False, + default=None, show_default=True, help="""Set environment variables to enable UCX RDMA connection manager support, requires ``--enable-infiniband``.""", diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index b9562461..56a6dcdb 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -72,10 +72,10 @@ def __init__( preload=[], dashboard_prefix=None, security=None, - enable_tcp_over_ucx=False, - enable_infiniband=False, - enable_nvlink=False, - enable_rdmacm=False, + enable_tcp_over_ucx=None, + enable_infiniband=None, + enable_nvlink=None, + enable_rdmacm=None, net_devices=None, jit_unspill=None, worker_class=None, diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index 38f403e3..0f78470c 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -69,10 +69,10 @@ def _create_cuda_context(): def initialize( create_cuda_context=True, - enable_tcp_over_ucx=False, - enable_infiniband=False, - enable_nvlink=False, - enable_rdmacm=False, + enable_tcp_over_ucx=None, + enable_infiniband=None, + enable_nvlink=None, + enable_rdmacm=None, net_devices="", cuda_device_index=None, ): @@ -107,16 +107,16 @@ def initialize( ---------- create_cuda_context : bool, default True Create CUDA context on initialization. - enable_tcp_over_ucx : bool, default False + enable_tcp_over_ucx : bool, default None Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled. - enable_infiniband : bool, default False + enable_infiniband : bool, default None Set environment variables to enable UCX over InfiniBand, implies - ``enable_tcp_over_ucx=True``. - enable_nvlink : bool, default False + ``enable_tcp_over_ucx=True`` when ``True``. + enable_nvlink : bool, default None Set environment variables to enable UCX over NVLink, implies - ``enable_tcp_over_ucx=True``. - enable_rdmacm : bool, default False + ``enable_tcp_over_ucx=True`` when ``True``. + enable_rdmacm : bool, default None Set environment variables to enable UCX RDMA connection manager support, requires ``enable_infiniband=True``. net_devices : str or callable, default "" diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 02c8a2e5..30d48f0d 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -88,16 +88,16 @@ class LocalCUDACluster(LocalCluster): protocol : str or None, default None Protocol to use for communication. Can be a string (like ``"tcp"`` or ``"ucx"``), or ``None`` to automatically choose the correct protocol. - enable_tcp_over_ucx : bool, default False + enable_tcp_over_ucx : bool, default None Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled. - enable_infiniband : bool, default False + enable_infiniband : bool, default None Set environment variables to enable UCX over InfiniBand, requires - ``protocol="ucx"`` and implies ``enable_tcp_over_ucx=True``. - enable_nvlink : bool, default False + ``protocol="ucx"`` and implies ``enable_tcp_over_ucx=True`` when ``True``. + enable_nvlink : bool, default None Set environment variables to enable UCX over NVLink, requires ``protocol="ucx"`` - and implies ``enable_tcp_over_ucx=True``. - enable_rdmacm : bool, default False + and implies ``enable_tcp_over_ucx=True`` when ``True``. + enable_rdmacm : bool, default None Set environment variables to enable UCX RDMA connection manager support, requires ``protocol="ucx"`` and ``enable_infiniband=True``. ucx_net_devices : str, callable, or None, default None @@ -190,10 +190,10 @@ def __init__( local_directory=None, shared_filesystem=None, protocol=None, - enable_tcp_over_ucx=False, - enable_infiniband=False, - enable_nvlink=False, - enable_rdmacm=False, + enable_tcp_over_ucx=None, + enable_infiniband=None, + enable_nvlink=None, + enable_rdmacm=None, ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, diff --git a/dask_cuda/tests/test_dgx.py b/dask_cuda/tests/test_dgx.py index 1e0527a3..c9facb53 100644 --- a/dask_cuda/tests/test_dgx.py +++ b/dask_cuda/tests/test_dgx.py @@ -14,7 +14,7 @@ from dask_cuda import LocalCUDACluster from dask_cuda.initialize import initialize -from dask_cuda.utils import _ucx_110, wait_workers +from dask_cuda.utils import _ucx_110, _ucx_111, wait_workers mp = mp.get_context("spawn") # type: ignore psutil = pytest.importorskip("psutil") @@ -167,28 +167,40 @@ def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm) net_devices = _get_dgx_net_devices() openfabrics_devices = [d.split(",")[0] for d in net_devices] - ucx_net_devices = None - if enable_infiniband and not _ucx_110: - ucx_net_devices = "auto" - if _ucx_110 is True: - cm_tls = ["tcp"] - if enable_rdmacm is True: - cm_tls_priority = "rdmacm" - else: - cm_tls_priority = "tcp" + if enable_infiniband is None and enable_nvlink is None and enable_rdmacm is None: + if _ucx_110 is False: + pytest.skip( + "Specifying transports is required on UCX < 1.10", + allow_module_level=True, + ) + enable_tcp_over_ucx = None + cm_tls = ["all"] + cm_tls_priority = ["rdmacm", "tcp", "sockcm"] else: - cm_tls = ["tcp"] - if enable_rdmacm is True: - cm_tls.append("rdmacm") - cm_tls_priority = "rdmacm" + if enable_infiniband and not _ucx_110: + ucx_net_devices = "auto" + + enable_tcp_over_ucx = True + + if _ucx_110 is True: + cm_tls = ["tcp"] + if enable_rdmacm is True: + cm_tls_priority = ["rdmacm"] + else: + cm_tls_priority = ["tcp"] else: - cm_tls.append("sockcm") - cm_tls_priority = "sockcm" + cm_tls = ["tcp"] + if enable_rdmacm is True: + cm_tls.append("rdmacm") + cm_tls_priority = ["rdmacm"] + else: + cm_tls.append("sockcm") + cm_tls_priority = ["sockcm"] initialize( - enable_tcp_over_ucx=True, + enable_tcp_over_ucx=enable_tcp_over_ucx, enable_infiniband=enable_infiniband, enable_nvlink=enable_nvlink, enable_rdmacm=enable_rdmacm, @@ -196,7 +208,7 @@ def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm) with LocalCUDACluster( interface="ib0", - enable_tcp_over_ucx=True, + enable_tcp_over_ucx=enable_tcp_over_ucx, enable_infiniband=enable_infiniband, enable_nvlink=enable_nvlink, enable_rdmacm=enable_rdmacm, @@ -211,14 +223,15 @@ def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm) def check_ucx_options(): conf = ucp.get_config() assert "TLS" in conf - assert "tcp" in conf["TLS"] - assert "cuda_copy" in conf["TLS"] assert all(t in conf["TLS"] for t in cm_tls) - assert cm_tls_priority in conf["SOCKADDR_TLS_PRIORITY"] - if enable_nvlink: - assert "cuda_ipc" in conf["TLS"] - if enable_infiniband: - assert "rc" in conf["TLS"] + assert all(p in conf["SOCKADDR_TLS_PRIORITY"] for p in cm_tls_priority) + if cm_tls != ["all"]: + assert "tcp" in conf["TLS"] + assert "cuda_copy" in conf["TLS"] + if enable_nvlink: + assert "cuda_ipc" in conf["TLS"] + if enable_infiniband: + assert "rc" in conf["TLS"] return True if ucx_net_devices == "auto": @@ -240,6 +253,7 @@ def check_ucx_options(): {"enable_infiniband": True, "enable_nvlink": True, "enable_rdmacm": False}, {"enable_infiniband": True, "enable_nvlink": False, "enable_rdmacm": True}, {"enable_infiniband": True, "enable_nvlink": True, "enable_rdmacm": True}, + {"enable_infiniband": None, "enable_nvlink": None, "enable_rdmacm": None}, ], ) @pytest.mark.skipif( @@ -249,6 +263,14 @@ def check_ucx_options(): def test_ucx_infiniband_nvlink(params): ucp = pytest.importorskip("ucp") # NOQA: F841 + if ( + not _ucx_111 + and params["enable_infiniband"] is None + and params["enable_nvlink"] is None + and params["enable_rdmacm"] is None + ): + pytest.skip("Automatic configuration not supported in UCX < 1.11") + if params["enable_infiniband"]: if not any([at.startswith("rc") for at in ucp.get_active_transports()]): pytest.skip("No support available for 'rc' transport in UCX") diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index c32ceccd..7f62bf04 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -9,7 +9,7 @@ from distributed.deploy.local import LocalCluster from dask_cuda.initialize import initialize -from dask_cuda.utils import _ucx_110, get_ucx_config +from dask_cuda.utils import _ucx_110, _ucx_111, get_ucx_config mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") @@ -143,3 +143,44 @@ def test_initialize_ucx_infiniband(): p.start() p.join() assert not p.exitcode + + +def _test_initialize_ucx_all(): + initialize() + with LocalCluster( + protocol="ucx", + dashboard_address=None, + n_workers=1, + threads_per_worker=1, + processes=True, + config={"distributed.comm.ucx": get_ucx_config()}, + ) as cluster: + with Client(cluster) as client: + res = da.from_array(numpy.arange(10000), chunks=(1000,)) + res = res.sum().compute() + assert res == 49995000 + + def check_ucx_options(): + conf = ucp.get_config() + assert "TLS" in conf + assert conf["TLS"] == "all" + assert all( + [ + p in conf["SOCKADDR_TLS_PRIORITY"] + for p in ["rdmacm", "tcp", "sockcm"] + ] + ) + return True + + assert client.run_on_scheduler(check_ucx_options) is True + assert all(client.run(check_ucx_options).values()) + + +@pytest.mark.skipif( + not _ucx_111, reason="Automatic configuration not supported in UCX < 1.11", +) +def test_initialize_ucx_all(): + p = mp.Process(target=_test_initialize_ucx_all) + p.start() + p.join() + assert not p.exitcode diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 208978c3..9a2a5f1b 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -81,6 +81,17 @@ def test_get_device_total_memory(): assert total_mem > 0 +def test_get_preload_options_default(): + pytest.importorskip("ucp") + + opts = get_preload_options(protocol="ucx", create_cuda_context=True,) + + assert "preload" in opts + assert opts["preload"] == ["dask_cuda.initialize"] + assert "preload_argv" in opts + assert opts["preload_argv"] == ["--create-cuda-context"] + + @pytest.mark.parametrize("enable_tcp", [True, False]) @pytest.mark.parametrize( "enable_infiniband_netdev", @@ -178,6 +189,8 @@ def test_get_ucx_config(enable_tcp_over_ucx, enable_infiniband, net_devices): else: ucx_config = get_ucx_config(**kwargs) + assert ucx_config[canonical_name("create_cuda_context", ucx_config)] is True + assert ucx_config[canonical_name("tcp", ucx_config)] is enable_tcp_over_ucx assert ucx_config[canonical_name("infiniband", ucx_config)] is enable_infiniband diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 8729bc5f..a50ebd88 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -290,7 +290,7 @@ def get_ucx_config( enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, - net_devices="", + net_devices=None, cuda_device_index=None, ): if net_devices == "auto" and enable_infiniband is False: @@ -323,11 +323,11 @@ def get_ucx_config( def get_preload_options( protocol=None, - create_cuda_context=False, - enable_tcp_over_ucx=False, - enable_infiniband=False, - enable_nvlink=False, - enable_rdmacm=False, + create_cuda_context=None, + enable_tcp_over_ucx=None, + enable_infiniband=None, + enable_nvlink=None, + enable_rdmacm=None, ucx_net_devices="", cuda_device_index=0, ): @@ -337,29 +337,29 @@ def get_preload_options( Parameters ---------- - protocol: None or str + protocol: None or str, default None If "ucx", options related to UCX (enable_tcp_over_ucx, enable_infiniband, enable_nvlink and ucx_net_devices) are added to preload_argv. - create_cuda_context: bool + create_cuda_context: bool, default None Ensure the CUDA context gets created at initialization, generally needed by Dask workers. - enable_tcp: bool + enable_tcp: bool, default None Set environment variables to enable TCP over UCX, even when InfiniBand or NVLink support are disabled. - enable_infiniband: bool + enable_infiniband: bool, default None Set environment variables to enable UCX InfiniBand support. Implies enable_tcp=True. - enable_rdmacm: bool + enable_rdmacm: bool, default None Set environment variables to enable UCX RDMA connection manager support. Currently requires enable_infiniband=True. - enable_nvlink: bool + enable_nvlink: bool, default None Set environment variables to enable UCX NVLink support. Implies enable_tcp=True. - ucx_net_devices: str or callable + ucx_net_devices: str or callable, default "" A string with the interface name to be used for all devices (empty string means use default), or a callable function taking an integer identifying the GPU index. - cuda_device_index: int + cuda_device_index: int, default 0 The index identifying the CUDA device used by this worker, only used when ucx_net_devices is callable. diff --git a/docs/source/examples/ucx.rst b/docs/source/examples/ucx.rst index 036b9929..cce76e85 100644 --- a/docs/source/examples/ucx.rst +++ b/docs/source/examples/ucx.rst @@ -4,10 +4,32 @@ Enabling UCX communication A CUDA cluster using UCX communication can be started automatically with LocalCUDACluster or manually with the ``dask-cuda-worker`` CLI tool. In either case, a ``dask.distributed.Client`` must be made for the worker cluster using the same Dask UCX configuration; see `UCX Integration -- Configuration <../ucx.html#configuration>`_ for details on all available options. -LocalCUDACluster ----------------- +LocalCUDACluster with Automatic Configuration +--------------------------------------------- -When using LocalCUDACluster with UCX communication, all required UCX configuration is handled through arguments supplied at construction; see `API -- Cluster <../api.html#cluster>`_ for a complete list of these arguments. +Automatic configuration was introduced in Dask-CUDA 22.20 and requires UCX >= 1.11.1. This allows the user to specify only the UCX protocol and let UCX decide which transports to use. + +To connect a client to a cluster with automatically-configured UCX and an RMM pool: + +.. code-block:: python + + from dask.distributed import Client + from dask_cuda import LocalCUDACluster + + cluster = LocalCUDACluster( + protocol="ucx", + interface="ib0", + rmm_pool_size="1GB" + ) + client = Client(cluster) + +.. note:: + The ``interface="ib0"`` is intentionally specified above to ensure RDMACM is used in systems that support InfiniBand. On systems that don't support InfiniBand or where RDMACM isn't required, the ``interface`` argument may be omitted or specified to listen on a different interface. + +LocalCUDACluster with Manual Configuration +------------------------------------------ + +When using LocalCUDACluster with UCX communication and manual configuration, all required UCX configuration is handled through arguments supplied at construction; see `API -- Cluster <../api.html#cluster>`_ for a complete list of these arguments. To connect a client to a cluster with all supported transports and an RMM pool: .. code-block:: python @@ -29,10 +51,83 @@ To connect a client to a cluster with all supported transports and an RMM pool: .. note:: For UCX 1.9 (deprecated) and older, it's necessary to pass ``ucx_net_devices="auto"`` to ``LocalCUDACluster``. UCX 1.11 and above is capable of selecting InfiniBand devices automatically. -dask-cuda-worker ----------------- +dask-cuda-worker with Automatic Configuration +------------------------------------------ + +When using ``dask-cuda-worker`` with UCX communication and automatic configuration, the scheduler, workers, and client must all be started manually, but without specifying any UCX transports explicitly. This is only supported in Dask-CUDA 22.02 and newer and requires UCX >= 1.11.1. + +Scheduler +^^^^^^^^^ + +For automatic UCX configuration, we must ensure a CUDA context is created on the scheduler before UCX is initialized. This can be satisfied by specifying the ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True`` environment variable when creating the scheduler. + +To start a Dask scheduler using UCX with automatic configuration and one GB of RMM pool: + +.. code-block:: bash + + $ DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \ + > DASK_DISTRIBUTED__RMM__POOL_SIZE=1GB \ + > dask-scheduler --protocol ucx --interface ib0 + +.. note:: + The ``interface="ib0"`` is intentionally specified above to ensure RDMACM is used in systems that support InfiniBand. On systems that don't support InfiniBand or where RDMACM isn't required, the ``interface`` argument may be omitted or specified to listen on a different interface. + + We specify ``UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda`` above for optimal performance with InfiniBand, see details `here `_. If not using InfiniBand, that option may be omitted. In UCX 1.12 and newer, that option is default and may be omitted as well even when using InfiniBand. + +Workers +^^^^^^^ + +To start workers with automatic UCX configuration and an RMM pool of 14GB per GPU: + +.. code-block:: bash + + $ UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda + > dask-cuda-worker ucx://:8786 \ + > --rmm-pool-size="14GB" \ + > --interface="ib0" + +.. note:: + Analogous to the scheduler setup, the ``interface="ib0"`` is intentionally specified above to ensure RDMACM is used in systems that support InfiniBand. On systems that don't support InfiniBand or where RDMACM isn't required, the ``interface`` argument may be omitted or specified to listen on a different interface. + + We specify ``UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda`` above for optimal performance with InfiniBand, see details `here `_. If not using InfiniBand, that option may be omitted. In UCX 1.12 and newer, that option is default and may be omitted as well even when using InfiniBand. + +Client +^^^^^^ + +To connect a client to the cluster with automatic UCX configuration we started: + +.. code-block:: python + + import os + + os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"] = "cuda" + + import dask + from dask.distributed import Client + + with dask.config.set({"distributed.comm.ucx.create_cuda_context": True}): + client = Client("ucx://:8786") + +Alternatively, the ``with dask.config.set`` statement from the example above may be omitted and the ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True`` environment variable specified instead: + +.. code-block:: python + + import os + + os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"] = "cuda" + os.environ["DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT"] = "True" + + from dask.distributed import Client + + client = Client("ucx://:8786") + +.. note:: + We specify ``UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda`` above for optimal performance with InfiniBand, see details `here `_. If not using InfiniBand, that option may be omitted. In UCX 1.12 and newer, that option is default and may be omitted as well even when using InfiniBand. + +dask-cuda-worker with Manual Configuration +------------------------------------------ -When using ``dask-cuda-worker`` with communication, the scheduler, workers, and client must all be started manually, each using the same UCX configuration. +When using ``dask-cuda-worker`` with UCX communication and manual configuration, the scheduler, workers, and client must all be started manually, each using the same UCX configuration. Scheduler ^^^^^^^^^ diff --git a/docs/source/ucx.rst b/docs/source/ucx.rst index 4246f541..d895d1ef 100644 --- a/docs/source/ucx.rst +++ b/docs/source/ucx.rst @@ -23,7 +23,19 @@ A memory pool also prevents the Dask scheduler from deserializing CUDA data, whi Configuration ------------- -In addition to installations of UCX and UCX-Py on your system, several options must be specified within your Dask configuration to enable the integration. +Automatic +~~~~~~~~~ + +Beginning with Dask-CUDA 22.02 and assuming UCX >= 1.11.1, specifying UCX transports is now optional. + +A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers and client as different processes -- is also possible, as long as Dask scheduler is created with ``dask-scheduler --protocol="ucx"`` and connecting a ``dask-cuda-worker`` to the scheduler will imply automatic UCX transport selection, but that requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication `_ for more details examples of UCX usage with automatic configuration. + +Configuring transports manually is still possible, please refer to the subsection below. + +Manual +~~~~~~ + +In addition to installations of UCX and UCX-Py on your system, for manual configuration several options must be specified within your Dask configuration to enable the integration. Typically, these will affect ``UCX_TLS`` and ``UCX_SOCKADDR_TLS_PRIORITY``, environment variables used by UCX to decide what transport methods to use and which to prioritize, respectively. However, some will affect related libraries, such as RMM: From 2e9a429a1bf838d5d4944c1c1421685bb19eef7e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 3 Dec 2021 08:18:24 +0100 Subject: [PATCH 10/21] Fix skipping GDS test if cucim is not installed (#813) When `ProxifyHostFile.register_disk_spilling()` is configured with `gds=True` an `ImportError` is raised and the test fails. To fix that we force enabling cucim (if available) after disk spilling is configured, or skip the test if cucim is unavailable. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/813 --- dask_cuda/tests/test_gds.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dask_cuda/tests/test_gds.py b/dask_cuda/tests/test_gds.py index b68f132e..cc703ef4 100644 --- a/dask_cuda/tests/test_gds.py +++ b/dask_cuda/tests/test_gds.py @@ -20,12 +20,11 @@ def test_gds(gds_enabled, cuda_lib): data_compare = lambda x, y: all(x.copy_to_host() == y.copy_to_host()) try: - ProxifyHostFile.register_disk_spilling(gds=gds_enabled) + ProxifyHostFile.register_disk_spilling() if gds_enabled and not ProxifyHostFile._gds_enabled: pytest.importorskip("cucim.clara.filesystem") - # In this case, we know that cucim is available but GDS - # isn't installed on the system. For testing, we enable - # the use of the cucim anyways. + # In this case, we know that cucim is available and for testing + # we enable cucim explicitly even if GDS is unavailable. ProxifyHostFile._gds_enabled = True a = data_create() From 55edb303f905b1e5f54d4ea9aa07303ade66de79 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 8 Dec 2021 18:32:56 +0100 Subject: [PATCH 11/21] Fix regex pattern to match to in test_on_demand_debug_info (#819) The string for bad allocation from RMM has changed from to include the type of error, now `std::bad_alloc: out_of_memory: CUDA error a`, previously `std::bad_alloc: CUDA error at`, the regex change included here matches both cases now. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/819 --- dask_cuda/tests/test_proxify_host_file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 953694c3..c4031e7e 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -429,7 +429,7 @@ def task(): # Submit too large RMM buffer with pytest.raises( - MemoryError, match=r".*std::bad_alloc: CUDA error at:.*" + MemoryError, match=r".*std::bad_alloc:.*CUDA error at:.*" ): client.submit(task).result() From 1fab44b619cdd815ec1741b78c595eece45ee2cb Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 13 Dec 2021 18:02:57 +0100 Subject: [PATCH 12/21] Handle explicitly disabled UCX transports (#820) When UCX transports are explicitly disabled by the user, the other ones must be implicitly enabled. Since Distributed currently considers disabled (`False`) and default (`None`) similarly, we need to workaround that so that the user's choice prevail. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/dask-cuda/pull/820 --- dask_cuda/tests/test_utils.py | 56 ++++++++++++++++++++++++++++++----- dask_cuda/utils.py | 17 +++++++++++ 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 9a2a5f1b..d7d60a22 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -170,15 +170,19 @@ def test_get_ucx_net_devices_auto(): get_ucx_net_devices(idx, "auto") -@pytest.mark.parametrize("enable_tcp_over_ucx", [True, False]) -@pytest.mark.parametrize("enable_infiniband", [True, False]) +@pytest.mark.parametrize("enable_tcp_over_ucx", [True, False, None]) +@pytest.mark.parametrize("enable_nvlink", [True, False, None]) +@pytest.mark.parametrize("enable_infiniband", [True, False, None]) @pytest.mark.parametrize("net_devices", ["eth0", "auto", ""]) -def test_get_ucx_config(enable_tcp_over_ucx, enable_infiniband, net_devices): +def test_get_ucx_config( + enable_tcp_over_ucx, enable_infiniband, enable_nvlink, net_devices +): pytest.importorskip("ucp") kwargs = { "enable_tcp_over_ucx": enable_tcp_over_ucx, "enable_infiniband": enable_infiniband, + "enable_nvlink": enable_nvlink, "net_devices": net_devices, "cuda_device_index": 0, } @@ -191,10 +195,48 @@ def test_get_ucx_config(enable_tcp_over_ucx, enable_infiniband, net_devices): assert ucx_config[canonical_name("create_cuda_context", ucx_config)] is True - assert ucx_config[canonical_name("tcp", ucx_config)] is enable_tcp_over_ucx - assert ucx_config[canonical_name("infiniband", ucx_config)] is enable_infiniband - - if enable_tcp_over_ucx is True or enable_infiniband is True: + if enable_tcp_over_ucx is not None: + assert ucx_config[canonical_name("tcp", ucx_config)] is enable_tcp_over_ucx + else: + if ( + enable_infiniband is not True + and enable_nvlink is not True + and not (enable_infiniband is None and enable_nvlink is None) + ): + assert ucx_config[canonical_name("tcp", ucx_config)] is True + else: + assert ucx_config[canonical_name("tcp", ucx_config)] is None + + if enable_infiniband is not None: + assert ucx_config[canonical_name("infiniband", ucx_config)] is enable_infiniband + else: + if ( + enable_tcp_over_ucx is not True + and enable_nvlink is not True + and not (enable_tcp_over_ucx is None and enable_nvlink is None) + ): + assert ucx_config[canonical_name("infiniband", ucx_config)] is True + else: + assert ucx_config[canonical_name("infiniband", ucx_config)] is None + + if enable_nvlink is not None: + assert ucx_config[canonical_name("nvlink", ucx_config)] is enable_nvlink + else: + if ( + enable_tcp_over_ucx is not True + and enable_infiniband is not True + and not (enable_tcp_over_ucx is None and enable_infiniband is None) + ): + assert ucx_config[canonical_name("nvlink", ucx_config)] is True + else: + assert ucx_config[canonical_name("nvlink", ucx_config)] is None + + if any( + opt is not None + for opt in [enable_tcp_over_ucx, enable_infiniband, enable_nvlink] + ) and not all( + opt is False for opt in [enable_tcp_over_ucx, enable_infiniband, enable_nvlink] + ): assert ucx_config[canonical_name("cuda-copy", ucx_config)] is True else: assert ucx_config[canonical_name("cuda-copy", ucx_config)] is None diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index a50ebd88..0d52c07e 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -304,6 +304,23 @@ def get_ucx_config( ucx_config[canonical_name("create-cuda-context", ucx_config)] = True ucx_config[canonical_name("reuse-endpoints", ucx_config)] = not _ucx_111 + # If any transport is explicitly disabled (`False`) by the user, others that + # are not specified should be enabled (`True`). If transports are explicitly + # enabled (`True`), then default (`None`) or an explicit `False` will suffice + # in disabling others. However, if there's a mix of enable (`True`) and + # disable (`False`), then those choices can be assumed as intended by the + # user. + # + # This may be handled more gracefully in Distributed in the future. + opts = [enable_tcp_over_ucx, enable_infiniband, enable_nvlink] + if any(opt is False for opt in opts) and not any(opt is True for opt in opts): + if enable_tcp_over_ucx is None: + enable_tcp_over_ucx = True + if enable_nvlink is None: + enable_nvlink = True + if enable_infiniband is None: + enable_infiniband = True + ucx_config[canonical_name("tcp", ucx_config)] = enable_tcp_over_ucx ucx_config[canonical_name("infiniband", ucx_config)] = enable_infiniband ucx_config[canonical_name("nvlink", ucx_config)] = enable_nvlink From ec1ec0dc18d7f4c6fd3b174b3f0b910737e55721 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 4 Jan 2022 17:20:05 +0100 Subject: [PATCH 13/21] Ignore `DeprecationWarning` from `distutils.Version` classes (#823) Ignore `DeprecationWarning`s being raised in `fsspec`: ```python dask_cuda/tests/test_dask_cuda_worker.py:11: in from distributed.utils_test import loop # noqa: F401 :991: in _find_and_load ??? :975: in _find_and_load_unlocked ??? :671: in _load_unlocked ??? ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/_pytest/assertion/rewrite.py:170: in exec_module exec(co, module.__dict__) ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/distributed/utils_test.py:80: in import dask.array # register config ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/dask/array/__init__.py:3: in from . import backends, fft, lib, linalg, ma, overlap, random ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/dask/array/backends.py:13: in from .percentile import _percentile ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/dask/array/percentile.py:10: in from .core import Array ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/dask/array/core.py:30: in from fsspec import get_mapper ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/fsspec/__init__.py:12: in from .core import get_fs_token_paths, open, open_files, open_local ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/fsspec/core.py:18: in from .compression import compr ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/fsspec/compression.py:6: in from fsspec.spec import AbstractBufferedFile ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/fsspec/spec.py:92: in if pa_version and LooseVersion(pa_version) < LooseVersion("2.0"): ../../miniconda3/envs/rn-115-22.02.220103/lib/python3.8/site-packages/setuptools/_distutils/version.py:53: in __init__ warnings.warn( E DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead. ``` Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/dask-cuda/pull/823 --- pytest.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/pytest.ini b/pytest.ini index 4f5f759b..f57ea2b7 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,3 +4,4 @@ filterwarnings = error::FutureWarning ignore::DeprecationWarning:pkg_resources ignore:Support for UCX 1.9.0 is deprecated.*:FutureWarning:ucp + ignore:distutils Version classes are deprecated.*:DeprecationWarning: From 172e4490ed1f3360cf0261be0fa62ad7baa91e7c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 11 Jan 2022 09:42:34 +0100 Subject: [PATCH 14/21] sizeof test: increase tolerance (#825) Closes #824 by increasing the tolerance of `sizeof` test of `cudf.DataFrame`. In the test we clear the `ProxyObject` cache, which makes `sizeof` measure the serialized dataframe. I order to avoid deserializing on every `sizeof` call, we accept this discrepancy between `sizeof` serialized and deserialized objects. Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/825 --- dask_cuda/tests/test_proxy.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 970fbfd5..7fe461c9 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -605,6 +605,8 @@ def test_sizeof_cudf(): pxy._pxy_serialize(serializers=("dask",)) assert a_size == pytest.approx(sizeof(pxy)) assert pxy._pxy_get().is_serialized() + # By clearing the cache, `sizeof(pxy)` now measure the serialized data + # thus we have to increase the tolerance. pxy._pxy_cache = {} - assert a_size == pytest.approx(sizeof(pxy)) + assert a_size == pytest.approx(sizeof(pxy), rel=1e-2) assert pxy._pxy_get().is_serialized() From 823baec530429403974c19f4cab2b386b45e435c Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Wed, 12 Jan 2022 15:07:53 -0500 Subject: [PATCH 15/21] Add avg and std calculation for time and throughput (#828) PR adds AVG/STD calc for time and throughput ``` =============================== Wall-clock | Throughput ------------------------------- 294.73 ms | 82.83 MiB/s 220.04 ms | 110.95 MiB/s 247.50 ms | 98.64 MiB/s 214.36 ms | 113.89 MiB/s 224.59 ms | 108.71 MiB/s 231.22 ms | 105.59 MiB/s 228.30 ms | 106.94 MiB/s 296.44 ms | 82.36 MiB/s 226.30 ms | 107.89 MiB/s 221.86 ms | 110.04 MiB/s =============================== Throughput | 102.78 MiB +/- 10.78 MiB Wall-Clock | 240.53 ms +/- 28.74 ms ``` Authors: - Benjamin Zaitlen (https://github.com/quasiben) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/828 --- dask_cuda/benchmarks/local_cudf_merge.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index f36be747..74429518 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -288,16 +288,27 @@ def main(args): print(f"ib | {args.enable_infiniband}") print(f"nvlink | {args.enable_nvlink}") print(f"data-processed | {format_bytes(took_list[0][0])}") - print("===============================") + print("=" * 80) print("Wall-clock | Throughput") - print("-------------------------------") + print("-" * 80) + t_p = [] + times = [] for idx, (data_processed, took) in enumerate(took_list): throughput = int(data_processed / took) m = format_time(took) + times.append(took) + t_p.append(throughput) m += " " * (15 - len(m)) print(f"{m}| {format_bytes(throughput)}/s") t_runs[idx] = float(format_bytes(throughput).split(" ")[0]) - print("===============================") + t_p = numpy.asarray(t_p) + times = numpy.asarray(times) + print("=" * 80) + print(f"Throughput | {format_bytes(t_p.mean())} +/- {format_bytes(t_p.std()) }") + print( + f"Wall-Clock | {format_time(times.mean())} +/- {format_time(times.std()) }" + ) + print("=" * 80) if args.markdown: print("\n```") From 4fd5f647a98d888d8f95033e430ac6303fef72d7 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 13 Jan 2022 01:48:53 +0530 Subject: [PATCH 16/21] Expose rmm-maximum_pool_size argument (#827) This PR closes https://github.com/rapidsai/dask-cuda/issues/826 Authors: - Vibhu Jawa (https://github.com/VibhuJawa) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/827 --- dask_cuda/cli/dask_cuda_worker.py | 14 ++++++++++++ dask_cuda/cuda_worker.py | 10 ++++++++- dask_cuda/local_cuda_cluster.py | 15 +++++++++++++ dask_cuda/tests/test_local_cuda_cluster.py | 7 ++++++ dask_cuda/utils.py | 25 +++++++++++++++++----- 5 files changed, 65 insertions(+), 6 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 4f1ac6c1..8f50bdce 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -73,6 +73,18 @@ .. note:: This size is a per-worker configuration, and not cluster-wide.""", ) +@click.option( + "--rmm-maximum-pool-size", + default=None, + help="""When ``--rmm-pool-size`` is specified, this argument indicates the maximum pool size. + Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``) or ``None``. + By default, the total available memory on the GPU is used. + ``rmm_pool_size`` must be specified to use RMM pool and + to set the maximum pool size. + + .. note:: + This size is a per-worker configuration, and not cluster-wide.""", +) @click.option( "--rmm-managed-memory/--no-rmm-managed-memory", default=False, @@ -277,6 +289,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, @@ -327,6 +340,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_maximum_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 56a6dcdb..9634c822 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -58,6 +58,7 @@ def __init__( memory_limit="auto", device_memory_limit="auto", rmm_pool_size=None, + rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_log_directory=None, @@ -152,6 +153,9 @@ def del_pid_file(): ) if rmm_pool_size is not None: rmm_pool_size = parse_bytes(rmm_pool_size) + if rmm_maximum_pool_size is not None: + rmm_maximum_pool_size = parse_bytes(rmm_maximum_pool_size) + else: if enable_nvlink: warnings.warn( @@ -239,7 +243,11 @@ def del_pid_file(): get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i))) ), RMMSetup( - rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, + rmm_pool_size, + rmm_maximum_pool_size, + rmm_managed_memory, + rmm_async, + rmm_log_directory, ), }, name=name if nprocs == 1 or name is None else str(name) + "-" + str(i), diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 30d48f0d..80fa225a 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -117,6 +117,16 @@ class LocalCUDACluster(LocalCluster): RMM pool size to initialize each worker with. Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``), or ``None`` to disable RMM pools. + .. note:: + This size is a per-worker configuration, and not cluster-wide. + rmm_maximum_pool_size : int, str or None, default None + When ``rmm_pool_size`` is set, this argument indicates + the maximum pool size. + Can be an integer (bytes), string (like ``"5GB"`` or ``"5000M"``) or ``None``. + By default, the total available memory on the GPU is used. + ``rmm_pool_size`` must be specified to use RMM pool and + to set the maximum pool size. + .. note:: This size is a per-worker configuration, and not cluster-wide. rmm_managed_memory : bool, default False @@ -196,6 +206,7 @@ def __init__( enable_rdmacm=None, ucx_net_devices=None, rmm_pool_size=None, + rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_log_directory=None, @@ -230,6 +241,7 @@ def __init__( ) self.rmm_pool_size = rmm_pool_size + self.rmm_maximum_pool_size = rmm_maximum_pool_size self.rmm_managed_memory = rmm_managed_memory self.rmm_async = rmm_async if rmm_pool_size is not None or rmm_managed_memory: @@ -248,6 +260,8 @@ def __init__( ) if self.rmm_pool_size is not None: self.rmm_pool_size = parse_bytes(self.rmm_pool_size) + if self.rmm_maximum_pool_size is not None: + self.rmm_maximum_pool_size = parse_bytes(self.rmm_maximum_pool_size) else: if enable_nvlink: warnings.warn( @@ -397,6 +411,7 @@ def new_worker_spec(self): ), RMMSetup( self.rmm_pool_size, + self.rmm_maximum_pool_size, self.rmm_managed_memory, self.rmm_async, self.rmm_log_directory, diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index edf73e4e..17fe6d97 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -150,6 +150,13 @@ async def test_rmm_pool(): assert v is rmm.mr.PoolMemoryResource +@gen_test(timeout=20) +async def test_rmm_maximum_poolsize_without_poolsize_error(): + pytest.importorskip("rmm") + with pytest.raises(ValueError): + await LocalCUDACluster(rmm_maximum_pool_size="2GB", asynchronous=True) + + @gen_test(timeout=20) async def test_rmm_managed(): rmm = pytest.importorskip("rmm") diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 0d52c07e..2a74bc69 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -45,8 +45,22 @@ def setup(self, worker=None): class RMMSetup: - def __init__(self, nbytes, managed_memory, async_alloc, log_directory): - self.nbytes = nbytes + def __init__( + self, + initial_pool_size, + maximum_pool_size, + managed_memory, + async_alloc, + log_directory, + ): + if initial_pool_size is None and maximum_pool_size is not None: + raise ValueError( + "`rmm_maximum_pool_size` was specified without specifying " + "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool." + ) + + self.initial_pool_size = initial_pool_size + self.maximum_pool_size = maximum_pool_size self.managed_memory = managed_memory self.async_alloc = async_alloc self.logging = log_directory is not None @@ -63,15 +77,16 @@ def setup(self, worker=None): worker, self.logging, self.log_directory ) ) - elif self.nbytes is not None or self.managed_memory: + elif self.initial_pool_size is not None or self.managed_memory: import rmm - pool_allocator = False if self.nbytes is None else True + pool_allocator = False if self.initial_pool_size is None else True rmm.reinitialize( pool_allocator=pool_allocator, managed_memory=self.managed_memory, - initial_pool_size=self.nbytes, + initial_pool_size=self.initial_pool_size, + maximum_pool_size=self.maximum_pool_size, logging=self.logging, log_file_name=get_rmm_log_file_name( worker, self.logging, self.log_directory From 5084147249f31a4480417c76cf75afceeb54b194 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 14 Jan 2022 14:34:44 +0100 Subject: [PATCH 17/21] Query UCX-Py from gpuCI versioning service (#818) Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/dask-cuda/pull/818 --- ci/gpu/build.sh | 3 ++- ci/release/update-version.sh | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 16a4f887..6011aa2b 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -26,6 +26,7 @@ cd "$WORKSPACE" export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` export UCX_PATH=$CONDA_PREFIX +export UCXPY_VERSION="0.24.*" # Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, # will possibly be enabled by default starting on 1.17) @@ -57,7 +58,7 @@ conda list --show-channel-urls # Also installing cucim in order to test GDS spilling gpuci_mamba_retry install "cudatoolkit=$CUDA_REL" \ "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \ - "ucx-py=0.24.*" "ucx-proc=*=gpu" \ + "ucx-py=${UCXPY_VERSION}" "ucx-proc=*=gpu" \ "rapids-build-env=$MINOR_VERSION.*" \ "cucim" diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index daa1b97f..afd907b5 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -22,6 +22,7 @@ CURRENT_SHORT_TAG=${CURRENT_MAJOR}.${CURRENT_MINOR} NEXT_MAJOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[1]}') NEXT_MINOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[2]}') NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR} +NEXT_UCXPY_VERSION="$(curl -s https://version.gpuci.io/rapids/${NEXT_SHORT_TAG}).*" echo "Preparing release $CURRENT_TAG => $NEXT_FULL_TAG" @@ -30,5 +31,5 @@ function sed_runner() { sed -i.bak ''"$1"'' $2 && rm -f ${2}.bak } - -# No-op \ No newline at end of file +# Update UCX-Py version +sed_runner "s/export UCXPY_VERSION=.*/export UCXPY_VERSION="${NEXT_UCXPY_VERSION}"/g" ci/gpu/build.sh From e3cad7d195f84076be37e5c2ecd5aaf0e9050b93 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 14 Jan 2022 16:39:14 +0100 Subject: [PATCH 18/21] Merge branch-21.12 into branch-22.02 (#829) Closes #821 Authors: - Peter Andreas Entschev (https://github.com/pentschev) - AJ Schmidt (https://github.com/ajschmidt8) Approvers: - Jordan Jacobelli (https://github.com/Ethyling) URL: https://github.com/rapidsai/dask-cuda/pull/829 --- CHANGELOG.md | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1fdb288..f09e324f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,50 @@ Please see https://github.com/rapidsai/dask-cuda/releases/tag/v22.02.00a for the latest changes to this development branch. -# dask-cuda 21.12.00 (Date TBD) +# dask-cuda 21.12.00 (9 Dec 2021) -Please see https://github.com/rapidsai/dask-cuda/releases/tag/v21.12.00a for the latest changes to this development branch. +## 🐛 Bug Fixes + +- Remove automatic `doc` labeler ([#807](https://github.com/rapidsai/dask-cuda/pull/807)) [@pentschev](https://github.com/pentschev) +- Add create_cuda_context UCX config from Distributed ([#801](https://github.com/rapidsai/dask-cuda/pull/801)) [@pentschev](https://github.com/pentschev) +- Ignore deprecation warnings from pkg_resources ([#784](https://github.com/rapidsai/dask-cuda/pull/784)) [@pentschev](https://github.com/pentschev) +- Fix parsing of device by UUID ([#780](https://github.com/rapidsai/dask-cuda/pull/780)) [@pentschev](https://github.com/pentschev) +- Avoid creating CUDA context in LocalCUDACluster parent process ([#765](https://github.com/rapidsai/dask-cuda/pull/765)) [@pentschev](https://github.com/pentschev) +- Remove gen_cluster spill tests ([#758](https://github.com/rapidsai/dask-cuda/pull/758)) [@pentschev](https://github.com/pentschev) +- Update memory_pause_fraction in test_spill ([#757](https://github.com/rapidsai/dask-cuda/pull/757)) [@pentschev](https://github.com/pentschev) + +## 📖 Documentation + +- Add troubleshooting page with PCI Bus ID issue description ([#777](https://github.com/rapidsai/dask-cuda/pull/777)) [@pentschev](https://github.com/pentschev) + +## 🚀 New Features + +- Handle UCX-Py FutureWarning on UCX < 1.11.1 deprecation ([#799](https://github.com/rapidsai/dask-cuda/pull/799)) [@pentschev](https://github.com/pentschev) +- Pin max `dask` & `distributed` versions ([#794](https://github.com/rapidsai/dask-cuda/pull/794)) [@galipremsagar](https://github.com/galipremsagar) +- Update to UCX-Py 0.23 ([#752](https://github.com/rapidsai/dask-cuda/pull/752)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Improvements + +- Fix spill-to-disk triggered by Dask explicitly ([#800](https://github.com/rapidsai/dask-cuda/pull/800)) [@madsbk](https://github.com/madsbk) +- Fix Changelog Merge Conflicts for `branch-21.12` ([#797](https://github.com/rapidsai/dask-cuda/pull/797)) [@ajschmidt8](https://github.com/ajschmidt8) +- Use unittest.mock.patch for all os.environ tests ([#787](https://github.com/rapidsai/dask-cuda/pull/787)) [@pentschev](https://github.com/pentschev) +- Logging when RMM allocation fails ([#782](https://github.com/rapidsai/dask-cuda/pull/782)) [@madsbk](https://github.com/madsbk) +- Tally IDs instead of device buffers directly ([#779](https://github.com/rapidsai/dask-cuda/pull/779)) [@madsbk](https://github.com/madsbk) +- Avoid proxy object aliasing ([#775](https://github.com/rapidsai/dask-cuda/pull/775)) [@madsbk](https://github.com/madsbk) +- Test of sizeof proxy object ([#774](https://github.com/rapidsai/dask-cuda/pull/774)) [@madsbk](https://github.com/madsbk) +- gc.collect when spilling on demand ([#771](https://github.com/rapidsai/dask-cuda/pull/771)) [@madsbk](https://github.com/madsbk) +- Reenable explicit comms tests ([#770](https://github.com/rapidsai/dask-cuda/pull/770)) [@madsbk](https://github.com/madsbk) +- Simplify JIT-unspill and writing docs ([#768](https://github.com/rapidsai/dask-cuda/pull/768)) [@madsbk](https://github.com/madsbk) +- Increase CUDAWorker close timeout ([#764](https://github.com/rapidsai/dask-cuda/pull/764)) [@pentschev](https://github.com/pentschev) +- Ignore known but expected test warnings ([#759](https://github.com/rapidsai/dask-cuda/pull/759)) [@pentschev](https://github.com/pentschev) +- Spilling on demand ([#756](https://github.com/rapidsai/dask-cuda/pull/756)) [@madsbk](https://github.com/madsbk) +- Revert "Temporarily skipping some tests because of a bug in Dask ([#753)" (#754](https://github.com/rapidsai/dask-cuda/pull/753)" (#754)) [@madsbk](https://github.com/madsbk) +- Temporarily skipping some tests because of a bug in Dask ([#753](https://github.com/rapidsai/dask-cuda/pull/753)) [@madsbk](https://github.com/madsbk) +- Removing the `FrameProxyObject` workaround ([#751](https://github.com/rapidsai/dask-cuda/pull/751)) [@madsbk](https://github.com/madsbk) +- Use cuDF Frame instead of Table ([#748](https://github.com/rapidsai/dask-cuda/pull/748)) [@madsbk](https://github.com/madsbk) +- Remove proxy object locks ([#747](https://github.com/rapidsai/dask-cuda/pull/747)) [@madsbk](https://github.com/madsbk) +- Unpin `dask` & `distributed` in CI ([#742](https://github.com/rapidsai/dask-cuda/pull/742)) [@galipremsagar](https://github.com/galipremsagar) +- Update SSHCluster usage in benchmarks with new CUDAWorker ([#326](https://github.com/rapidsai/dask-cuda/pull/326)) [@pentschev](https://github.com/pentschev) # dask-cuda 21.10.00 (7 Oct 2021) From 54c675ba46ffbd6bd2d4ebf4cb8ad404ea7f1191 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Mon, 24 Jan 2022 09:08:49 -0600 Subject: [PATCH 19/21] Pin `dask` & `distributed` versions (#832) Changed required to be in-line with: https://github.com/rapidsai/cudf/pull/10108/ Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/832 --- ci/gpu/build.sh | 2 +- conda/recipes/dask-cuda/meta.yaml | 4 ++-- requirements.txt | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 6011aa2b..4041f2a4 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -34,7 +34,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 # Install dask and distributed from master branch. Usually needed during # development time and disabled before a new dask-cuda release. -export INSTALL_DASK_MASTER=1 +export INSTALL_DASK_MASTER=0 ################################################################################ # SETUP - Check environment diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index fac93448..01258403 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -27,8 +27,8 @@ requirements: - setuptools run: - python - - dask>=2021.11.1 - - distributed>=2021.11.1 + - dask>=2021.11.1,<=2022.01.0 + - distributed>=2021.11.1,<=2022.01.0 - pynvml >=8.0.3 - numpy >=1.16.0 - numba >=0.53.1 diff --git a/requirements.txt b/requirements.txt index f0d36eac..ac17bd86 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -dask>=2021.11.1 -distributed>=2021.11.1 +dask>=2021.11.1,<=2022.01.0 +distributed>=2021.11.1,<=2022.01.0 pynvml>=11.0.0 numpy>=1.16.0 numba>=0.53.1 From c715b23be4d5d3ddad246fda261d6786ccef83f7 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Thu, 27 Jan 2022 14:10:32 -0800 Subject: [PATCH 20/21] Fix Dask-CUDA version to 22.02 (#835) Authors: - https://github.com/jakirkham Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/835 --- docs/source/examples/ucx.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/examples/ucx.rst b/docs/source/examples/ucx.rst index cce76e85..09e1c168 100644 --- a/docs/source/examples/ucx.rst +++ b/docs/source/examples/ucx.rst @@ -7,7 +7,7 @@ In either case, a ``dask.distributed.Client`` must be made for the worker cluste LocalCUDACluster with Automatic Configuration --------------------------------------------- -Automatic configuration was introduced in Dask-CUDA 22.20 and requires UCX >= 1.11.1. This allows the user to specify only the UCX protocol and let UCX decide which transports to use. +Automatic configuration was introduced in Dask-CUDA 22.02 and requires UCX >= 1.11.1. This allows the user to specify only the UCX protocol and let UCX decide which transports to use. To connect a client to a cluster with automatically-configured UCX and an RMM pool: From d0845add9b90e64c9e332d1ece670600d31a71ae Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Wed, 2 Feb 2022 10:49:42 -0500 Subject: [PATCH 21/21] update changelog --- CHANGELOG.md | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f09e324f..df3c1e25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,35 @@ -# dask-cuda 22.02.00 (Date TBD) +# dask-cuda 22.02.00 (2 Feb 2022) -Please see https://github.com/rapidsai/dask-cuda/releases/tag/v22.02.00a for the latest changes to this development branch. +## 🐛 Bug Fixes + +- Ignoe `DepecationWaning` fom `distutils.Vesion` classes ([#823](https://github.com/rapidsai/dask-cuda/pull/823)) [@pentschev](https://github.com/pentschev) +- Handle explicitly disabled UCX tanspots ([#820](https://github.com/rapidsai/dask-cuda/pull/820)) [@pentschev](https://github.com/pentschev) +- Fix egex patten to match to in test_on_demand_debug_info ([#819](https://github.com/rapidsai/dask-cuda/pull/819)) [@pentschev](https://github.com/pentschev) +- Fix skipping GDS test if cucim is not installed ([#813](https://github.com/rapidsai/dask-cuda/pull/813)) [@pentschev](https://github.com/pentschev) +- Unpin Dask and Distibuted vesions ([#810](https://github.com/rapidsai/dask-cuda/pull/810)) [@pentschev](https://github.com/pentschev) +- Update to UCX-Py 0.24 ([#805](https://github.com/rapidsai/dask-cuda/pull/805)) [@pentschev](https://github.com/pentschev) + +## 📖 Documentation + +- Fix Dask-CUDA vesion to 22.02 ([#835](https://github.com/rapidsai/dask-cuda/pull/835)) [@jakikham](https://github.com/jakikham) +- Mege banch-21.12 into banch-22.02 ([#829](https://github.com/rapidsai/dask-cuda/pull/829)) [@pentschev](https://github.com/pentschev) +- Claify `LocalCUDACluste`'s `n_wokes` docstings ([#812](https://github.com/rapidsai/dask-cuda/pull/812)) [@pentschev](https://github.com/pentschev) + +## 🚀 New Featues + +- Pin `dask` & `distibuted` vesions ([#832](https://github.com/rapidsai/dask-cuda/pull/832)) [@galipemsaga](https://github.com/galipemsaga) +- Expose mm-maximum_pool_size agument ([#827](https://github.com/rapidsai/dask-cuda/pull/827)) [@VibhuJawa](https://github.com/VibhuJawa) +- Simplify UCX configs, pemitting UCX_TLS=all ([#792](https://github.com/rapidsai/dask-cuda/pull/792)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Impovements + +- Add avg and std calculation fo time and thoughput ([#828](https://github.com/rapidsai/dask-cuda/pull/828)) [@quasiben](https://github.com/quasiben) +- sizeof test: incease toleance ([#825](https://github.com/rapidsai/dask-cuda/pull/825)) [@madsbk](https://github.com/madsbk) +- Quey UCX-Py fom gpuCI vesioning sevice ([#818](https://github.com/rapidsai/dask-cuda/pull/818)) [@pentschev](https://github.com/pentschev) +- Standadize Distibuted config sepaato in get_ucx_config ([#806](https://github.com/rapidsai/dask-cuda/pull/806)) [@pentschev](https://github.com/pentschev) +- Fixed `PoxyObject.__del__` to use the new Disk IO API fom #791 ([#802](https://github.com/rapidsai/dask-cuda/pull/802)) [@madsbk](https://github.com/madsbk) +- GPUDiect Stoage (GDS) suppot fo spilling ([#793](https://github.com/rapidsai/dask-cuda/pull/793)) [@madsbk](https://github.com/madsbk) +- Disk IO inteface ([#791](https://github.com/rapidsai/dask-cuda/pull/791)) [@madsbk](https://github.com/madsbk) # dask-cuda 21.12.00 (9 Dec 2021)