cuda.parallel: Minor perf improvements (#3718)
* Add get_data_pointer utility and fast paths for CuPy

* Remove type validations between calls to __init__ and __call__

* Require array size

* Use CuPy to get the compute capability

* Struct scalars support __array_interface__

* Address review feedback

---------

Co-authored-by: Ashwin Srinath <[email protected]>
shwina authored Feb 11, 2025
1 parent bc57f2b commit a03ce7b
Showing 8 changed files with 79 additions and 88 deletions.
2 changes: 2 additions & 0 deletions docs/repo.toml
@@ -324,6 +324,7 @@ autodoc.mock_imports = [
"cuda.bindings",
"llvmlite",
"numpy",
"cupy",
]

enhanced_search_enabled = true
@@ -349,6 +350,7 @@ autodoc.mock_imports = [
"cuda.cccl",
"llvmlite",
"numpy",
"cupy",
]

enhanced_search_enabled = true
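Because cuda.parallel now imports CuPy, the docs build has to mock it alongside the other heavy runtime dependencies. For readers more familiar with plain Sphinx than with repo.toml, the change above is roughly the analogue of the following conf.py setting (illustrative only; the project actually configures this through repo.toml as shown):

# conf.py -- plain-Sphinx analogue of the repo.toml change above (illustrative)
autodoc_mock_imports = [
    "cuda.bindings",
    "llvmlite",
    "numpy",
    "cupy",  # newly mocked: imported by cuda.parallel, absent in the docs build environment
]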
4 changes: 2 additions & 2 deletions python/cuda_parallel/cuda/parallel/experimental/_caching.py
@@ -5,7 +5,7 @@

import functools

from numba import cuda
import cupy as cp


def cache_with_key(key):
@@ -25,7 +25,7 @@ def deco(func):

@functools.wraps(func)
def inner(*args, **kwargs):
cc = cuda.get_current_device().compute_capability
cc = cp.cuda.Device().compute_capability
cache_key = (key(*args, **kwargs), cc)
if cache_key not in cache:
result = func(*args, **kwargs)
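For context, cache_with_key memoizes its target keyed on a user-supplied key function plus the current device's compute capability, so a cached result is never reused on a device with a different CC; the change above only swaps how the CC is queried (cupy.cuda.Device().compute_capability instead of numba's cuda.get_current_device().compute_capability). A minimal usage sketch — the key function and the decorated builder below are hypothetical, not part of this diff:

import cupy as cp

from cuda.parallel.experimental._caching import cache_with_key  # internal module


# Hypothetical: cache one build per (input dtype, output dtype, compute capability).
@cache_with_key(lambda d_in, d_out: (str(d_in.dtype), str(d_out.dtype)))
def build_something(d_in, d_out):
    ...  # stand-in for an expensive compile/build step


a = cp.empty(10, dtype=cp.int32)
b = cp.empty(1, dtype=cp.int32)
build_something(a, b)  # computes and caches
build_something(a, b)  # cache hit: same key, same compute capability

Note that CuPy reports the compute capability as a string (e.g. "90") rather than numba's (major, minor) tuple; either form works here because the value is only folded into a hashable cache key.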
43 changes: 27 additions & 16 deletions python/cuda_parallel/cuda/parallel/experimental/_utils/protocols.py
@@ -7,38 +7,49 @@
Utilities for extracting information from protocols such as `__cuda_array_interface__` and `__cuda_stream__`.
"""

from typing import Optional, Tuple
from typing import Optional

import numpy as np

from ..typing import DeviceArrayLike


def get_dtype(arr: DeviceArrayLike) -> np.dtype:
typestr = arr.__cuda_array_interface__["typestr"]

if typestr.startswith("|V"):
# it's a structured dtype, use the descr field:
return np.dtype(arr.__cuda_array_interface__["descr"])
else:
# a simple dtype, use the typestr field:
return np.dtype(typestr)

def get_data_pointer(arr: DeviceArrayLike) -> int:
try:
# TODO: this is a fast path for CuPy until
# we have a more general solution.
return arr.data.ptr # type: ignore
except AttributeError:
return arr.__cuda_array_interface__["data"][0]

def get_strides(arr: DeviceArrayLike) -> Optional[Tuple]:
return arr.__cuda_array_interface__["strides"]

def get_dtype(arr: DeviceArrayLike) -> np.dtype:
    try:
        # TODO: this is a fast path for CuPy until
        # we have a more general solution.
        return np.dtype(arr.dtype)  # type: ignore
    except Exception:
        cai = arr.__cuda_array_interface__
        typestr = cai["typestr"]

        if typestr.startswith("|V"):
            # it's a structured dtype, use the descr field:
            return np.dtype(cai["descr"])
        else:
            # a simple dtype, use the typestr field:
            return np.dtype(typestr)


def get_shape(arr: DeviceArrayLike) -> Tuple:
    return arr.__cuda_array_interface__["shape"]


def is_contiguous(arr: DeviceArrayLike) -> bool:
shape, strides = get_shape(arr), get_strides(arr)
cai = arr.__cuda_array_interface__

strides = cai["strides"]

if strides is None:
return True

shape = cai["shape"]

if any(dim == 0 for dim in shape):
# array has no elements
return True
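The new helpers prefer CuPy's native attributes (arr.data.ptr, arr.dtype) and only fall back to the __cuda_array_interface__ dictionary for other device-array types, which is the perf win referenced in the commit title. A small sketch of the intended behavior, assuming the internal import path shown above stays as-is:

import cupy as cp
import numpy as np

from cuda.parallel.experimental._utils import protocols

a = cp.arange(8, dtype=np.int32)

# CuPy fast path: .data.ptr and .dtype are read directly, so the
# __cuda_array_interface__ dict never has to be materialized.
assert protocols.get_data_pointer(a) == a.__cuda_array_interface__["data"][0]
assert protocols.get_dtype(a) == np.dtype(np.int32)

Arrays without those attributes take the fallback branch, so behavior is unchanged for generic __cuda_array_interface__ objects.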
@@ -6,6 +6,7 @@
from __future__ import annotations # TODO: required for Python 3.7 docs env

import ctypes
from functools import cached_property
from typing import Callable

import numba
@@ -30,6 +31,7 @@ def __init__(self, h_init: np.ndarray | GpuStruct, op: Callable):
self.ltoir, _ = cuda.compile(op, sig=(value_type, value_type), output="ltoir")
self.name = op.__name__.encode("utf-8")

@cached_property
def handle(self) -> cccl.Op:
return cccl.Op(
cccl.OpKind.STATELESS,
@@ -59,24 +61,20 @@ def __init__(
# Referenced from __del__:
self.build_result = None

d_in_cccl = cccl.to_cccl_iter(d_in)
self._ctor_d_in_cccl_type_enum_name = cccl.type_enum_as_name(
d_in_cccl.value_type.type.value
)
self._ctor_d_out_dtype = protocols.get_dtype(d_out)
self._ctor_init_dtype = h_init.dtype
self.d_in_cccl = cccl.to_cccl_iter(d_in)
self.d_out_cccl = cccl.to_cccl_iter(d_out)
self.h_init_cccl = cccl.to_cccl_value(h_init)
cc_major, cc_minor = cuda.get_current_device().compute_capability
cub_path, thrust_path, libcudacxx_path, cuda_include_path = get_paths()
bindings = get_bindings()
self.op_wrapper = _Op(h_init, op)
d_out_cccl = cccl.to_cccl_iter(d_out)
self.build_result = cccl.DeviceReduceBuildResult()

self.bindings = get_bindings()
error = bindings.cccl_device_reduce_build(
ctypes.byref(self.build_result),
d_in_cccl,
d_out_cccl,
self.op_wrapper.handle(),
self.d_in_cccl,
self.d_out_cccl,
self.op_wrapper.handle,
cccl.to_cccl_value(h_init),
cc_major,
cc_minor,
@@ -97,43 +95,39 @@ def __call__(
h_init: np.ndarray | GpuStruct,
stream=None,
):
d_in_cccl = cccl.to_cccl_iter(d_in)
if d_in_cccl.type.value == cccl.IteratorKind.ITERATOR:
assert num_items is not None
if self.d_in_cccl.type.value == cccl.IteratorKind.POINTER:
self.d_in_cccl.state = protocols.get_data_pointer(d_in)
else:
assert d_in_cccl.type.value == cccl.IteratorKind.POINTER
if num_items is None:
num_items = d_in.size
else:
assert num_items == d_in.size
_dtype_validation(
self._ctor_d_in_cccl_type_enum_name,
cccl.type_enum_as_name(d_in_cccl.value_type.type.value),
)
_dtype_validation(self._ctor_d_out_dtype, protocols.get_dtype(d_out))
_dtype_validation(self._ctor_init_dtype, h_init.dtype)
self.d_in_cccl.state = d_in.state

if self.d_out_cccl.type.value == cccl.IteratorKind.POINTER:
self.d_out_cccl.state = protocols.get_data_pointer(d_out)
else:
self.d_out_cccl.state = d_out.state

self.h_init_cccl.state = h_init.__array_interface__["data"][0]

stream_handle = protocols.validate_and_get_stream(stream)
bindings = get_bindings()

if temp_storage is None:
temp_storage_bytes = ctypes.c_size_t()
d_temp_storage = None
else:
temp_storage_bytes = ctypes.c_size_t(temp_storage.nbytes)
# Note: this is slightly slower, but supports all ndarray-like objects as long as they support CAI
# TODO: switch to use gpumemoryview once it's ready
d_temp_storage = temp_storage.__cuda_array_interface__["data"][0]
d_out_cccl = cccl.to_cccl_iter(d_out)
error = bindings.cccl_device_reduce(
d_temp_storage = protocols.get_data_pointer(temp_storage)

error = self.bindings.cccl_device_reduce(
self.build_result,
d_temp_storage,
ctypes.byref(temp_storage_bytes),
d_in_cccl,
d_out_cccl,
self.d_in_cccl,
self.d_out_cccl,
ctypes.c_ulonglong(num_items),
self.op_wrapper.handle(),
cccl.to_cccl_value(h_init),
self.op_wrapper.handle,
self.h_init_cccl,
stream_handle,
)

if error != enums.CUDA_SUCCESS:
raise ValueError("Error reducing")

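The effect of the __init__/__call__ split above is that the CCCL iterator and value descriptors are built once at construction time and only their data pointers (state) are refreshed on each call; the per-call dtype validation is gone (hence the removal of test_device_reduce_dtype_mismatch further down), and num_items must now be supplied by the caller. A hedged sketch of the resulting call pattern, using CuPy arrays as stand-ins for any __cuda_array_interface__ input:

import cupy as cp
import numpy as np

from cuda.parallel.experimental import algorithms


def add_op(a, b):
    return a + b


h_init = np.array([0], dtype=np.int32)
d_out = cp.empty(1, dtype=np.int32)
d_a = cp.arange(100, dtype=np.int32)
d_b = cp.arange(1000, dtype=np.int32)

reducer = algorithms.reduce_into(d_a, d_out, add_op, h_init)

# Reuse across inputs of the same dtype: only the cached iterators' pointers
# are updated per call, and the array size is now passed explicitly.
for d_in in (d_a, d_b):
    nbytes = reducer(None, d_in, d_out, d_in.size, h_init)
    tmp = cp.empty(nbytes, dtype=np.uint8)
    reducer(tmp, d_in, d_out, d_in.size, h_init)

Since the mismatch check was removed, arrays whose dtypes differ from those used at construction are no longer rejected eagerly; callers are expected to keep them consistent.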
4 changes: 4 additions & 0 deletions python/cuda_parallel/cuda/parallel/experimental/struct.py
@@ -71,7 +71,11 @@ def __post_init__(self):
[tuple(getattr(self, name) for name in anns)], dtype=self.dtype
)

def __array_interface__(self):
return self._data.__array_interface__

setattr(this, "__post_init__", __post_init__)
setattr(this, "__array_interface__", property(__array_interface__))

# Wrap `this` in a dataclass for convenience:
this = dataclass(this)
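Exposing __array_interface__ on struct instances is what lets the reduce __call__ above read h_init.__array_interface__["data"][0] uniformly for both np.ndarray and GpuStruct initial values, instead of special-casing the struct's private _data field. A sketch, assuming the decorator defined in struct.py is the public gpu_struct entry point (the Pixel struct mirrors the one used in the tests later in this diff):

import numpy as np

from cuda.parallel.experimental import gpu_struct  # assumed public entry point


@gpu_struct
class Pixel:
    r: np.int32
    g: np.int32
    b: np.int32


p = Pixel(1, 2, 3)

# With the property added above, the scalar itself satisfies NumPy's
# array interface, so generic code can reach its host buffer:
ptr = p.__array_interface__["data"][0]
host_view = np.asarray(p)  # zero-copy structured view of the underlying record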
9 changes: 7 additions & 2 deletions python/cuda_parallel/pyproject.toml
@@ -16,7 +16,12 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
]
requires-python = ">=3.9"
dependencies = ["cuda-cccl", "numba>=0.60.0", "cuda-python==12.*"]
dependencies = [
"cuda-cccl",
"numba>=0.60.0",
"cuda-python==12.*",
"cupy-cuda12x",
]
dynamic = ["version"]
readme = { file = "README.md", content-type = "text/markdown" }

@@ -57,7 +62,7 @@ input = "cuda/parallel/_version.py"
python_version = "3.10"

[[tool.mypy.overrides]]
module = ["numba.*", "llvmlite"]
module = ["numba.*", "llvmlite", "cupy"]
ignore_missing_imports = true
follow_imports = "skip"

33 changes: 4 additions & 29 deletions python/cuda_parallel/tests/test_reduce.py
@@ -45,9 +45,9 @@ def op(a, b):
num_items = 2**num_items_pow2
h_input = random_int(num_items, dtype)
d_input = numba.cuda.to_device(h_input)
temp_storage_size = reduce_into(None, d_input, d_output, None, h_init)
temp_storage_size = reduce_into(None, d_input, d_output, d_input.size, h_init)
d_temp_storage = numba.cuda.device_array(temp_storage_size, dtype=np.uint8)
reduce_into(d_temp_storage, d_input, d_output, None, h_init)
reduce_into(d_temp_storage, d_input, d_output, d_input.size, h_init)
h_output = d_output.copy_to_host()
assert h_output[0] == sum(h_input) + init_value

@@ -63,40 +63,15 @@ def op(a, b):
for num_items in [42, 420000]:
h_input = np.random.random(num_items) + 1j * np.random.random(num_items)
d_input = numba.cuda.to_device(h_input)
temp_storage_bytes = reduce_into(None, d_input, d_output, None, h_init)
temp_storage_bytes = reduce_into(None, d_input, d_output, d_input.size, h_init)
d_temp_storage = numba.cuda.device_array(temp_storage_bytes, np.uint8)
reduce_into(d_temp_storage, d_input, d_output, None, h_init)
reduce_into(d_temp_storage, d_input, d_output, d_input.size, h_init)

result = d_output.copy_to_host()[0]
expected = np.sum(h_input, initial=h_init[0])
assert result == pytest.approx(expected)


def test_device_reduce_dtype_mismatch():
def min_op(a, b):
return a if a < b else b

dtypes = [np.int32, np.int64]
h_inits = [np.array([], dt) for dt in dtypes]
h_inputs = [np.array([], dt) for dt in dtypes]
d_outputs = [numba.cuda.device_array(1, dt) for dt in dtypes]
d_inputs = [numba.cuda.to_device(h_inp) for h_inp in h_inputs]

reduce_into = algorithms.reduce_into(d_inputs[0], d_outputs[0], min_op, h_inits[0])

for ix in range(3):
with pytest.raises(
TypeError, match=r"^dtype mismatch: __init__=int32, __call__=int64$"
):
reduce_into(
None,
d_inputs[int(ix == 0)],
d_outputs[int(ix == 1)],
None,
h_inits[int(ix == 2)],
)


def _test_device_sum_with_iterator(
l_varr, start_sum_with, i_input, dtype_inp, dtype_out, use_numpy_array
):
8 changes: 4 additions & 4 deletions python/cuda_parallel/tests/test_reduce_api.py
@@ -22,13 +22,13 @@ def min_op(a, b):
reduce_into = algorithms.reduce_into(d_output, d_output, min_op, h_init)

# Determine temporary device storage requirements
temp_storage_size = reduce_into(None, d_input, d_output, None, h_init)
temp_storage_size = reduce_into(None, d_input, d_output, len(d_input), h_init)

# Allocate temporary storage
d_temp_storage = cp.empty(temp_storage_size, dtype=np.uint8)

# Run reduction
reduce_into(d_temp_storage, d_input, d_output, None, h_init)
reduce_into(d_temp_storage, d_input, d_output, len(d_input), h_init)

# Check the result is correct
expected_output = 0
@@ -203,10 +203,10 @@ def max_g_value(x, y):
h_init = Pixel(0, 0, 0)

reduce_into = algorithms.reduce_into(d_rgb, d_out, max_g_value, h_init)
temp_storage_bytes = reduce_into(None, d_rgb, d_out, len(d_rgb), h_init)
temp_storage_bytes = reduce_into(None, d_rgb, d_out, d_rgb.size, h_init)

d_temp_storage = cp.empty(temp_storage_bytes, dtype=np.uint8)
_ = reduce_into(d_temp_storage, d_rgb, d_out, len(d_rgb), h_init)
_ = reduce_into(d_temp_storage, d_rgb, d_out, d_rgb.size, h_init)

h_rgb = d_rgb.get()
expected = h_rgb[h_rgb.view("int32")[:, 1].argmax()]
