Commit

Merge pull request #390 from rapidsai/branch-0.15
raydouglass committed Aug 26, 2020
2 parents 3fc6db4 + 712364e commit a488e5e
Showing 39 changed files with 857 additions and 465 deletions.
16 changes: 16 additions & 0 deletions .readthedocs.yml
@@ -0,0 +1,16 @@
version: 2

build:
  image: latest

sphinx:
  configuration: docs/source/conf.py

python:
  version: 3.7
  install:
    - method: pip
      path: .

conda:
  environment: conda/environments/builddocs_py37.yml
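
The configuration above points Read the Docs at docs/source/conf.py. For orientation, a minimal sketch of what such a Sphinx conf.py can look like; the metadata, extensions, and theme below are illustrative assumptions, not the repository's actual file:

# Hypothetical minimal docs/source/conf.py (illustrative values only).
project = "dask-cuda"
author = "NVIDIA"

extensions = [
    "sphinx.ext.autodoc",   # pull API docs from the installed package
    "sphinx.ext.napoleon",  # NumPy-style docstring support
]

html_theme = "sphinx_rtd_theme"  # assumed theme; matches the RTD look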
39 changes: 39 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,40 @@
0.15
----
- Fix-up versioneer (#305) `John Kirkham`_
- Require Distributed 2.15.0+ (#306) `John Kirkham`_
- Rely on Dask's ability to serialize collections (#307) `John Kirkham`_
- Ensure CI installs GPU build of UCX (#308) `Peter Andreas Entschev`_
- Skip 2nd serialization pass of `DeviceSerialized` (#309) `John Kirkham`_
- Fix tests related to latest RMM changes (#310) `Peter Andreas Entschev`_
- Fix dask-cuda-worker's interface argument (#314) `Peter Andreas Entschev`_
- Check only for memory type during test_get_device_total_memory (#315) `Peter Andreas Entschev`_
- Fix and improve DGX tests (#316) `Peter Andreas Entschev`_
- Install dependencies via meta package (#317) `Ray Douglass`_
- Fix errors when TLS files are not specified (#320) `Peter Andreas Entschev`_
- Refactor dask-cuda-worker into CUDAWorker class (#324) `Jacob Tomlinson`_
- Add missing __init__.py to dask_cuda/cli (#327) `Peter Andreas Entschev`_
- Add Dask distributed GPU tests to CI (#329) `Benjamin Zaitlen`_
- Fix rmm_pool_size argument name in docstrings (#329) `Benjamin Zaitlen`_
- Add CPU support to benchmarks (#338) `Benjamin Zaitlen`_
- Fix isort configuration (#339) `Mads R. B. Kristensen`_
- Explicit-comms: cleanup and bug fix (#340) `Mads R. B. Kristensen`_
- Add support for RMM managed memory (#343) `Peter Andreas Entschev`_
- Update docker image in local build script (#345) `Sean Frye`_
- Support pickle protocol 5 based spilling (#349) `John Kirkham`_
- Use get_n_gpus for RMM test with dask-cuda-worker (#356) `Peter Andreas Entschev`_
- Update RMM tests based on deprecated CNMeM (#359) `John Kirkham`_
- Fix a black error in explicit comms (#360) `John Kirkham`_
- Fix an `isort` error (#360) `John Kirkham`_
- Set `RMM_NO_INITIALIZE` environment variable (#363) `Benjamin Zaitlen`_
- Fix bash lines in docs (#369) `Benjamin Zaitlen`_
- Replace `RMM_NO_INITIALIZE` with `RAPIDS_NO_INITIALIZE` (#371) `John Kirkham`_
- Fixes for docs and RTD updates (#373) `Benjamin Zaitlen`_
- Confirm DGX tests are running baremetal (#376) `Peter Andreas Entschev`_
- Set RAPIDS_NO_INITIALIZE at the top of CUDAWorker/LocalCUDACluster (#379) `Peter Andreas Entschev`_
- Change pytest's basetemp in CI build script (#380) `Peter Andreas Entschev`_
- Pin Numba version to exclude 0.51.0 (#385) `Benjamin Zaitlen`_

0.14
----
- Publish branch-0.14 to conda (#262) `Paul Taylor`_
@@ -143,3 +180,5 @@
.. _`Paul Taylor`: https://github.com/trxcllnt
.. _`Eli Fajardo`: https://github.com/efajardo-nv
.. _`Randy Gelhausen`: https://github.com/randerzander
.. _`Jacob Tomlinson`: https://github.com/jacobtomlinson
.. _`Sean Frye`: https://github.com/sean-frye
5 changes: 4 additions & 1 deletion README.md
@@ -1,3 +1,5 @@
[![RTD](https://readthedocs.org/projects/dask-cuda/badge/?version=latest)](https://dask-cuda.readthedocs.io/en/latest/?badge=latest)

Dask CUDA
=========

@@ -29,4 +31,5 @@ It only helps with deployment and management of Dask workers in multi-GPU
systems. Parallelizing GPU libraries like [RAPIDS](https://rapids.ai) and
[CuPy](https://cupy.chainer.org) with Dask is an ongoing effort. You may wish
to read about this effort at [blog.dask.org](https://blog.dask.org) for more
information..
information. Additional information about Dask-CUDA can also be found in the
[docs]( https://dask-cuda.readthedocs.io ).
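
As context for the README change above, a minimal sketch of the standard Dask-CUDA entry point it describes (one worker process per visible GPU; requires at least one CUDA device):

from dask.distributed import Client

from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Starts one worker per GPU, honoring CUDA_VISIBLE_DEVICES.
    cluster = LocalCUDACluster()
    client = Client(cluster)
    print(client)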
3 changes: 1 addition & 2 deletions ci/cpu/upload-anaconda.sh
@@ -7,13 +7,12 @@ set -e
export UPLOADFILE=`conda build conda/recipes/dask-cuda --python=$PYTHON --output`
CUDA_REL=${CUDA_VERSION%.*}

SOURCE_BRANCH=master

LABEL_OPTION="--label main"
echo "LABEL_OPTION=${LABEL_OPTION}"

# Restrict uploads to master branch
if [ ${GIT_BRANCH} != ${SOURCE_BRANCH} ]; then
if [ ${BUILD_MODE} != "branch" ]; then
echo "Skipping upload"
return 0
fi
4 changes: 1 addition & 3 deletions ci/cpu/upload-pypi.sh
@@ -1,10 +1,8 @@
#!/bin/bash
set -e

SOURCE_BRANCH=master

# Restrict uploads to master branch
if [ ${GIT_BRANCH} != ${SOURCE_BRANCH} ]; then
if [ ${BUILD_MODE} != "branch" ]; then
echo "Skipping upload"
return 0
fi
25 changes: 18 additions & 7 deletions ci/gpu/build.sh
@@ -57,15 +57,14 @@ conda list
# Fixing Numpy version to avoid RuntimeWarning: numpy.ufunc size changed, may
# indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject
conda install "cudatoolkit=$CUDA_REL" \
"cupy>=6.5.0" "numpy=1.16.4" \
"cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \
"dask>=2.8.1" "distributed>=2.8.1"
"ucx-py=$MINOR_VERSION.*" "ucx-proc=*=gpu" \
"rapids-build-env=$MINOR_VERSION.*"

# needed for async tests
conda install -c conda-forge "pytest" "pytest-asyncio"
# https://docs.rapids.ai/maintainers/depmgmt/
# conda remove -f rapids-build-env
# conda install "your-pkg=1.0.0"

# Use nightly build of ucx-py for now
conda install "ucx-py=$MINOR_VERSION"

conda list

@@ -101,5 +100,17 @@ else
logger "Python py.test for dask-cuda..."
cd $WORKSPACE
ls dask_cuda/tests/
UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n py.test -vs --cache-clear --junitxml=${WORKSPACE}/junit-dask-cuda.xml --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:${WORKSPACE}/dask-cuda-coverage.xml --cov-report term dask_cuda/tests/
UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n py.test -vs --cache-clear --basetemp=${WORKSPACE}/dask-cuda-tmp --junitxml=${WORKSPACE}/junit-dask-cuda.xml --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:${WORKSPACE}/dask-cuda-coverage.xml --cov-report term dask_cuda/tests/

logger "Running dask.distributed GPU tests"
# Test downstream packages, which requires Python v3.7
if [ $(python -c "import sys; print(sys.version_info[1])") -ge "7" ]; then
logger "TEST OF DASK/UCX..."
py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_cupy as m;print(m.__file__)"`
py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_numba as m;print(m.__file__)"`
py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_rmm as m;print(m.__file__)"`
py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_collection_cuda as m;print(m.__file__)"`
py.test --cache-clear -vs `python -c "import distributed.tests.test_nanny as m;print(m.__file__)"`
py.test --cache-clear -vs `python -c "import distributed.tests.test_gpu_metrics as m;print(m.__file__)"`
fi
fi
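
The backtick one-liners above resolve test files inside the installed distributed package so py.test can run them without a source checkout. The same lookup in plain Python (assumes distributed and its GPU test dependencies are installed):

import importlib

# Resolve the on-disk path of an installed module, then hand it to pytest.
mod = importlib.import_module("distributed.protocol.tests.test_cupy")
print(mod.__file__)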
7 changes: 5 additions & 2 deletions ci/local/build.sh
@@ -1,6 +1,9 @@
#!/bin/bash

DOCKER_IMAGE="gpuci/rapidsai-base:cuda10.0-ubuntu16.04-gcc5-py3.6"
GIT_DESCRIBE_TAG=`git describe --tags`
MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`

DOCKER_IMAGE="gpuci/rapidsai:${MINOR_VERSION}-cuda10.1-devel-ubuntu16.04-py3.7"
REPO_PATH=${PWD}
RAPIDS_DIR_IN_CONTAINER="/rapids"
CPP_BUILD_DIR="cpp/build"
@@ -139,4 +142,4 @@ docker run --rm -it ${GPU_OPTS} \
-v "$PASSWD_FILE":/etc/passwd:ro \
-v "$GROUP_FILE":/etc/group:ro \
--cap-add=SYS_PTRACE \
"${DOCKER_IMAGE}" bash -c "${COMMAND}"
"${DOCKER_IMAGE}" bash -c "${COMMAND}"
4 changes: 2 additions & 2 deletions conda/recipes/dask-cuda/meta.yaml
@@ -25,10 +25,10 @@ requirements:
run:
- python x.x
- dask-core >=2.4.0
- distributed >=2.7.0
- distributed >=2.18.0
- pynvml >=8.0.3
- numpy >=1.16.0
- numba >=0.40.1
- numba >=0.50.0,!=0.51.0

test:
imports:
1 change: 1 addition & 0 deletions dask_cuda/__init__.py
@@ -1,5 +1,6 @@
from ._version import get_versions
from .local_cuda_cluster import LocalCUDACluster
from .cuda_worker import CUDAWorker

__version__ = get_versions()["version"]
del get_versions
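
CUDAWorker is now importable from the package root (the CUDAWorker refactor, #324 in the changelog). A hypothetical sketch of starting a worker programmatically, assuming the class takes the scheduler address as its first argument and is awaitable, mirroring what the dask-cuda-worker CLI does internally; check the class itself for the real signature:

import asyncio

from dask_cuda import CUDAWorker

async def main():
    # Assumed API: scheduler address first, awaitable start (hypothetical).
    worker = CUDAWorker("tcp://127.0.0.1:8786")
    await worker
    await worker.finished()

asyncio.run(main())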
98 changes: 61 additions & 37 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -2,10 +2,13 @@
from collections import defaultdict
from time import perf_counter as clock

import numpy

from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, format_time, parse_bytes

from dask_cuda import explicit_comms
from dask_cuda.benchmarks.utils import (
get_cluster_options,
@@ -14,17 +17,21 @@
setup_memory_pool,
)

import cudf
import cupy
import numpy

# Benchmarking cuDF merge operation based on
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955>


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
# Setting a seed that triggers max amount of comm in the two-GPU case.
cupy.random.seed(17561648246761420848)
if gpu:
import cupy as xp

import cudf as xdf
else:
import numpy as xp
import pandas as xdf

xp.random.seed(2 ** 32 - 1)

chunk_type = chunk_type or "build"
frac_match = frac_match or 1.0
@@ -40,16 +47,14 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
start = local_size * i_chunk
stop = start + local_size

parts_array = cupy.arange(num_chunks, dtype="int64")
suffle_array = cupy.repeat(parts_array, math.ceil(local_size / num_chunks))
parts_array = xp.arange(num_chunks, dtype="int64")
suffle_array = xp.repeat(parts_array, math.ceil(local_size / num_chunks))

df = cudf.DataFrame(
df = xdf.DataFrame(
{
"key": cupy.arange(start, stop=stop, dtype="int64"),
"shuffle": cupy.random.permutation(suffle_array)[:local_size],
"payload": cupy.random.permutation(
cupy.arange(local_size, dtype="int64")
),
"key": xp.arange(start, stop=stop, dtype="int64"),
"shuffle": xp.random.permutation(suffle_array)[:local_size],
"payload": xp.random.permutation(xp.arange(local_size, dtype="int64")),
}
)
else:
@@ -69,26 +74,24 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
for i in range(num_chunks):
bgn = (local_size * i) + (sub_local_size * i_chunk)
end = bgn + sub_local_size
ar = cupy.arange(bgn, stop=end, dtype="int64")
arrays.append(cupy.random.permutation(ar)[:sub_local_size_use])
key_array_match = cupy.concatenate(tuple(arrays), axis=0)
ar = xp.arange(bgn, stop=end, dtype="int64")
arrays.append(xp.random.permutation(ar)[:sub_local_size_use])
key_array_match = xp.concatenate(tuple(arrays), axis=0)

# Step 2. Add values that DON'T match
missing_size = local_size - key_array_match.shape[0]
start = local_size * num_chunks + local_size * i_chunk
stop = start + missing_size
key_array_no_match = cupy.arange(start, stop=stop, dtype="int64")
key_array_no_match = xp.arange(start, stop=stop, dtype="int64")

# Step 3. Combine and create the final dataframe chunk (dask_cudf partition)
key_array_combine = cupy.concatenate(
key_array_combine = xp.concatenate(
(key_array_match, key_array_no_match), axis=0
)
df = cudf.DataFrame(
df = xdf.DataFrame(
{
"key": cupy.random.permutation(key_array_combine),
"payload": cupy.random.permutation(
cupy.arange(local_size, dtype="int64")
),
"key": xp.random.permutation(key_array_combine),
"payload": xp.random.permutation(xp.arange(local_size, dtype="int64")),
}
)
return df
@@ -97,13 +100,22 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):

parts = [chunk_size for i in range(num_chunks)]
meta = generate_chunk(0, 4, 1, chunk_type, None)
device_type = True if args.type == "gpu" else False
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
divisions = [None] * (len(parts) + 1)

name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)

graph = {
(name, i): (generate_chunk, i, part, len(parts), chunk_type, frac_match)
(name, i): (
generate_chunk,
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
}

@@ -177,20 +189,24 @@ def main(args):
cluster_kwargs = cluster_options["kwargs"]
scheduler_addr = cluster_options["scheduler_addr"]

cluster = Cluster(*cluster_args, **cluster_kwargs)
if args.multi_node:
import time
if args.sched_addr:
client = Client(args.sched_addr)
else:
cluster = Cluster(*cluster_args, **cluster_kwargs)
if args.multi_node:
import time

# Allow some time for workers to start and connect to scheduler
# TODO: make this a command-line argument?
time.sleep(15)
# Allow some time for workers to start and connect to scheduler
# TODO: make this a command-line argument?
time.sleep(15)

client = Client(scheduler_addr if args.multi_node else cluster)
client = Client(scheduler_addr if args.multi_node else cluster)

client.run(setup_memory_pool, disable_pool=args.no_rmm_pool)
# Create an RMM pool on the scheduler due to occasional deserialization
# of CUDA objects. May cause issues with InfiniBand otherwise.
client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool)
if args.type == "gpu":
client.run(setup_memory_pool, disable_pool=args.no_rmm_pool)
# Create an RMM pool on the scheduler due to occasional deserialization
# of CUDA objects. May cause issues with InfiniBand otherwise.
client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool)

scheduler_workers = client.run_on_scheduler(get_scheduler_workers)
n_workers = len(scheduler_workers)
@@ -227,6 +243,7 @@ def main(args):
print("Merge benchmark")
print("-------------------------------")
print(f"backend | {args.backend}")
print(f"merge type | {args.type}")
print(f"rows-per-chunk | {args.chunk_size}")
print(f"protocol | {args.protocol}")
print(f"device(s) | {args.devs}")
@@ -278,6 +295,13 @@ def parse_args():
"type": str,
"help": "The backend to use.",
},
{
"name": ["-t", "--type",],
"choices": ["cpu", "gpu"],
"default": "gpu",
"type": str,
"help": "Do merge with GPU or CPU dataframes",
},
{
"name": ["-c", "--chunk-size",],
"default": 1_000_000,
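The new gpu flag threaded through generate_chunk above uses a common backend-dispatch pattern: bind cupy/cudf or numpy/pandas to shared aliases and write the body once. A self-contained sketch of the pattern (a standalone illustration, not the benchmark itself):

def make_frame(n, gpu=False):
    # Bind the array/dataframe backends to common aliases.
    if gpu:
        import cupy as xp
        import cudf as xdf
    else:
        import numpy as xp
        import pandas as xdf

    # From here on, a single code path serves both backends.
    keys = xp.arange(n, dtype="int64")
    return xdf.DataFrame({"key": keys, "payload": xp.random.permutation(keys)})

df = make_frame(10)              # CPU: numpy + pandas
# df = make_frame(10, gpu=True)  # GPU: cupy + cudf (requires a CUDA device)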
9 changes: 5 additions & 4 deletions dask_cuda/benchmarks/local_cupy_transpose_sum.py
@@ -2,19 +2,20 @@
from collections import defaultdict
from time import perf_counter as clock

import dask.array as da
import cupy
import numpy as np

from dask import array as da
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, format_time, parse_bytes

from dask_cuda.benchmarks.utils import (
get_cluster_options,
get_scheduler_workers,
parse_benchmark_args,
setup_memory_pool,
)

import cupy
import numpy as np


async def _run(client, args):
# Create a simple random array
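Both benchmarks call setup_memory_pool on every worker, and the merge benchmark now also calls it on the scheduler. A sketch of what such a helper reduces to, assuming the rmm.reinitialize API of this era; the actual helper in dask_cuda.benchmarks.utils may differ in details:

import rmm

def setup_memory_pool(pool_size=None, disable_pool=False):
    # Re-initialize RMM, optionally with a pooled allocator of the given size.
    rmm.reinitialize(
        pool_allocator=not disable_pool,
        initial_pool_size=pool_size,
    )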
