Commit

Merge pull request #1008 from rapidsai/branch-22.10

raydouglass committed Oct 12, 2022
2 parents 9a61ce5 + 62a1ee8 commit d7c6750
Showing 12 changed files with 520 additions and 79 deletions.
25 changes: 24 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,28 @@
-# dask-cuda 22.08.00 (17 Aug 2022)
+# dask-cuda 22.10.00 (12 Oct 2022)

## 🐛 Bug Fixes

- Revert "Update rearrange_by_column patch for explicit comms" ([#1001](https://github.com/rapidsai/dask-cuda/pull/1001)) [@rjzamora](https://github.com/rjzamora)
- Address CI failures caused by upstream distributed and cupy changes ([#993](https://github.com/rapidsai/dask-cuda/pull/993)) [@rjzamora](https://github.com/rjzamora)
- DeviceSerialized.__reduce_ex__: convert frame to numpy arrays ([#977](https://github.com/rapidsai/dask-cuda/pull/977)) [@madsbk](https://github.com/madsbk)

## 📖 Documentation

- Remove line-break that's breaking link ([#982](https://github.com/rapidsai/dask-cuda/pull/982)) [@ntabris](https://github.com/ntabris)
- Dask-cuda best practices ([#976](https://github.com/rapidsai/dask-cuda/pull/976)) [@quasiben](https://github.com/quasiben)

## 🚀 New Features

- Add Groupby benchmark ([#979](https://github.com/rapidsai/dask-cuda/pull/979)) [@rjzamora](https://github.com/rjzamora)

## 🛠️ Improvements

- Pin `dask` and `distributed` for release ([#1003](https://github.com/rapidsai/dask-cuda/pull/1003)) [@galipremsagar](https://github.com/galipremsagar)
- Update rearrange_by_column patch for explicit comms ([#992](https://github.com/rapidsai/dask-cuda/pull/992)) [@rjzamora](https://github.com/rjzamora)
- benchmarks: Add option to suppress output of point to point data ([#985](https://github.com/rapidsai/dask-cuda/pull/985)) [@wence-](https://github.com/wence-)
- Unpin `dask` and `distributed` for development ([#971](https://github.com/rapidsai/dask-cuda/pull/971)) [@galipremsagar](https://github.com/galipremsagar)

# dask-cuda 22.08.00 (17 Aug 2022)

## 🚨 Breaking Changes

- Fix useless property ([#944](https://github.com/rapidsai/dask-cuda/pull/944)) [@wence-](https://github.com/wence-)
14 changes: 11 additions & 3 deletions ci/cpu/build.sh
@@ -19,6 +19,10 @@ export CUDA_REL=${CUDA_VERSION%.*}
export GPUCI_CONDA_RETRY_MAX=1
export GPUCI_CONDA_RETRY_SLEEP=30

# Whether to keep `dask/label/dev` channel in the env. If INSTALL_DASK_MAIN=0,
# `dask/label/dev` channel is removed.
export INSTALL_DASK_MAIN=0

# Switch to project root; also root of repo checkout
cd "$WORKSPACE"

@@ -43,9 +47,13 @@ gpuci_logger "Activate conda env"
. /opt/conda/etc/profile.d/conda.sh
conda activate rapids

-# Remove rapidsai-nightly channel if we are building main branch
+# Remove `rapidsai-nightly` & `dask/label/dev` channel if we are building main branch
if [ "$SOURCE_BRANCH" = "main" ]; then
conda config --system --remove channels rapidsai-nightly
conda config --system --remove channels dask/label/dev
elif [[ "${INSTALL_DASK_MAIN}" == 0 ]]; then
# Remove `dask/label/dev` channel if INSTALL_DASK_MAIN=0
conda config --system --remove channels dask/label/dev
fi

gpuci_logger "Check compiler versions"
@@ -61,8 +69,8 @@ conda list --show-channel-urls
# FIX Added to deal with Anaconda SSL verification issues during conda builds
conda config --set ssl_verify False

-pip install git+https://github.com/dask/dask.git@main
-pip install git+https://github.com/dask/distributed.git@main
+pip install git+https://github.com/dask/dask.git@2022.9.2
+pip install git+https://github.com/dask/distributed.git@2022.9.2

################################################################################
# BUILD - Package builds
5 changes: 3 additions & 2 deletions ci/gpu/build.sh
@@ -26,7 +26,7 @@ cd "$WORKSPACE"
export GIT_DESCRIBE_TAG=`git describe --tags`
export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'`
export UCX_PATH=$CONDA_PREFIX
-export UCXPY_VERSION=0.27.*
+export UCXPY_VERSION=0.28.*
unset GIT_DESCRIBE_TAG

# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
@@ -38,7 +38,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1
export INSTALL_DASK_MAIN=0

# Dask version to install when `INSTALL_DASK_MAIN=0`
-export DASK_STABLE_VERSION="2022.7.1"
+export DASK_STABLE_VERSION="2022.9.2"

################################################################################
# SETUP - Check environment
@@ -77,6 +77,7 @@ if [[ "${INSTALL_DASK_MAIN}" == 1 ]]; then
else
gpuci_logger "gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall"
gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall
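    # Drop the `dask/label/dev` channel so later installs cannot pull a
    # nightly dask/distributed over the stable pins above.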
conda config --system --remove channels dask/label/dev
fi


273 changes: 273 additions & 0 deletions dask_cuda/benchmarks/local_cudf_groupby.py
@@ -0,0 +1,273 @@
import contextlib
from collections import ChainMap
from time import perf_counter as clock

import pandas as pd

import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
from dask.utils import format_bytes, parse_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
parse_benchmark_args,
print_key_value,
print_separator,
print_throughput_bandwidth,
)


def apply_groupby(
df,
sort=False,
split_out=1,
split_every=8,
shuffle=None,
):
# Handle special "explicit-comms" case
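    # ("explicit-comms" is not a shuffle method Dask itself knows; enable
    # dask_cuda's explicit-comms config and run the task-based shuffle
    # underneath it.)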
config = {}
if shuffle == "explicit-comms":
shuffle = "tasks"
config = {"explicit-comms": True}

with dask.config.set(config):
agg = df.groupby("key", sort=sort).agg(
{"int64": ["max", "count"], "float64": "mean"},
split_out=split_out,
split_every=split_every,
shuffle=shuffle,
)

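    # persist() submits the whole aggregation graph to the cluster;
    # wait() blocks until every partition has finished computing.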
wait(agg.persist())
return agg


def generate_chunk(chunk_info, unique_size=1, gpu=True):
# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
import cupy as xp

import cudf as xdf
else:
import numpy as xp
import pandas as xdf

i_chunk, local_size = chunk_info
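    # A distinct, fixed seed per chunk keeps the data deterministic
    # across runs while still differing between partitions.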
xp.random.seed(i_chunk * 1_000)
return xdf.DataFrame(
{
"key": xp.random.randint(0, unique_size, size=local_size, dtype="int64"),
"int64": xp.random.permutation(xp.arange(local_size, dtype="int64")),
"float64": xp.random.permutation(xp.arange(local_size, dtype="float64")),
}
)


def get_random_ddf(args):

total_size = args.chunk_size * args.in_parts
chunk_kwargs = {
"unique_size": max(int(args.unique_ratio * total_size), 1),
"gpu": True if args.type == "gpu" else False,
}

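    # dd.from_map builds one partition per element of the list below;
    # the tiny (0, 1) chunk supplies the expected column schema (`meta`).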
return dd.from_map(
generate_chunk,
[(i, args.chunk_size) for i in range(args.in_parts)],
meta=generate_chunk((0, 1), **chunk_kwargs),
enforce_metadata=False,
**chunk_kwargs,
)


def bench_once(client, args, write_profile=None):

# Generate random Dask dataframe
df = get_random_ddf(args)

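    # Nominal bytes processed: row count times the summed itemsize of all columns.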
data_processed = len(df) * sum([t.itemsize for t in df.dtypes])
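    # Translate the CLI's string flag into Dask's `shuffle` argument;
    # "tasks" and "explicit-comms" pass through unchanged.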
shuffle = {
"True": "tasks",
"False": False,
}.get(args.shuffle, args.shuffle)

if write_profile is None:
ctx = contextlib.nullcontext()
else:
ctx = performance_report(filename=args.profile)

with ctx:
t1 = clock()
agg = apply_groupby(
df,
sort=args.sort,
split_out=args.split_out,
split_every=args.split_every,
shuffle=shuffle,
)
t2 = clock()

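    # Materialize the aggregate so its true memory footprint is measured.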
output_size = agg.memory_usage(index=True, deep=True).compute().sum()
return (data_processed, output_size, t2 - t1)


def pretty_print_results(args, address_to_index, p2p_bw, results):
if args.markdown:
print("```")
print("Groupby benchmark")
print_separator(separator="-")
print_key_value(key="Use shuffle", value=f"{args.shuffle}")
print_key_value(key="Output partitions", value=f"{args.split_out}")
print_key_value(key="Input partitions", value=f"{args.in_parts}")
print_key_value(key="Sort Groups", value=f"{args.sort}")
print_key_value(key="Rows-per-chunk", value=f"{args.chunk_size}")
print_key_value(key="Unique-group ratio", value=f"{args.unique_ratio}")
print_key_value(key="Protocol", value=f"{args.protocol}")
print_key_value(key="Device(s)", value=f"{args.devs}")
print_key_value(key="Tree-reduction width", value=f"{args.split_every}")
if args.device_memory_limit:
print_key_value(
key="Device memory limit", value=f"{format_bytes(args.device_memory_limit)}"
)
print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}")
if args.protocol == "ucx":
print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}")
print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}")
print_key_value(key="NVLink", value=f"{args.enable_nvlink}")
print_key_value(key="Worker thread(s)", value=f"{args.threads_per_worker}")
print_key_value(key="Data processed", value=f"{format_bytes(results[0][0])}")
print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
if args.markdown:
print("\n```")
data_processed, output_size, durations = zip(*results)
print_throughput_bandwidth(
args, durations, data_processed, p2p_bw, address_to_index
)


def create_tidy_results(args, p2p_bw, results):
configuration = {
"dataframe_type": "cudf" if args.type == "gpu" else "pandas",
"shuffle": args.shuffle,
"sort": args.sort,
"split_out": args.split_out,
"split_every": args.split_every,
"in_parts": args.in_parts,
"rows_per_chunk": args.chunk_size,
"unique_ratio": args.unique_ratio,
"protocol": args.protocol,
"devs": args.devs,
"device_memory_limit": args.device_memory_limit,
"rmm_pool": not args.disable_rmm_pool,
"tcp": args.enable_tcp_over_ucx,
"ib": args.enable_infiniband,
"nvlink": args.enable_nvlink,
}
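    # ChainMap layers each run's timings over the shared configuration
    # without copying it for every row.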
timing_data = pd.DataFrame(
[
pd.Series(
data=ChainMap(
configuration,
{
"wallclock": duration,
"data_processed": data_processed,
"output_size": output_size,
},
)
)
for data_processed, output_size, duration in results
]
)
return timing_data, p2p_bw


def parse_args():
special_args = [
{
"name": "--in-parts",
"default": 100,
"metavar": "n",
"type": int,
"help": "Number of input partitions (default '100')",
},
{
"name": [
"-c",
"--chunk-size",
],
"default": 1_000_000,
"metavar": "n",
"type": int,
"help": "Chunk size (default 1_000_000)",
},
{
"name": "--unique-ratio",
"default": 0.01,
"type": float,
"help": "Fraction of rows that are unique groups",
},
{
"name": "--sort",
"default": False,
"action": "store_true",
"help": "Whether to sort the output group order.",
},
{
"name": "--split_out",
"default": 1,
"type": int,
"help": "How many partitions to return.",
},
{
"name": "--split_every",
"default": 8,
"type": int,
"help": "Tree-reduction width.",
},
{
"name": "--shuffle",
"choices": ["False", "True", "tasks", "explicit-comms"],
"default": "False",
"type": str,
"help": "Whether to use shuffle-based groupby.",
},
        {
            "name": [
                "-t",
                "--type",
            ],
            "choices": ["cpu", "gpu"],
            "default": "gpu",
            "type": str,
            "help": "Run the groupby on GPU (cudf) or CPU (pandas) dataframes (default 'gpu')",
        },
        {
            "name": "--ignore-size",
            "default": "1 MiB",
            "metavar": "nbytes",
            "type": parse_bytes,
            "help": "Ignore messages smaller than this (default '1 MiB')",
        },
{
"name": "--runs",
"default": 3,
"type": int,
"help": "Number of runs",
},
]

return parse_benchmark_args(
description="Distributed groupby (dask/cudf) benchmark", args_list=special_args
)


if __name__ == "__main__":
execute_benchmark(
Config(
args=parse_args(),
bench_once=bench_once,
create_tidy_results=create_tidy_results,
pretty_print_results=pretty_print_results,
)
)
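For orientation, here is a minimal sketch of the aggregation pattern the new benchmark times, run on small pandas-backed data with Dask's local scheduler (no GPUs, cluster, or dask-cuda assumed; the `explicit-comms` flag is a no-op outside a dask-cuda cluster that patches shuffling):

```python
import pandas as pd

import dask
import dask.dataframe as dd

# Small pandas-backed stand-in for the benchmark's generated chunks.
pdf = pd.DataFrame(
    {
        "key": [0, 1, 0, 1, 2, 2],
        "int64": [5, 3, 1, 4, 2, 0],
        "float64": [0.5, 1.5, 2.5, 3.5, 4.5, 5.5],
    }
)
ddf = dd.from_pandas(pdf, npartitions=2)

# Same aggregation spec as apply_groupby above; the config flag only
# takes effect when dask-cuda's explicit-comms machinery is active.
with dask.config.set({"explicit-comms": True}):
    agg = ddf.groupby("key", sort=False).agg(
        {"int64": ["max", "count"], "float64": "mean"},
        split_out=1,
    )

print(agg.compute())
```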