diff --git a/.readthedocs.yml b/.readthedocs.yml new file mode 100644 index 00000000..eba812c2 --- /dev/null +++ b/.readthedocs.yml @@ -0,0 +1,16 @@ +version: 2 + +build: + image: latest + +sphinx: + configuration: docs/source/conf.py + +python: + version: 3.7 + install: + - method: pip + path: . + +conda: + environment: conda/environments/builddocs_py37.yml diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6da59671..ea092916 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,40 @@ +0.15 +---- +- Fix-up versioneer (#305) `John Kirkham`_ +- Require Distributed 2.15.0+ (#306) `John Kirkham`_ +- Rely on Dask's ability to serialize collections (#307) `John Kirkham`_ +- Ensure CI installs GPU build of UCX (#308) `Peter Andreas Entschev`_ +- Skip 2nd serialization pass of `DeviceSerialized` (#309) `John Kirkham`_ +- Fix tests related to latest RMM changes (#310) `Peter Andreas Entschev`_ +- Fix dask-cuda-worker's interface argument (#314) `Peter Andreas Entschev`_ +- Check only for memory type during test_get_device_total_memory (#315) `Peter Andreas Entschev`_ +- Fix and improve DGX tests (#316) `Peter Andreas Entschev`_ +- Install dependencies via meta package (#317) `Ray Douglass`_ +- Fix errors when TLS files are not specified (#320) `Peter Andreas Entschev`_ +- Refactor dask-cuda-worker into CUDAWorker class (#324) `Jacob Tomlinson`_ +- Add missing __init__.py to dask_cuda/cli (#327) `Peter Andreas Entschev`_ +- Add Dask distributed GPU tests to CI (#329) `Benjamin Zaitlen`_ +- Fix rmm_pool_size argument name in docstrings (#329) `Benjamin Zaitlen`_ +- Add CPU support to benchmarks (#338) `Benjamin Zaitlen`_ +- Fix isort configuration (#339) `Mads R. B. Kristensen`_ +- Explicit-comms: cleanup and bug fix (#340) `Mads R. B. Kristensen`_ +- Add support for RMM managed memory (#343) `Peter Andreas Entschev`_ +- Update docker image in local build script (#345) `Sean Frye`_ +- Support pickle protocol 5 based spilling (#349) `John Kirkham`_ +- Use get_n_gpus for RMM test with dask-cuda-worker (#356) `Peter Andreas Entschev`_ +- Update RMM tests based on deprecated CNMeM (#359) `John Kirkham`_ +- Fix a black error in explicit comms (#360) `John Kirkham`_ +- Fix an `isort` error (#360) `John Kirkham`_ +- Fix an `isort` error (#360) `John Kirkham`_ +- Set `RMM_NO_INITIALIZE` environment variable (#363) `Benjamin Zaitlen`_ +- Fix bash lines in docs (#369) `Benjamin Zaitlen`_ +- Replace `RMM_NO_INITIALIZE` with `RAPIDS_NO_INITIALIZE` (#371) `John Kirkham`_ +- Fixes for docs and RTD updates (#373) `Benjamin Zaitlen`_ +- Confirm DGX tests are running baremetal (#376) `Peter Andreas Entschev`_ +- Set RAPIDS_NO_INITIALIZE at the top of CUDAWorker/LocalCUDACluster (#379) `Peter Andreas Entschev`_ +- Change pytest's basetemp in CI build script (#380) `Peter Andreas Entschev`_ +- Pin Numba version to exclude 0.51.0 (#385) `Benjamin Zaitlen`_ + 0.14 ---- - Publish branch-0.14 to conda (#262) `Paul Taylor`_ @@ -143,3 +180,5 @@ .. _`Paul Taylor`: https://github.com/trxcllnt .. _`Eli Fajardo`: https://github.com/efajardo-nv .. _`Randy Gelhausen`: https://github.com/randerzander +.. _`Jacob Tomlinson`: https://github.com/jacobtomlinson +.. 
_`Sean Frye`: https://github.com/sean-frye diff --git a/README.md b/README.md index 872da00f..da343f7c 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![RTD](https://readthedocs.org/projects/dask-cuda/badge/?version=latest)](https://dask-cuda.readthedocs.io/en/latest/?badge=latest) + Dask CUDA ========= @@ -29,4 +31,5 @@ It only helps with deployment and management of Dask workers in multi-GPU systems. Parallelizing GPU libraries like [RAPIDS](https://rapids.ai) and [CuPy](https://cupy.chainer.org) with Dask is an ongoing effort. You may wish to read about this effort at [blog.dask.org](https://blog.dask.org) for more -information.. +information. Additional information about Dask-CUDA can also be found in the +[docs]( https://dask-cuda.readthedocs.io ). diff --git a/ci/cpu/upload-anaconda.sh b/ci/cpu/upload-anaconda.sh index 4183188d..cf6c34f9 100755 --- a/ci/cpu/upload-anaconda.sh +++ b/ci/cpu/upload-anaconda.sh @@ -7,13 +7,12 @@ set -e export UPLOADFILE=`conda build conda/recipes/dask-cuda --python=$PYTHON --output` CUDA_REL=${CUDA_VERSION%.*} -SOURCE_BRANCH=master LABEL_OPTION="--label main" echo "LABEL_OPTION=${LABEL_OPTION}" # Restrict uploads to master branch -if [ ${GIT_BRANCH} != ${SOURCE_BRANCH} ]; then +if [ ${BUILD_MODE} != "branch" ]; then echo "Skipping upload" return 0 fi diff --git a/ci/cpu/upload-pypi.sh b/ci/cpu/upload-pypi.sh index 07ba15a1..4eb6a2ad 100755 --- a/ci/cpu/upload-pypi.sh +++ b/ci/cpu/upload-pypi.sh @@ -1,10 +1,8 @@ #!/bin/bash set -e -SOURCE_BRANCH=master -# Restrict uploads to master branch -if [ ${GIT_BRANCH} != ${SOURCE_BRANCH} ]; then +if [ ${BUILD_MODE} != "branch" ]; then echo "Skipping upload" return 0 fi diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 5581b81e..b4931161 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -57,15 +57,14 @@ conda list # Fixing Numpy version to avoid RuntimeWarning: numpy.ufunc size changed, may # indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject conda install "cudatoolkit=$CUDA_REL" \ - "cupy>=6.5.0" "numpy=1.16.4" \ "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \ - "dask>=2.8.1" "distributed>=2.8.1" + "ucx-py=$MINOR_VERSION.*" "ucx-proc=*=gpu" \ + "rapids-build-env=$MINOR_VERSION.*" -# needed for async tests -conda install -c conda-forge "pytest" "pytest-asyncio" +# https://docs.rapids.ai/maintainers/depmgmt/ +# conda remove -f rapids-build-env +# conda install "your-pkg=1.0.0" -# Use nightly build of ucx-py for now -conda install "ucx-py=$MINOR_VERSION" conda list @@ -101,5 +100,17 @@ else logger "Python py.test for dask-cuda..." cd $WORKSPACE ls dask_cuda/tests/ - UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n py.test -vs --cache-clear --junitxml=${WORKSPACE}/junit-dask-cuda.xml --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:${WORKSPACE}/dask-cuda-coverage.xml --cov-report term dask_cuda/tests/ + UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n py.test -vs --cache-clear --basetemp=${WORKSPACE}/dask-cuda-tmp --junitxml=${WORKSPACE}/junit-dask-cuda.xml --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:${WORKSPACE}/dask-cuda-coverage.xml --cov-report term dask_cuda/tests/ + + logger "Running dask.distributed GPU tests" + # Test downstream packages, which requires Python v3.7 + if [ $(python -c "import sys; print(sys.version_info[1])") -ge "7" ]; then + logger "TEST OF DASK/UCX..." 
+ py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_cupy as m;print(m.__file__)"` + py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_numba as m;print(m.__file__)"` + py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_rmm as m;print(m.__file__)"` + py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_collection_cuda as m;print(m.__file__)"` + py.test --cache-clear -vs `python -c "import distributed.tests.test_nanny as m;print(m.__file__)"` + py.test --cache-clear -vs `python -c "import distributed.tests.test_gpu_metrics as m;print(m.__file__)"` + fi fi diff --git a/ci/local/build.sh b/ci/local/build.sh index 443e3c14..72ccf9e4 100755 --- a/ci/local/build.sh +++ b/ci/local/build.sh @@ -1,6 +1,9 @@ #!/bin/bash -DOCKER_IMAGE="gpuci/rapidsai-base:cuda10.0-ubuntu16.04-gcc5-py3.6" +GIT_DESCRIBE_TAG=`git describe --tags` +MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` + +DOCKER_IMAGE="gpuci/rapidsai:${MINOR_VERSION}-cuda10.1-devel-ubuntu16.04-py3.7" REPO_PATH=${PWD} RAPIDS_DIR_IN_CONTAINER="/rapids" CPP_BUILD_DIR="cpp/build" @@ -139,4 +142,4 @@ docker run --rm -it ${GPU_OPTS} \ -v "$PASSWD_FILE":/etc/passwd:ro \ -v "$GROUP_FILE":/etc/group:ro \ --cap-add=SYS_PTRACE \ - "${DOCKER_IMAGE}" bash -c "${COMMAND}" \ No newline at end of file + "${DOCKER_IMAGE}" bash -c "${COMMAND}" diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 3f626876..ece107eb 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -25,10 +25,10 @@ requirements: run: - python x.x - dask-core >=2.4.0 - - distributed >=2.7.0 + - distributed >=2.18.0 - pynvml >=8.0.3 - numpy >=1.16.0 - - numba >=0.40.1 + - numba >=0.50.0,!=0.51.0 test: imports: diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index bbc1ac59..91b049f7 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -1,5 +1,6 @@ from ._version import get_versions from .local_cuda_cluster import LocalCUDACluster +from .cuda_worker import CUDAWorker __version__ = get_versions()["version"] del get_versions diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index abbe1191..8762f0cf 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -2,10 +2,13 @@ from collections import defaultdict from time import perf_counter as clock +import numpy + from dask.base import tokenize from dask.dataframe.core import new_dd_object from dask.distributed import Client, performance_report, wait from dask.utils import format_bytes, format_time, parse_bytes + from dask_cuda import explicit_comms from dask_cuda.benchmarks.utils import ( get_cluster_options, @@ -14,17 +17,21 @@ setup_memory_pool, ) -import cudf -import cupy -import numpy - # Benchmarking cuDF merge operation based on # -def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match): +def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu): # Setting a seed that triggers max amount of comm in the two-GPU case. 
- cupy.random.seed(17561648246761420848) + if gpu: + import cupy as xp + + import cudf as xdf + else: + import numpy as xp + import pandas as xdf + + xp.random.seed(2 ** 32 - 1) chunk_type = chunk_type or "build" frac_match = frac_match or 1.0 @@ -40,16 +47,14 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match): start = local_size * i_chunk stop = start + local_size - parts_array = cupy.arange(num_chunks, dtype="int64") - suffle_array = cupy.repeat(parts_array, math.ceil(local_size / num_chunks)) + parts_array = xp.arange(num_chunks, dtype="int64") + suffle_array = xp.repeat(parts_array, math.ceil(local_size / num_chunks)) - df = cudf.DataFrame( + df = xdf.DataFrame( { - "key": cupy.arange(start, stop=stop, dtype="int64"), - "shuffle": cupy.random.permutation(suffle_array)[:local_size], - "payload": cupy.random.permutation( - cupy.arange(local_size, dtype="int64") - ), + "key": xp.arange(start, stop=stop, dtype="int64"), + "shuffle": xp.random.permutation(suffle_array)[:local_size], + "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), } ) else: @@ -69,26 +74,24 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match): for i in range(num_chunks): bgn = (local_size * i) + (sub_local_size * i_chunk) end = bgn + sub_local_size - ar = cupy.arange(bgn, stop=end, dtype="int64") - arrays.append(cupy.random.permutation(ar)[:sub_local_size_use]) - key_array_match = cupy.concatenate(tuple(arrays), axis=0) + ar = xp.arange(bgn, stop=end, dtype="int64") + arrays.append(xp.random.permutation(ar)[:sub_local_size_use]) + key_array_match = xp.concatenate(tuple(arrays), axis=0) # Step 2. Add values that DON'T match missing_size = local_size - key_array_match.shape[0] start = local_size * num_chunks + local_size * i_chunk stop = start + missing_size - key_array_no_match = cupy.arange(start, stop=stop, dtype="int64") + key_array_no_match = xp.arange(start, stop=stop, dtype="int64") # Step 3. 
Combine and create the final dataframe chunk (dask_cudf partition) - key_array_combine = cupy.concatenate( + key_array_combine = xp.concatenate( (key_array_match, key_array_no_match), axis=0 ) - df = cudf.DataFrame( + df = xdf.DataFrame( { - "key": cupy.random.permutation(key_array_combine), - "payload": cupy.random.permutation( - cupy.arange(local_size, dtype="int64") - ), + "key": xp.random.permutation(key_array_combine), + "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), } ) return df @@ -97,13 +100,22 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match): def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): parts = [chunk_size for i in range(num_chunks)] - meta = generate_chunk(0, 4, 1, chunk_type, None) + device_type = True if args.type == "gpu" else False + meta = generate_chunk(0, 4, 1, chunk_type, None, device_type) divisions = [None] * (len(parts) + 1) name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type) graph = { - (name, i): (generate_chunk, i, part, len(parts), chunk_type, frac_match) + (name, i): ( + generate_chunk, + i, + part, + len(parts), + chunk_type, + frac_match, + device_type, + ) for i, part in enumerate(parts) } @@ -177,20 +189,24 @@ def main(args): cluster_kwargs = cluster_options["kwargs"] scheduler_addr = cluster_options["scheduler_addr"] - cluster = Cluster(*cluster_args, **cluster_kwargs) - if args.multi_node: - import time + if args.sched_addr: + client = Client(args.sched_addr) + else: + cluster = Cluster(*cluster_args, **cluster_kwargs) + if args.multi_node: + import time - # Allow some time for workers to start and connect to scheduler - # TODO: make this a command-line argument? - time.sleep(15) + # Allow some time for workers to start and connect to scheduler + # TODO: make this a command-line argument? + time.sleep(15) - client = Client(scheduler_addr if args.multi_node else cluster) + client = Client(scheduler_addr if args.multi_node else cluster) - client.run(setup_memory_pool, disable_pool=args.no_rmm_pool) - # Create an RMM pool on the scheduler due to occasional deserialization - # of CUDA objects. May cause issues with InfiniBand otherwise. - client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool) + if args.type == "gpu": + client.run(setup_memory_pool, disable_pool=args.no_rmm_pool) + # Create an RMM pool on the scheduler due to occasional deserialization + # of CUDA objects. May cause issues with InfiniBand otherwise. 
+ client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool) scheduler_workers = client.run_on_scheduler(get_scheduler_workers) n_workers = len(scheduler_workers) @@ -227,6 +243,7 @@ def main(args): print("Merge benchmark") print("-------------------------------") print(f"backend | {args.backend}") + print(f"merge type | {args.type}") print(f"rows-per-chunk | {args.chunk_size}") print(f"protocol | {args.protocol}") print(f"device(s) | {args.devs}") @@ -278,6 +295,13 @@ def parse_args(): "type": str, "help": "The backend to use.", }, + { + "name": ["-t", "--type",], + "choices": ["cpu", "gpu"], + "default": "gpu", + "type": str, + "help": "Do merge with GPU or CPU dataframes", + }, { "name": ["-c", "--chunk-size",], "default": 1_000_000, diff --git a/dask_cuda/benchmarks/local_cupy_transpose_sum.py b/dask_cuda/benchmarks/local_cupy_transpose_sum.py index 86177dd2..fe9c9d3f 100644 --- a/dask_cuda/benchmarks/local_cupy_transpose_sum.py +++ b/dask_cuda/benchmarks/local_cupy_transpose_sum.py @@ -2,9 +2,13 @@ from collections import defaultdict from time import perf_counter as clock -import dask.array as da +import cupy +import numpy as np + +from dask import array as da from dask.distributed import Client, performance_report, wait from dask.utils import format_bytes, format_time, parse_bytes + from dask_cuda.benchmarks.utils import ( get_cluster_options, get_scheduler_workers, @@ -12,9 +16,6 @@ setup_memory_pool, ) -import cupy -import numpy as np - async def _run(client, args): # Create a simple random array diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 80fea4df..32d92158 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -1,6 +1,7 @@ import argparse from dask.distributed import SSHCluster + from dask_cuda.local_cuda_cluster import LocalCUDACluster @@ -82,6 +83,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] dest="multi_node", help="Runs a multi-node cluster on the hosts specified by --hosts.", ) + parser.add_argument( + "--scheduler-address", + default=None, + type=str, + dest="sched_addr", + help="Scheduler Address -- assumes cluster is created outside of benchmark.", + ) parser.add_argument( "--hosts", default=None, @@ -176,9 +184,10 @@ def get_scheduler_workers(dask_scheduler=None): def setup_memory_pool(pool_size=None, disable_pool=False): - import rmm import cupy + import rmm + rmm.reinitialize( pool_allocator=not disable_pool, devices=0, initial_pool_size=pool_size, ) diff --git a/dask_cuda/cli/__init__.py b/dask_cuda/cli/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dask_cuda/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py similarity index 56% rename from dask_cuda/dask_cuda_worker.py rename to dask_cuda/cli/dask_cuda_worker.py index 47c393e3..92c7ede2 100755 --- a/dask_cuda/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -1,38 +1,15 @@ from __future__ import absolute_import, division, print_function -import atexit import logging -import multiprocessing -import os -from distributed import Nanny +import click +from tornado.ioloop import IOLoop, TimeoutError + from distributed.cli.utils import check_python_3, install_signal_handlers -from distributed.config import config from distributed.preloading import validate_preload_argv -from distributed.proctitle import ( - enable_proctitle_on_children, - enable_proctitle_on_current, -) from distributed.security import Security -from distributed.utils import get_ip_interface, 
parse_bytes -from distributed.worker import parse_memory_limit -import click -from toolz import valmap -from tornado import gen -from tornado.ioloop import IOLoop, TimeoutError - -from .device_host_file import DeviceHostFile -from .local_cuda_cluster import cuda_visible_devices -from .utils import ( - CPUAffinity, - RMMPool, - get_cpu_affinity, - get_device_total_memory, - get_n_gpus, - get_ucx_config, - get_ucx_net_devices, -) +from ..cuda_worker import CUDAWorker logger = logging.getLogger(__name__) @@ -122,6 +99,16 @@ "the given size, otherwise no RMM pool is created. This can be " "an integer (bytes) or string (like 5GB or 5000M).", ) +@click.option( + "--rmm-managed-memory/--no-rmm-managed-memory", + default=False, + help="If enabled, initialize each worker with RMM and set it to " + "use managed memory. If disabled, RMM may still be used if " + "--rmm-pool-size is specified, but in that case with default " + "(non-managed) memory type." + "WARNING: managed memory is currently incompatible with NVLink, " + "trying to enable both will result in an exception.", +) @click.option( "--reconnect/--no-reconnect", default=True, @@ -129,7 +116,7 @@ ) @click.option("--pid-file", type=str, default="", help="File to write the process PID") @click.option( - "--local-directory", default="", type=str, help="Directory to place worker files" + "--local-directory", default=None, type=str, help="Directory to place worker files" ) @click.option( "--resources", @@ -211,6 +198,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_managed_memory, pid_file, resources, dashboard, @@ -231,147 +219,51 @@ def main( net_devices, **kwargs, ): - enable_proctitle_on_current() - enable_proctitle_on_children() - if tls_ca_file and tls_cert and tls_key: - sec = Security( - tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key + security = Security( + tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key, ) else: - sec = None - - try: - nprocs = len(os.environ["CUDA_VISIBLE_DEVICES"].split(",")) - except KeyError: - nprocs = get_n_gpus() - - if not nthreads: - nthreads = min(1, multiprocessing.cpu_count() // nprocs) - - memory_limit = parse_memory_limit(memory_limit, nthreads, total_cores=nprocs) - - if pid_file: - with open(pid_file, "w") as f: - f.write(str(os.getpid())) - - def del_pid_file(): - if os.path.exists(pid_file): - os.remove(pid_file) - - atexit.register(del_pid_file) - - services = {} + security = None + + worker = CUDAWorker( + scheduler, + host, + nthreads, + name, + memory_limit, + device_memory_limit, + rmm_pool_size, + rmm_managed_memory, + pid_file, + resources, + dashboard, + dashboard_address, + local_directory, + scheduler_file, + interface, + death_timeout, + preload, + dashboard_prefix, + security, + enable_tcp_over_ucx, + enable_infiniband, + enable_nvlink, + enable_rdmacm, + net_devices, + **kwargs, + ) + + async def on_signal(signum): + logger.info("Exiting on signal %d", signum) + await worker.close() - if dashboard: - try: - from distributed.dashboard import BokehWorker - except ImportError: - pass - else: - if dashboard_prefix: - result = (BokehWorker, {"prefix": dashboard_prefix}) - else: - result = BokehWorker - services[("dashboard", dashboard_address)] = result - - if resources: - resources = resources.replace(",", " ").split() - resources = dict(pair.split("=") for pair in resources) - resources = valmap(float, resources) - else: - resources = None + async def run(): + await worker + await worker.finished() loop = 
IOLoop.current() - preload_argv = kwargs.get("preload_argv", []) - kwargs = {"worker_port": None, "listen_address": None} - t = Nanny - - if not scheduler and not scheduler_file and "scheduler-address" not in config: - raise ValueError( - "Need to provide scheduler address like\n" - "dask-worker SCHEDULER_ADDRESS:8786" - ) - - if interface: - if host: - raise ValueError("Can not specify both interface and host") - else: - host = get_ip_interface(interface) - - if rmm_pool_size is not None: - try: - import rmm # noqa F401 - except ImportError: - raise ValueError( - "RMM pool requested but module 'rmm' is not available. " - "For installation instructions, please see " - "https://github.com/rapidsai/rmm" - ) # pragma: no cover - rmm_pool_size = parse_bytes(rmm_pool_size) - - nannies = [ - t( - scheduler, - scheduler_file=scheduler_file, - nthreads=nthreads, - services=services, - loop=loop, - resources=resources, - memory_limit=memory_limit, - interface=get_ucx_net_devices( - cuda_device_index=i, - ucx_net_devices=net_devices, - get_openfabrics=False, - get_network=True, - ), - preload=(list(preload) or []) + ["dask_cuda.initialize"], - preload_argv=(list(preload_argv) or []) + ["--create-cuda-context"], - security=sec, - env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, - plugins={CPUAffinity(get_cpu_affinity(i)), RMMPool(rmm_pool_size)}, - name=name if nprocs == 1 or not name else name + "-" + str(i), - local_directory=local_directory, - config={ - "ucx": get_ucx_config( - enable_tcp_over_ucx=enable_tcp_over_ucx, - enable_infiniband=enable_infiniband, - enable_nvlink=enable_nvlink, - enable_rdmacm=enable_rdmacm, - net_devices=net_devices, - cuda_device_index=i, - ) - }, - data=( - DeviceHostFile, - { - "device_memory_limit": get_device_total_memory(index=i) - if (device_memory_limit == "auto" or device_memory_limit == int(0)) - else parse_bytes(device_memory_limit), - "memory_limit": memory_limit, - "local_directory": local_directory, - }, - ), - **kwargs, - ) - for i in range(nprocs) - ] - - @gen.coroutine - def close_all(): - # Unregister all workers from scheduler - yield [n._close(timeout=2) for n in nannies] - - def on_signal(signum): - logger.info("Exiting on signal %d", signum) - close_all() - - @gen.coroutine - def run(): - yield nannies - yield [n.finished() for n in nannies] - install_signal_handlers(loop, cleanup=on_signal) try: diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py new file mode 100644 index 00000000..16ae2ad5 --- /dev/null +++ b/dask_cuda/cuda_worker.py @@ -0,0 +1,236 @@ +from __future__ import absolute_import, division, print_function + +import asyncio +import atexit +import multiprocessing +import os +import warnings + +from toolz import valmap +from tornado.ioloop import IOLoop + +from distributed import Nanny +from distributed.config import config +from distributed.proctitle import ( + enable_proctitle_on_children, + enable_proctitle_on_current, +) +from distributed.utils import parse_bytes +from distributed.worker import parse_memory_limit + +from .device_host_file import DeviceHostFile +from .initialize import initialize +from .local_cuda_cluster import cuda_visible_devices +from .utils import ( + CPUAffinity, + RMMSetup, + get_cpu_affinity, + get_device_total_memory, + get_n_gpus, + get_ucx_config, + get_ucx_net_devices, +) + + +def _get_interface(interface, host, cuda_device_index, ucx_net_devices): + if host: + return None + else: + return interface or get_ucx_net_devices( + cuda_device_index=cuda_device_index, + 
ucx_net_devices=ucx_net_devices, + get_openfabrics=False, + get_network=True, + ) + + +class CUDAWorker: + def __init__( + self, + scheduler, + host=None, + nthreads=0, + name=None, + memory_limit="auto", + device_memory_limit="auto", + rmm_pool_size=None, + rmm_managed_memory=False, + pid_file=None, + resources=None, + dashboard=True, + dashboard_address=":0", + local_directory=None, + scheduler_file=None, + interface=None, + death_timeout=None, + preload=[], + dashboard_prefix=None, + security=None, + enable_tcp_over_ucx=False, + enable_infiniband=False, + enable_nvlink=False, + enable_rdmacm=False, + net_devices=None, + **kwargs, + ): + # Required by RAPIDS libraries (e.g., cuDF) to ensure no context + # initialization happens before we can set CUDA_VISIBLE_DEVICES + os.environ["RAPIDS_NO_INITIALIZE"] = "True" + + enable_proctitle_on_current() + enable_proctitle_on_children() + + try: + nprocs = len(os.environ["CUDA_VISIBLE_DEVICES"].split(",")) + except KeyError: + nprocs = get_n_gpus() + + if not nthreads: + nthreads = min(1, multiprocessing.cpu_count() // nprocs) + + memory_limit = parse_memory_limit(memory_limit, nthreads, total_cores=nprocs) + + if pid_file: + with open(pid_file, "w") as f: + f.write(str(os.getpid())) + + def del_pid_file(): + if os.path.exists(pid_file): + os.remove(pid_file) + + atexit.register(del_pid_file) + + services = {} + + if dashboard: + try: + from distributed.dashboard import BokehWorker + except ImportError: + pass + else: + if dashboard_prefix: + result = (BokehWorker, {"prefix": dashboard_prefix}) + else: + result = BokehWorker + services[("dashboard", dashboard_address)] = result + + if resources: + resources = resources.replace(",", " ").split() + resources = dict(pair.split("=") for pair in resources) + resources = valmap(float, resources) + else: + resources = None + + loop = IOLoop.current() + + preload_argv = kwargs.get("preload_argv", []) + kwargs = {"worker_port": None, "listen_address": None} + t = Nanny + + if not scheduler and not scheduler_file and "scheduler-address" not in config: + raise ValueError( + "Need to provide scheduler address like\n" + "dask-worker SCHEDULER_ADDRESS:8786" + ) + + if interface and host: + raise ValueError("Can not specify both interface and host") + + if rmm_pool_size is not None or rmm_managed_memory: + try: + import rmm # noqa F401 + except ImportError: + raise ValueError( + "RMM pool requested but module 'rmm' is not available. " + "For installation instructions, please see " + "https://github.com/rapidsai/rmm" + ) # pragma: no cover + rmm_pool_size = parse_bytes(rmm_pool_size) + else: + if enable_nvlink: + warnings.warn( + "When using NVLink we recommend setting a " + "`rmm_pool_size`. Please see: " + "https://dask-cuda.readthedocs.io/en/latest/ucx.html" + "#important-notes for more details" + ) + if rmm_pool_size is not None: + rmm_pool_size = parse_bytes(rmm_pool_size) + + if enable_nvlink and rmm_managed_memory: + raise ValueError( + "RMM managed memory and NVLink are currently incompatible." + ) + + # Ensure this parent dask-cuda-worker process uses the same UCX + # configuration as child worker processes created by it. 
+ initialize( + create_cuda_context=False, + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_infiniband=enable_infiniband, + enable_nvlink=enable_nvlink, + enable_rdmacm=enable_rdmacm, + net_devices=net_devices, + cuda_device_index=0, + ) + + self.nannies = [ + t( + scheduler, + scheduler_file=scheduler_file, + nthreads=nthreads, + services=services, + loop=loop, + resources=resources, + memory_limit=memory_limit, + interface=_get_interface(interface, host, i, net_devices), + host=host, + preload=(list(preload) or []) + ["dask_cuda.initialize"], + preload_argv=(list(preload_argv) or []) + ["--create-cuda-context"], + security=security, + env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, + plugins={ + CPUAffinity(get_cpu_affinity(i)), + RMMSetup(rmm_pool_size, rmm_managed_memory), + }, + name=name if nprocs == 1 or not name else name + "-" + str(i), + local_directory=local_directory, + config={ + "ucx": get_ucx_config( + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_infiniband=enable_infiniband, + enable_nvlink=enable_nvlink, + enable_rdmacm=enable_rdmacm, + net_devices=net_devices, + cuda_device_index=i, + ) + }, + data=( + DeviceHostFile, + { + "device_memory_limit": get_device_total_memory(index=i) + if ( + device_memory_limit == "auto" + or device_memory_limit == int(0) + ) + else parse_bytes(device_memory_limit), + "memory_limit": memory_limit, + "local_directory": local_directory, + }, + ), + **kwargs, + ) + for i in range(nprocs) + ] + + def __await__(self): + return self._wait().__await__() + + async def _wait(self): + await asyncio.gather(*self.nannies) + + async def finished(self): + await asyncio.gather(*[n.finished() for n in self.nannies]) + + async def close(self, timeout=2): + await asyncio.gather(*[n.close(timeout=timeout) for n in self.nannies]) diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 2ca738ce..00c74709 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -1,6 +1,9 @@ import functools import os +from zict import Buffer, File, Func +from zict.common import ZictBase + import dask from distributed.protocol import ( dask_deserialize, @@ -13,10 +16,6 @@ from distributed.utils import nbytes from distributed.worker import weight -import numpy -from zict import Buffer, File, Func -from zict.common import ZictBase - from .is_device_object import is_device_object from .utils import nvtx_annotate @@ -31,50 +30,40 @@ class DeviceSerialized: that are in host memory """ - def __init__(self, header, parts): + def __init__(self, header, frames): self.header = header - self.parts = parts + self.frames = frames def __sizeof__(self): - return sum(map(nbytes, self.parts)) + return sum(map(nbytes, self.frames)) + + def __reduce_ex__(self, protocol): + header, frames = device_serialize(self) + frames = [f.obj for f in frames] + return device_deserialize, (header, frames) @dask_serialize.register(DeviceSerialized) def device_serialize(obj): - headers = [] - all_frames = [] - for part in obj.parts: - header, frames = serialize(part) - header["frame-start-stop"] = [len(all_frames), len(all_frames) + len(frames)] - headers.append(header) - all_frames.extend(frames) - - header = {"sub-headers": headers, "main-header": obj.header} - - return header, all_frames + header = {"obj-header": obj.header} + frames = obj.frames + return header, frames @dask_deserialize.register(DeviceSerialized) def device_deserialize(header, frames): - parts = [] - for sub_header in header["sub-headers"]: - start, stop = 
sub_header.pop("frame-start-stop") - part = deserialize(sub_header, frames[start:stop]) - parts.append(part) - - return DeviceSerialized(header["main-header"], parts) + return DeviceSerialized(header["obj-header"], frames) @nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda") def device_to_host(obj: object) -> DeviceSerialized: header, frames = serialize(obj, serializers=["dask", "pickle"]) - frames = [numpy.asarray(f) for f in frames] return DeviceSerialized(header, frames) @nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda") def host_to_device(s: DeviceSerialized) -> object: - return deserialize(s.header, s.parts) + return deserialize(s.header, s.frames) class DeviceHostFile(ZictBase): @@ -104,8 +93,10 @@ def __init__( ): if local_directory is None: local_directory = dask.config.get("temporary-directory") or os.getcwd() + + if not os.path.exists(local_directory): os.makedirs(local_directory, exist_ok=True) - local_directory = os.path.join(local_directory, "dask-worker-space") + local_directory = os.path.join(local_directory, "dask-worker-space") self.disk_func_path = os.path.join(local_directory, "storage") diff --git a/dask_cuda/explicit_comms/comms.py b/dask_cuda/explicit_comms/comms.py index 44b2bde0..72cfbbc4 100644 --- a/dask_cuda/explicit_comms/comms.py +++ b/dask_cuda/explicit_comms/comms.py @@ -125,7 +125,7 @@ def __init__(self, client=None): ) ) - # Each worker creates a UCX endpoint to all workers with greater rank + # Each worker creates an endpoint to all workers with greater rank self.run(_create_endpoints, self.worker_direct_addresses) # At this point all workers should have a rank and endpoints to @@ -211,14 +211,11 @@ def dataframe_operation(self, coroutine, df_list, extra_args=tuple()): """ df_parts_list = [] for df in df_list: - df_parts_list.append( - utils.workers_to_parts( - self.client.sync(utils.extract_ddf_partitions, df) - ) - ) + df_parts_list.append(utils.extract_ddf_partitions(df)) # Let's create a dict for each dataframe that specifices the # number of partitions each worker has + world = set() dfs_nparts = [] for df_parts in df_parts_list: nparts = {} @@ -226,14 +223,18 @@ def dataframe_operation(self, coroutine, df_list, extra_args=tuple()): npart = len(df_parts.get(worker, [])) if npart > 0: nparts[rank] = npart + world.add(rank) dfs_nparts.append(nparts) # Submit `coroutine` on each worker given the df_parts that # belong the specific worker as input ret = [] - for worker in self.worker_addresses: - dfs = [] - for df_parts in df_parts_list: - dfs.append(df_parts.get(worker, [])) - ret.append(self.submit(worker, coroutine, dfs_nparts, dfs, *extra_args)) + for rank, worker in enumerate(self.worker_addresses): + if rank in world: + dfs = [] + for df_parts in df_parts_list: + dfs.append(df_parts.get(worker, [])) + ret.append( + self.submit(worker, coroutine, world, dfs_nparts, dfs, *extra_args) + ) return utils.dataframes_to_dask_dataframe(ret) diff --git a/dask_cuda/explicit_comms/dataframe_merge.py b/dask_cuda/explicit_comms/dataframe_merge.py index 41e35d87..91e0e3e4 100644 --- a/dask_cuda/explicit_comms/dataframe_merge.py +++ b/dask_cuda/explicit_comms/dataframe_merge.py @@ -1,11 +1,10 @@ import asyncio +import pandas + from dask.dataframe.shuffle import partitioning_index, shuffle_group from distributed.protocol import to_serialize -import cudf -import pandas - from . 
import comms @@ -62,10 +61,15 @@ async def exchange_and_concat_bins(rank, eps, bins): def concat(df_list): if len(df_list) == 0: return None - elif isinstance(df_list[0], (cudf.DataFrame, cudf.Series)): - return cudf.concat(df_list) else: - return pandas.concat(df_list) + typ = str(type(df_list[0])) + if "cudf" in typ: + # delay import of cudf to handle CPU only tests + import cudf + + return cudf.concat(df_list) + else: + return pandas.concat(df_list) def partition_by_hash(df, columns, n_chunks, ignore_index=False): @@ -138,13 +142,15 @@ async def single_partition_join( return left_table.merge(right_table, left_on=left_on, right_on=right_on) -async def _dataframe_merge(s, dfs_nparts, dfs_parts, left_on, right_on): +async def _dataframe_merge(s, workers, dfs_nparts, dfs_parts, left_on, right_on): """ Worker job that merge local DataFrames Parameters ---------- s: dict Worker session state + workers: set + Set of ranks of all the participants dfs_nparts: list of dict List of dict that for each worker rank specifices the number of partitions that worker has. If the worker doesn't @@ -174,40 +180,52 @@ def df_concat(df_parts): else: return concat(df_parts) + assert s["rank"] in workers + + # Trimming such that all participanting workers get a rank within 0..len(workers) + trim_map = {} + for i in range(s["nworkers"]): + if i in workers: + trim_map[i] = len(trim_map) + + rank = trim_map[s["rank"]] + eps = {trim_map[i]: s["eps"][trim_map[i]] for i in workers if i != s["rank"]} + df1 = df_concat(dfs_parts[0]) df2 = df_concat(dfs_parts[1]) if len(dfs_nparts[0]) == 1 and len(dfs_nparts[1]) == 1: return df1.merge(df2, left_on=left_on, right_on=right_on) - elif len(dfs_nparts[0]) == 1: return await single_partition_join( - s["nworkers"], - s["rank"], - s["eps"], + len(workers), + rank, + eps, df1, df2, left_on, right_on, "left", - next(iter(dfs_nparts[0])), # Extracting the only key in `dfs_nparts[0]` + trim_map[ + next(iter(dfs_nparts[0])) + ], # Extracting the only key in `dfs_nparts[0]` ) elif len(dfs_nparts[1]) == 1: return await single_partition_join( - s["nworkers"], - s["rank"], - s["eps"], + len(workers), + rank, + eps, df1, df2, left_on, right_on, "right", - next(iter(dfs_nparts[1])), # Extracting the only key in `dfs_nparts[1]` + trim_map[ + next(iter(dfs_nparts[1])) + ], # Extracting the only key in `dfs_nparts[1]` ) else: - return await hash_join( - s["nworkers"], s["rank"], s["eps"], df1, df2, left_on, right_on - ) + return await hash_join(len(workers), rank, eps, df1, df2, left_on, right_on) def dataframe_merge(left, right, on=None, left_on=None, right_on=None, how="inner"): diff --git a/dask_cuda/explicit_comms/utils.py b/dask_cuda/explicit_comms/utils.py index be1c443e..893ebb3a 100644 --- a/dask_cuda/explicit_comms/utils.py +++ b/dask_cuda/explicit_comms/utils.py @@ -1,58 +1,26 @@ -from collections import OrderedDict - -import dask.dataframe as dd -from distributed import default_client, wait +from collections import defaultdict from toolz import first -from tornado import gen - -def workers_to_parts(futures): - """ - Builds an ordered dict mapping each worker to their list - of parts - :param futures: list of (worker, part) tuples - :return: - """ - w_to_p_map = OrderedDict() - for w, p in futures: - if w not in w_to_p_map: - w_to_p_map[w] = [] - w_to_p_map[w].append(p) - return w_to_p_map - - -@gen.coroutine -def extract_ddf_partitions(ddf, client=None, agg=True): - """ - Given a Dask dataframe, return an OrderedDict mapping - 'worker -> [list of futures]' for each partition in 
ddf. +from dask import dataframe as dd +from distributed import default_client, get_client, wait - :param ddf: Dask.dataframe split dataframe partitions into a list of - futures. - :param client: dask.distributed.Client Optional client to use - """ - client = default_client() if client is None else client +def extract_ddf_partitions(ddf): + """ Returns the mapping: worker -> [list of futures]""" + client = get_client() delayed_ddf = ddf.to_delayed() parts = client.compute(delayed_ddf) - yield wait(parts) - - key_to_part_dict = dict([(str(part.key), part) for part in parts]) - who_has = yield client.who_has(parts) - - worker_map = {} # Map from part -> worker - for key, workers in who_has.items(): - worker = first(workers) - worker_map[key_to_part_dict[key]] = worker - - worker_to_parts = [] - for part in parts: - worker = worker_map[part] - worker_to_parts.append((worker, part)) - - yield wait(worker_to_parts) - raise gen.Return(worker_to_parts) + wait(parts) + + key_to_part = dict([(str(part.key), part) for part in parts]) + ret = defaultdict(list) # Map worker -> [list of futures] + for key, workers in client.who_has(parts).items(): + worker = first( + workers + ) # If multiple workers have the part, we pick the first worker + ret[worker].append(key_to_part[key]) + return ret def get_meta(df): diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index a87c0798..e53c10de 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -25,11 +25,11 @@ """ import logging -import dask - import click import numba.cuda +import dask + from .utils import get_ucx_config logger = logging.getLogger(__name__) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index eeaab4ea..f0703d35 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -1,5 +1,6 @@ import copy import os +import warnings import dask from dask.distributed import LocalCluster @@ -7,9 +8,10 @@ from distributed.worker import parse_memory_limit from .device_host_file import DeviceHostFile +from .initialize import initialize from .utils import ( CPUAffinity, - RMMPool, + RMMSetup, get_cpu_affinity, get_device_total_memory, get_n_gpus, @@ -59,8 +61,6 @@ class LocalCUDACluster(LocalCluster): CUDA_VISIBLE_DEVICES: str String like ``"0,1,2,3"`` or ``[0, 1, 2, 3]`` to restrict activity to different GPUs - Parameters - ---------- interface: str The external interface used to connect to the scheduler, usually an ethernet interface is used for connection, and not an InfiniBand @@ -99,9 +99,15 @@ class LocalCUDACluster(LocalCluster): configured or is disconnected, for that reason it's limited to InfiniBand only and will still cause unpredictable errors if not _ALL_ interfaces are connected and properly configured. - rmm_pool: None, int or str + rmm_pool_size: None, int or str When None (default), no RMM pool is initialized. If a different value - is given, it can be an integer (bytes) or string (like 5GB or 5000M)." + is given, it can be an integer (bytes) or string (like 5GB or 5000M). + rmm_managed_memory: bool + If True, initialize each worker with RMM and set it to use managed + memory. If False, RMM may still be used if `rmm_pool_size` is specified, + but in that case with default (non-managed) memory type. + WARNING: managed memory is currently incompatible with NVLink, trying + to enable both will result in an exception. 
Examples -------- @@ -117,7 +123,8 @@ class LocalCUDACluster(LocalCluster): ValueError If ucx_net_devices is an empty string, or if it is "auto" and UCX-Py is not installed, or if it is "auto" and enable_infiniband=False, or UCX-Py - wasn't compiled with hwloc support. + wasn't compiled with hwloc support, or both RMM managed memory and + NVLink are enabled. See Also -------- @@ -141,8 +148,13 @@ def __init__( enable_rdmacm=False, ucx_net_devices=None, rmm_pool_size=None, + rmm_managed_memory=False, **kwargs, ): + # Required by RAPIDS libraries (e.g., cuDF) to ensure no context + # initialization happens before we can set CUDA_VISIBLE_DEVICES + os.environ["RAPIDS_NO_INITIALIZE"] = "True" + if CUDA_VISIBLE_DEVICES is None: CUDA_VISIBLE_DEVICES = cuda_visible_devices(0) if isinstance(CUDA_VISIBLE_DEVICES, str): @@ -156,16 +168,27 @@ def __init__( self.device_memory_limit = device_memory_limit self.rmm_pool_size = rmm_pool_size - if rmm_pool_size is not None: + self.rmm_managed_memory = rmm_managed_memory + if rmm_pool_size is not None or rmm_managed_memory: try: import rmm # noqa F401 except ImportError: raise ValueError( - "RMM pool requested but module 'rmm' is not available. " - "For installation instructions, please see " - "https://github.com/rapidsai/rmm" + "RMM pool or managed memory requested but module 'rmm' " + "is not available. For installation instructions, please " + "see https://github.com/rapidsai/rmm" ) # pragma: no cover self.rmm_pool_size = parse_bytes(self.rmm_pool_size) + else: + if enable_nvlink: + warnings.warn( + "When using NVLink we recommend setting a " + "`rmm_pool_size`. Please see: " + "https://dask-cuda.readthedocs.io/en/latest/ucx.html" + "#important-notes for more details" + ) + if self.rmm_pool_size is not None: + self.rmm_pool_size = parse_bytes(self.rmm_pool_size) if not processes: raise ValueError( @@ -197,7 +220,8 @@ def __init__( if ucx_net_devices == "auto": try: - from ucp._libs.topological_distance import TopologicalDistance # noqa + from ucp._libs.topological_distance import \ + TopologicalDistance # NOQA except ImportError: raise ValueError( "ucx_net_devices set to 'auto' but UCX-Py is not " @@ -209,6 +233,15 @@ def __init__( self.set_ucx_net_devices = enable_infiniband self.host = kwargs.get("host", None) + initialize( + enable_tcp_over_ucx=enable_tcp_over_ucx, + enable_nvlink=enable_nvlink, + enable_infiniband=enable_infiniband, + enable_rdmacm=enable_rdmacm, + net_devices=ucx_net_devices, + cuda_device_index=0, + ) + super().__init__( n_workers=0, threads_per_worker=threads_per_worker, @@ -255,7 +288,7 @@ def new_worker_spec(self): "env": {"CUDA_VISIBLE_DEVICES": visible_devices,}, "plugins": { CPUAffinity(get_cpu_affinity(worker_count)), - RMMPool(self.rmm_pool_size), + RMMSetup(self.rmm_pool_size, self.rmm_managed_memory), }, } ) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index c0297519..d4a4e365 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -1,16 +1,15 @@ from __future__ import absolute_import, division, print_function import os -from time import sleep -from dask_cuda.utils import get_gpu_count +import pytest + from distributed import Client -from distributed.metrics import time from distributed.system import MEMORY_LIMIT from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import popen -import pytest +from dask_cuda.utils import get_n_gpus, wait_workers def 
test_cuda_visible_devices_and_memory_limit(loop): # noqa: F811 @@ -29,13 +28,7 @@ def test_cuda_visible_devices_and_memory_limit(loop): # noqa: F811 ] ): with Client("127.0.0.1:9359", loop=loop) as client: - start = time() - while True: - if len(client.scheduler_info()["workers"]) == 4: - break - else: - assert time() - start < 10 - sleep(0.1) + assert wait_workers(client, n_gpus=4) def get_visible_devices(): return os.environ["CUDA_VISIBLE_DEVICES"] @@ -55,7 +48,7 @@ def get_visible_devices(): del os.environ["CUDA_VISIBLE_DEVICES"] -def test_rmm_pool(loop): # noqa: F811 +def test_rmm(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): with popen( @@ -66,18 +59,15 @@ def test_rmm_pool(loop): # noqa: F811 "127.0.0.1", "--rmm-pool-size", "2 GB", + "--rmm-managed-memory", "--no-dashboard", ] ): with Client("127.0.0.1:9369", loop=loop) as client: - start = time() - while True: - if len(client.scheduler_info()["workers"]) == get_gpu_count(): - break - else: - assert time() - start < 10 - sleep(0.1) + assert wait_workers(client, n_gpus=get_n_gpus()) - memory_info = client.run(rmm.get_info) - for v in memory_info.values(): - assert v.total == 2000000000 + memory_resource_type = client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.PoolMemoryResource diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index 00e96d0a..a3327b9c 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -1,17 +1,24 @@ import os from random import randint +import numpy as np +import pytest + import dask -import dask.array as da +from dask import array as da +from distributed.protocol import ( + deserialize, + deserialize_bytes, + serialize, + serialize_bytelist, +) +from distributed.protocol.pickle import HIGHEST_PROTOCOL + from dask_cuda.device_host_file import ( DeviceHostFile, device_to_host, host_to_device, ) -from distributed.protocol import deserialize_bytes, serialize_bytelist - -import numpy as np -import pytest cupy = pytest.importorskip("cupy") @@ -170,3 +177,20 @@ def test_serialize_cupy_collection(collection, length, value): assert isinstance(res, collection) values = res.values() if collection is dict else res [assert_func(v, x) for v in values] + + header, frames = serialize(obj, serializers=["pickle"]) + + if HIGHEST_PROTOCOL >= 5: + assert len(frames) == (1 + len(obj.frames)) + else: + assert len(frames) == 1 + + obj2 = deserialize(header, frames) + res = host_to_device(obj2) + + if length == 0: + assert_func(res, x) + else: + assert isinstance(res, collection) + values = res.values() if collection is dict else res + [assert_func(v, x) for v in values] diff --git a/dask_cuda/tests/test_dgx.py b/dask_cuda/tests/test_dgx.py index 9c2b3b60..86d82e6b 100644 --- a/dask_cuda/tests/test_dgx.py +++ b/dask_cuda/tests/test_dgx.py @@ -1,44 +1,60 @@ import multiprocessing as mp import os import subprocess +from enum import Enum, auto from time import sleep -import dask.array as da -from dask_cuda import LocalCUDACluster -from dask_cuda.initialize import initialize -from dask_cuda.utils import get_gpu_count -from distributed import Client -from distributed.metrics import time -from distributed.utils import get_ip_interface - import numpy import pytest from tornado.ioloop import IOLoop +from dask import array as da +from distributed import Client +from distributed.utils import get_ip_interface 
+ +from dask_cuda import LocalCUDACluster +from dask_cuda.initialize import initialize +from dask_cuda.utils import wait_workers + mp = mp.get_context("spawn") ucp = pytest.importorskip("ucp") psutil = pytest.importorskip("psutil") -def _check_dgx_version(): - dgx_server = None +class DGXVersion(Enum): + DGX_1 = auto() + DGX_2 = auto() + DGX_A100 = auto() - if not os.path.isfile("/etc/dgx-release"): - return dgx_server - for line in open("/etc/dgx-release"): - if line.startswith("DGX_PLATFORM"): - if "DGX Server for DGX-1" in line: - dgx_server = 1 - elif "DGX Server for DGX-2" in line: - dgx_server = 2 - break +def _get_dgx_name(): + product_name_file = "/sys/class/dmi/id/product_name" + dgx_release_file = "/etc/dgx-release" - return dgx_server + # We verify `product_name_file` to check it's a DGX, and check + # if `dgx_release_file` exists to confirm it's not a container. + if not os.path.isfile(product_name_file) or not os.path.isfile(dgx_release_file): + return None + + for line in open(product_name_file): + return line + + +def _get_dgx_version(): + dgx_name = _get_dgx_name() + + if dgx_name is None: + return None + elif "DGX-1" in dgx_name: + return DGXVersion.DGX_1 + elif "DGX-2" in dgx_name: + return DGXVersion.DGX_2 + elif "DGXA100" in dgx_name: + return DGXVersion.DGX_A100 def _get_dgx_net_devices(): - if _check_dgx_version() == 1: + if _get_dgx_version() == DGXVersion.DGX_1: return [ "mlx5_0:1,ib0", "mlx5_0:1,ib0", @@ -49,7 +65,7 @@ def _get_dgx_net_devices(): "mlx5_3:1,ib3", "mlx5_3:1,ib3", ] - elif _check_dgx_version() == 2: + elif _get_dgx_version() == DGXVersion.DGX_2: return [ "mlx5_0:1,ib0", "mlx5_0:1,ib0", @@ -72,7 +88,7 @@ def _get_dgx_net_devices(): return None -if _check_dgx_version() is None: +if _get_dgx_version() is None: pytest.skip("Not a DGX server", allow_module_level=True) @@ -201,6 +217,10 @@ def check_ucx_options(): {"enable_infiniband": True, "enable_nvlink": True, "enable_rdmacm": True}, ], ) +@pytest.mark.skipif( + _get_dgx_version() == DGXVersion.DGX_A100, + reason="Automatic InfiniBand device detection Unsupported for %s" % _get_dgx_name(), +) def test_ucx_infiniband_nvlink(params): p = mp.Process( target=_test_ucx_infiniband_nvlink, @@ -228,6 +248,8 @@ def _test_dask_cuda_worker_ucx_net_devices(enable_rdmacm): sched_env = os.environ.copy() sched_env["DASK_UCX__INFINIBAND"] = "True" sched_env["DASK_UCX__TCP"] = "True" + sched_env["DASK_UCX__CUDA_COPY"] = "True" + sched_env["DASK_UCX__NET_DEVICES"] = openfabrics_devices[0] if enable_rdmacm: sched_env["DASK_UCX__RDMACM"] = "True" @@ -246,7 +268,10 @@ def _test_dask_cuda_worker_ucx_net_devices(enable_rdmacm): # Enable proper variables for client initialize( - enable_tcp_over_ucx=True, enable_infiniband=True, enable_rdmacm=enable_rdmacm + enable_tcp_over_ucx=True, + enable_infiniband=True, + enable_rdmacm=enable_rdmacm, + net_devices=openfabrics_devices[0], ) with subprocess.Popen( @@ -270,13 +295,13 @@ def _test_dask_cuda_worker_ucx_net_devices(enable_rdmacm): ) as worker_proc: with Client(sched_url, loop=loop) as client: - start = time() - while True: - if len(client.scheduler_info()["workers"]) == get_gpu_count(): - break - else: - assert time() - start < 10 - sleep(0.1) + def _timeout_callback(): + # We must ensure processes are terminated to avoid hangs + # if a timeout occurs + worker_proc.kill() + sched_proc.kill() + + assert wait_workers(client, timeout_callback=_timeout_callback) workers_tls = client.run(lambda: ucp.get_config()["TLS"]) workers_tls_priority = client.run( @@ -307,6 +332,10 @@ def 
_test_dask_cuda_worker_ucx_net_devices(enable_rdmacm): @pytest.mark.parametrize("enable_rdmacm", [False, True]) +@pytest.mark.skipif( + _get_dgx_version() == DGXVersion.DGX_A100, + reason="Automatic InfiniBand device detection Unsupported for %s" % _get_dgx_name(), +) def test_dask_cuda_worker_ucx_net_devices(enable_rdmacm): p = mp.Process( target=_test_dask_cuda_worker_ucx_net_devices, args=(enable_rdmacm,), diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 2bacc8c5..1ad45f8a 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -1,14 +1,18 @@ import multiprocessing as mp -import dask.dataframe as dd -from dask_cuda.explicit_comms import CommsContext, dataframe_merge +import numpy as np +import pandas as pd +import pytest + +import dask +from dask import dataframe as dd from distributed import Client from distributed.deploy.local import LocalCluster import cudf -import numpy as np -import pandas as pd -import pytest +from cudf.tests.utils import assert_eq + +from dask_cuda.explicit_comms import CommsContext, dataframe_merge mp = mp.get_context("spawn") ucp = pytest.importorskip("ucp") @@ -43,6 +47,12 @@ def test_local_cluster(protocol): def _test_dataframe_merge(backend, protocol, n_workers): + dask.config.update( + dask.config.global_config, + {"ucx": {"TLS": "tcp,sockcm,cuda_copy",},}, + priority="new", + ) + with LocalCluster( protocol=protocol, dashboard_address=None, @@ -74,10 +84,10 @@ def _test_dataframe_merge(backend, protocol, n_workers): got = ddf3.compute() if backend == "cudf": - got = got.to_pandas() - got.index.names = ["key"] # TODO: this shouldn't be needed + assert_eq(got, expected) - pd.testing.assert_frame_equal(got, expected) + else: + pd.testing.assert_frame_equal(got, expected) @pytest.mark.parametrize("nworkers", [1, 2, 4]) @@ -88,3 +98,32 @@ def test_dataframe_merge(backend, protocol, nworkers): p.start() p.join() assert not p.exitcode + + +def _test_dataframe_merge_empty_partitions(nrows, npartitions): + with LocalCluster( + protocol="tcp", + dashboard_address=None, + n_workers=npartitions, + threads_per_worker=1, + processes=True, + ) as cluster: + with Client(cluster): + df1 = pd.DataFrame({"key": np.arange(nrows), "payload1": np.arange(nrows)}) + key = np.arange(nrows) + np.random.shuffle(key) + df2 = pd.DataFrame({"key": key, "payload2": np.arange(nrows)}) + expected = df1.merge(df2).set_index("key") + ddf1 = dd.from_pandas(df1, npartitions=npartitions) + ddf2 = dd.from_pandas(df2, npartitions=npartitions) + ddf3 = dataframe_merge(ddf1, ddf2, on="key").set_index("key") + got = ddf3.compute() + pd.testing.assert_frame_equal(got, expected) + + +def test_dataframe_merge_empty_partitions(): + # Notice, we use more partitions than rows + p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4)) + p.start() + p.join() + assert not p.exitcode diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index 7b450f41..2c56ab34 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -1,15 +1,16 @@ import multiprocessing as mp -import dask.array as da -from dask_cuda.initialize import initialize -from dask_cuda.utils import get_ucx_config -from distributed import Client -from distributed.deploy.local import LocalCluster - import numpy import psutil import pytest +from dask import array as da +from distributed import Client +from distributed.deploy.local import LocalCluster + +from 
dask_cuda.initialize import initialize +from dask_cuda.utils import get_ucx_config + mp = mp.get_context("spawn") ucp = pytest.importorskip("ucp") diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 968786ce..ab39e0b8 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -1,12 +1,13 @@ import os +import pytest + from dask.distributed import Client -from dask_cuda import LocalCUDACluster, utils -from dask_cuda.initialize import initialize from distributed.system import MEMORY_LIMIT from distributed.utils_test import gen_test -import pytest +from dask_cuda import LocalCUDACluster, utils +from dask_cuda.initialize import initialize @gen_test(timeout=20) @@ -34,7 +35,7 @@ def get_visible_devices(): assert full_mem >= MEMORY_LIMIT - 1024 and full_mem < MEMORY_LIMIT + 1024 for w, devices in result.items(): - ident = devices[0] + ident = devices.split(",")[0] assert int(ident) == cluster.scheduler.workers[w].name with pytest.raises(ValueError): @@ -107,11 +108,15 @@ async def test_n_workers(): @gen_test(timeout=20) -async def test_rmm_pool(): +async def test_rmm(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_pool_size="2GB", asynchronous=True) as cluster: + async with LocalCUDACluster( + rmm_pool_size="2GB", rmm_managed_memory=True, asynchronous=True + ) as cluster: async with Client(cluster, asynchronous=True) as client: - memory_info = await client.run(rmm.get_info) - for v in memory_info.values(): - assert v.total == 2000000000 + memory_resource_type = await client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.PoolMemoryResource diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 8f45c729..c826d542 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -1,18 +1,19 @@ import os from time import sleep +import pytest +from zict.file import _safe_key as safe_key + import dask -import dask.array as da -from dask_cuda import LocalCUDACluster, utils -from dask_cuda.device_host_file import DeviceHostFile +from dask import array as da from distributed import Client, get_worker, wait from distributed.metrics import time from distributed.sizeof import sizeof from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 from distributed.worker import Worker -import pytest -from zict.file import _safe_key as safe_key +from dask_cuda import LocalCUDACluster, utils +from dask_cuda.device_host_file import DeviceHostFile if utils.get_device_total_memory() < 1e10: pytest.skip("Not enough GPU memory", allow_module_level=True) diff --git a/dask_cuda/tests/test_ucx_options.py b/dask_cuda/tests/test_ucx_options.py index 8326f459..37f61d15 100644 --- a/dask_cuda/tests/test_ucx_options.py +++ b/dask_cuda/tests/test_ucx_options.py @@ -1,13 +1,13 @@ import multiprocessing as mp +import numpy +import pytest + import dask -import dask.array as da +from dask import array as da from distributed import Client from distributed.deploy.local import LocalCluster -import numpy -import pytest - mp = mp.get_context("spawn") ucp = pytest.importorskip("ucp") diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 9780cd93..7a02bee0 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -1,5 +1,8 @@ import os +import pytest +from numba import cuda + from dask_cuda.utils import ( get_cpu_affinity, 
diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py
index 9780cd93..7a02bee0 100644
--- a/dask_cuda/tests/test_utils.py
+++ b/dask_cuda/tests/test_utils.py
@@ -1,5 +1,8 @@
 import os
 
+import pytest
+from numba import cuda
+
 from dask_cuda.utils import (
     get_cpu_affinity,
     get_device_total_memory,
@@ -10,9 +13,6 @@
     unpack_bitmask,
 )
 
-import pytest
-from numba import cuda
-
 
 def test_get_n_gpus():
     assert isinstance(get_n_gpus(), int)
@@ -52,16 +52,15 @@ def test_cpu_affinity():
     for i in range(get_n_gpus()):
         affinity = get_cpu_affinity(i)
         os.sched_setaffinity(0, affinity)
-        assert list(os.sched_getaffinity(0)) == affinity
+        assert os.sched_getaffinity(0) == set(affinity)
 
 
 def test_get_device_total_memory():
     for i in range(get_n_gpus()):
         with cuda.gpus[i]:
-            assert (
-                get_device_total_memory(i)
-                == cuda.current_context().get_memory_info()[1]
-            )
+            total_mem = get_device_total_memory(i)
+            assert type(total_mem) is int
+            assert total_mem > 0
 
 
 @pytest.mark.parametrize("enable_tcp", [True, False])
diff --git a/dask_cuda/tests/test_worker_spec.py b/dask_cuda/tests/test_worker_spec.py
index 454bf106..a157dcf9 100644
--- a/dask_cuda/tests/test_worker_spec.py
+++ b/dask_cuda/tests/test_worker_spec.py
@@ -1,7 +1,8 @@
-from dask_cuda.worker_spec import worker_spec
+import pytest
+
 from distributed import Nanny
 
-import pytest
+from dask_cuda.worker_spec import worker_spec
 
 
 def _check_option(spec, k, v):
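The updated ``test_cpu_affinity`` above compares against ``set(affinity)`` because ``os.sched_getaffinity`` returns a set, not a list. A standalone sketch of the same pattern, assuming Linux and at least one GPU, is:

.. code-block:: python

    import os

    from dask_cuda.utils import get_cpu_affinity

    gpu_index = 0  # illustrative choice of GPU
    affinity = get_cpu_affinity(gpu_index)  # CPUs closest to that GPU
    os.sched_setaffinity(0, affinity)  # 0 means the current process
    assert os.sched_getaffinity(0) == set(affinity)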
+    """
+    n_gpus = n_gpus or get_n_gpus()
+    timeout = max(min_timeout, seconds_per_gpu * n_gpus)
+
+    start = time.time()
+    while True:
+        if len(client.scheduler_info()["workers"]) == n_gpus:
+            return True
+        elif time.time() - start > timeout:
+            if callable(timeout_callback):
+                timeout_callback()
+            return False
+        else:
+            time.sleep(0.1)
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 70483769..4bb283d1 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -23,10 +23,11 @@
 copyright = "2020, NVIDIA"
 author = "NVIDIA"
 
-# The short X.Y version
-version = "0.14"
-# The full version, including alpha/beta/rc tags
-release = "0.14.0"
+# The full version, including alpha/beta/rc tags.
+from dask_cuda import __version__ as release
+
+# The short X.Y version.
+version = ".".join(release.split(".")[:2])
 
 
 # -- General configuration ---------------------------------------------------
@@ -46,8 +47,11 @@
     "sphinx.ext.autosummary",
     "sphinx.ext.intersphinx",
     "sphinx.ext.extlinks",
+    "numpydoc",
 ]
 
+numpydoc_show_class_members = False
+
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ["_templates"]
 
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 3d53e39c..507e3066 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -1,5 +1,5 @@
 Dask-CUDA
-======
+=========
 
 Dask-CUDA is tool for using `Dask `_ on GPUs. It extends Dask's `Single-Machine Cluster `_ and `Workers `_ for optimized distributed GPU workloads.
 
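The new ``wait_workers`` helper above blocks until the expected number of GPU workers has connected, for at most ``max(min_timeout, seconds_per_gpu * n_gpus)`` seconds. A sketch of how it might be used from a test or startup script follows; the ``report_timeout`` callback is purely illustrative and not part of dask-cuda.

.. code-block:: python

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster
    from dask_cuda.utils import wait_workers

    if __name__ == "__main__":
        cluster = LocalCUDACluster()
        client = Client(cluster)

        def report_timeout():
            # Hypothetical diagnostic: show how many workers did connect in time
            n = len(client.scheduler_info()["workers"])
            print("Timed out waiting for workers; %d connected" % n)

        if not wait_workers(client, min_timeout=10, seconds_per_gpu=2, timeout_callback=report_timeout):
            raise RuntimeError("not all GPU workers came up in time")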
diff --git a/docs/source/ucx.rst b/docs/source/ucx.rst
index 9df9f9fd..b149cf94 100644
--- a/docs/source/ucx.rst
+++ b/docs/source/ucx.rst
@@ -43,11 +43,13 @@ dask-scheduler
 The ``dask-scheduler`` has no parameters for UCX configuration -- different from what we will see for ``dask-cuda-worker`` on the next section -- for that reason we rely on Dask environment variables. Here's how to start the scheduler with all transports that are currently supported by Dask-CUDA:
 
 .. code-block:: bash
+
     DASK_RMM__POOL_SIZE=1GB DASK_UCX__CUDA_COPY=True DASK_UCX__TCP=True DASK_UCX__NVLINK=True DASK_UCX__INFINIBAND=True DASK_UCX__RDMACM=True DASK_UCX__NET_DEVICES=mlx5_0:1 dask-scheduler --protocol ucx --interface ib0
 
 Note above how we use ``DASK_UCX__NET_DEVICES=mlx5_0:1`` (the Mellanox name for ``ib0``) and the same interface with ``--interface ib0``. If the system doesn't have an InfiniBand interface available, you would normally use the main network interface, such as ``eth0``, as seen below:
 
 .. code-block:: bash
+
     DASK_RMM__POOL_SIZE=1GB DASK_UCX__CUDA_COPY=True DASK_UCX__TCP=True DASK_UCX__NVLINK=True dask-scheduler --protocol ucx --interface eth0
 
 Setting ``DASK_UCX__NET_DEVICES`` when using an interface that isn't an InfiniBand can generally be skipped.
@@ -67,6 +69,7 @@ All ``DASK_*`` configurations described above have analogous parameters in ``das
 Here's how to start workers with all transports that are currently relevant for us:
 
 .. code-block:: bash
+
     dask-cuda-worker ucx://SCHEDULER_IB0_IP:8786 --enable-tcp-over-ucx --enable-nvlink --enable-infiniband --enable-rdmacm --net-devices="auto" --rmm-pool-size="30GB"
 
 
@@ -78,6 +81,7 @@ The same configurations used for the scheduler should be used by the client. One
 One can use ``os.environ`` inside the client script, it's important to set them at the very top before importing anything other than ``os``. See example below:
 
 .. code-block:: python
+
     import os
 
     os.environ["DASK_RMM__POOL_SIZE"] = "1GB"
@@ -124,7 +128,7 @@ All options discussed previously are also available in ``LocalCUDACluster``. It
         enable_nvlink = True
         enable_infiniband = True
         ucx_net_devices="auto"
-        rmm_pool="24GB"
+        rmm_pool_size="24GB"
     )
 
     client = Client(cluster)
diff --git a/readthedocs.yml b/readthedocs.yml
deleted file mode 100644
index b11377da..00000000
--- a/readthedocs.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-build:
-  image: latest
-
-python:
-  version: 3.7
-  setup_py_install: true
-
-conda:
-  file: conda/environments/builddocs_py37.yml
diff --git a/requirements.txt b/requirements.txt
index cd79b261..16141fc3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 dask>=2.9.0
-distributed>=2.11.0
+distributed>=2.18.0
 pynvml>=8.0.3
 numpy>=1.16.0
 numba>=0.40.1
diff --git a/setup.cfg b/setup.cfg
index 79670b83..76dd01bd 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -3,7 +3,7 @@ VCS = git
 style = pep440
 versionfile_source = dask_cuda/_version.py
 versionfile_build = dask_cuda/_version.py
-tag_prefix =
+tag_prefix = v
 parentdir_prefix = dask_cuda-
 
 [flake8]
@@ -31,8 +31,17 @@ order_by_type=True
 known_dask=
     dask
     distributed
+known_rapids=
+    rmm
+    cuml
+    cugraph
+    dask_cudf
+    cudf
+    ucp
+known_first_party=
     dask_cuda
-sections=FUTURE,STDLIB,DASK,FIRSTPARTY,LOCALFOLDER
+default_section=THIRDPARTY
+sections=FUTURE,STDLIB,THIRDPARTY,DASK,RAPIDS,FIRSTPARTY,LOCALFOLDER
 skip=
     .eggs
     .git
diff --git a/setup.py b/setup.py
index b83131cc..e585e40d 100644
--- a/setup.py
+++ b/setup.py
@@ -1,9 +1,10 @@
 import os
 from codecs import open
 
-import versioneer
 from setuptools import find_packages, setup
 
+import versioneer
+
 # Get the long description from the README file
 with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
     long_description = f.read()
@@ -38,6 +39,6 @@
     install_requires=open("requirements.txt").read().strip().split("\n"),
     entry_points="""
         [console_scripts]
-        dask-cuda-worker=dask_cuda.dask_cuda_worker:go
+        dask-cuda-worker=dask_cuda.cli.dask_cuda_worker:go
     """,
 )
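The new isort sections in ``setup.cfg`` above (``known_dask``, ``known_rapids``, ``known_first_party``) are what produce the import layout seen throughout the test modules in this diff: standard library first, then third-party packages, then Dask, then RAPIDS, then ``dask_cuda`` itself, each group separated by a blank line. A sketch of a module header laid out under that configuration, with purely illustrative imports, would be:

.. code-block:: python

    import os

    import numpy as np
    import pytest

    import dask
    from distributed import Client

    import cudf
    import rmm

    from dask_cuda import LocalCUDACluster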