Added Multi-input & Scalar Support for Transform UDFs (#17881)
This merge request implements multi-input and scalar support for UDF transforms.

Authors:
  - Basit Ayantunde (https://github.com/lamarrr)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Matthew Murray (https://github.com/Matt711)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17881
lamarrr authored Feb 13, 2025
1 parent 83aafcd commit 7914858
Showing 12 changed files with 355 additions and 75 deletions.
27 changes: 16 additions & 11 deletions cpp/include/cudf/transform.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,27 +32,32 @@ namespace CUDF_EXPORT cudf {
*/

/**
- * @brief Creates a new column by applying a unary function against every
- * element of an input column.
+ * @brief Creates a new column by applying a transform function against every
+ * element of the input columns.
*
* Computes:
- * `out[i] = F(in[i])`
+ * `out[i] = F(inputs[i]...)`.
*
- * The output null mask is the same is the input null mask so if input[i] is
- * null then output[i] is also null
+ * Note that for every scalar in `inputs` (columns of size 1), `input[i] == input[0]`
 *
- * @param input An immutable view of the input column to transform
- * @param unary_udf The PTX/CUDA string of the unary function to apply
+ * The output null mask is the same as the null mask of the input columns, so if input[i] is
+ * null then output[i] is also null. The size of the resulting column is the size of the largest
+ * column.
+ * All input columns must have equivalent null masks.
+ *
+ *
+ * @param inputs Immutable views of the input columns to transform
+ * @param transform_udf The PTX/CUDA string of the transform function to apply
* @param output_type The output type that is compatible with the output type in the UDF
* @param is_ptx true: the UDF is treated as PTX code; false: the UDF is treated as CUDA code
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
- * @return The column resulting from applying the unary function to
+ * @return The column resulting from applying the transform function to
* every element of the input
*/
std::unique_ptr<column> transform(
-  column_view const& input,
-  std::string const& unary_udf,
+  std::vector<column_view> const& inputs,
+  std::string const& transform_udf,
data_type output_type,
bool is_ptx,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
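As a usage sketch (not part of this diff; the multiply UDF, column names, and values are illustrative): with the new signature, a size-1 column passed alongside a full column is broadcast as a scalar, in the same style as cuDF's test wrappers.

// Hedged example: multiply a FLOAT32 column by a "scalar" held in a
// size-1 column, using the multi-input overload declared above.
char const* multiply_udf = R"***(
__device__ inline void multiply(float* out, float x, float scale)
{
  *out = x * scale;
}
)***";

cudf::test::fixed_width_column_wrapper<float> x{{1.0f, 2.0f, 3.0f}};
cudf::test::fixed_width_column_wrapper<float> scale{{0.5f}};  // broadcast to every row

// result holds {0.5f, 1.0f, 1.5f}
auto result = cudf::transform({x, scale},
                              multiply_udf,
                              cudf::data_type{cudf::type_id::FLOAT32},
                              /*is_ptx=*/false);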
20 changes: 16 additions & 4 deletions cpp/src/transform/jit/kernel.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,8 +33,20 @@ namespace cudf {
namespace transformation {
namespace jit {

-template <typename TypeOut, typename TypeIn>
-CUDF_KERNEL void kernel(cudf::size_type size, TypeOut* out_data, TypeIn* in_data)
+/// @brief This class supports striding into columns of data as either scalars or actual
+/// columns at no runtime cost, although it implies the kernel will be recompiled if scalar
+/// and column inputs are interchanged.
+template <typename T, int multiplier>
+struct strided {
+  T data;
+
+  __device__ T const& get(int64_t index) const { return (&data)[index * multiplier]; }
+
+  __device__ T& get(int64_t index) { return (&data)[index * multiplier]; }
+};
+
+template <typename Out, typename... In>
+CUDF_KERNEL void kernel(cudf::size_type size, Out* __restrict__ out, In const* __restrict__... ins)
{
// cannot use global_thread_id utility due to a JIT build issue by including
// the `cudf/detail/utilities/cuda.cuh` header
@@ -43,7 +55,7 @@ CUDF_KERNEL void kernel(cudf::size_type size, TypeOut* out_data, TypeIn* in_data
thread_index_type const stride = block_size * gridDim.x;

for (auto i = start; i < static_cast<thread_index_type>(size); i += stride) {
-    GENERIC_UNARY_OP(&out_data[i], in_data[i]);
+    GENERIC_TRANSFORM_OP(&out->get(i), ins->get(i)...);
}
}

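The `strided` accessor above is what makes scalar broadcasting free: the JIT kernel reinterprets each column's data pointer as a `strided<T, multiplier>*`, and `get(i)` reads `(&data)[i * multiplier]`. A hedged, host-side analogue of that arithmetic (plain C++, illustration only, not part of the diff):

#include <cstdint>
#include <cstdio>

// Host-side copy of the device accessor, for illustration.
template <typename T, int multiplier>
struct strided {
  T data;
  T const& get(int64_t index) const { return (&data)[index * multiplier]; }
};

int main()
{
  float column[3] = {1.0f, 2.0f, 3.0f};
  float scalar[1] = {42.0f};

  auto const* col = reinterpret_cast<strided<float, 1> const*>(column);
  auto const* scl = reinterpret_cast<strided<float, 0> const*>(scalar);

  for (int64_t i = 0; i < 3; ++i) {
    // col->get(i) walks the buffer; scl->get(i) always reads scalar[0],
    // so the size-1 buffer behaves like a broadcast scalar at no runtime cost.
    std::printf("col=%g scalar=%g\n", col->get(i), scl->get(i));
  }
  return 0;
}

The cost noted in the struct's comment is paid at compile time: swapping a scalar for a column changes the template arguments, so the kernel is re-JITted.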
167 changes: 132 additions & 35 deletions cpp/src/transform/transform.cpp
@@ -34,75 +34,172 @@ namespace cudf {
namespace transformation {
namespace jit {
namespace {
-void unary_operation(mutable_column_view output,
-                     column_view input,
-                     std::string const& udf,
-                     data_type output_type,
-                     bool is_ptx,
-                     rmm::cuda_stream_view stream)
+
+using device_data_t = void*;
+
+std::vector<std::string> build_jit_typenames(mutable_column_view output,
+                                             std::vector<column_view> const& inputs)
+{
+  static constexpr auto SCALAR_STRIDE = 0;
+  static constexpr auto COLUMN_STRIDE = 1;
+
+  auto const column_type_name = [](data_type data_type, bool is_scalar) {
+    return jitify2::reflection::Template("cudf::transformation::jit::strided")
+      .instantiate(type_to_name(data_type), is_scalar ? SCALAR_STRIDE : COLUMN_STRIDE);
+  };
+
+  std::vector<std::string> typenames;
+
+  typenames.push_back(column_type_name(output.type(), false));
+  std::transform(
+    inputs.begin(), inputs.end(), std::back_inserter(typenames), [&](auto const& input) {
+      bool const is_scalar = input.size() != output.size();
+      return column_type_name(input.type(), is_scalar);
+    });
+
+  return typenames;
+}
+
+std::map<uint32_t, std::string> build_ptx_params(mutable_column_view output,
+                                                 std::vector<column_view> const& inputs)
+{
+  std::map<uint32_t, std::string> params;
+  uint32_t index = 0;
+
+  auto const add_column = [&](bool is_output, data_type type) {
+    auto const param_type = type_to_name(type);
+    params.emplace(index++, is_output ? (param_type + "*") : param_type);
+  };
+
+  add_column(true, output.type());
+
+  for (auto& input : inputs) {
+    add_column(false, input.type());
+  }
+
+  return params;
+}
+
+std::vector<device_data_t> build_device_data(mutable_column_view output,
+                                             std::vector<column_view> const& inputs)
+{
+  std::vector<device_data_t> data;
+
+  data.push_back(const_cast<device_data_t>(cudf::jit::get_data_ptr(output)));
+
+  std::transform(inputs.begin(), inputs.end(), std::back_inserter(data), [](auto const& input) {
+    return const_cast<device_data_t>(cudf::jit::get_data_ptr(input));
+  });
+
+  return data;
+}

+std::vector<void*> build_launch_args(cudf::size_type& size, std::vector<device_data_t>& device_data)
+{
-  std::string const kernel_name =
-    jitify2::reflection::Template("cudf::transformation::jit::kernel")  //
-      .instantiate(cudf::type_to_name(output.type()),  // list of template arguments
-                   cudf::type_to_name(input.type()));
-
-  std::string cuda_source = is_ptx ? cudf::jit::parse_single_function_ptx(
-                                       udf,  //
-                                       "GENERIC_UNARY_OP",
-                                       {
-                                         {0, "void *"},  // output argument
-                                         {1, cudf::type_to_name(input.type())}  // input argument
-                                       })
-                                   : cudf::jit::parse_single_function_cuda(udf,  //
-                                                                           "GENERIC_UNARY_OP");
+  // JITIFY and NVRTC need non-const pointers even if they aren't written to
+  std::vector<void*> args;
+  args.push_back(&size);
+  std::transform(
+    device_data.begin(), device_data.end(), std::back_inserter(args), [](auto& data) -> void* {
+      return &data;
+    });
+
+  return args;
+}

+void transform_operation(size_type base_column_size,
+                         mutable_column_view output,
+                         std::vector<column_view> const& inputs,
+                         std::string const& udf,
+                         data_type output_type,
+                         bool is_ptx,
+                         rmm::cuda_stream_view stream,
+                         rmm::device_async_resource_ref mr)
+{
+  std::string const kernel_name = jitify2::reflection::Template("cudf::transformation::jit::kernel")
+                                    .instantiate(build_jit_typenames(output, inputs));
+
+  std::string const cuda_source =
+    is_ptx ? cudf::jit::parse_single_function_ptx(
+               udf, "GENERIC_TRANSFORM_OP", build_ptx_params(output, inputs))
+           : cudf::jit::parse_single_function_cuda(udf, "GENERIC_TRANSFORM_OP");
+
+  auto device_data = build_device_data(output, inputs);
+
+  auto args = build_launch_args(base_column_size, device_data);
+
  cudf::jit::get_program_cache(*transform_jit_kernel_cu_jit)
    .get_kernel(
      kernel_name, {}, {{"transform/jit/operation-udf.hpp", cuda_source}}, {"-arch=sm_."})  //
    ->configure_1d_max_occupancy(0, 0, nullptr, stream.value())  //
-    ->launch(output.size(),  //
-             cudf::jit::get_data_ptr(output),
-             cudf::jit::get_data_ptr(input));
+    ->launch(args.data());
}
} // namespace

} // namespace jit
} // namespace transformation

namespace detail {
-std::unique_ptr<column> transform(column_view const& input,
-                                  std::string const& unary_udf,
+std::unique_ptr<column> transform(std::vector<column_view> const& inputs,
+                                  std::string const& transform_udf,
data_type output_type,
bool is_ptx,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
-  CUDF_EXPECTS(is_fixed_width(input.type()), "Unexpected non-fixed-width type.");
-
-  std::unique_ptr<column> output = make_fixed_width_column(
-    output_type, input.size(), copy_bitmask(input, stream, mr), input.null_count(), stream, mr);
-
-  if (input.is_empty()) { return output; }
+  CUDF_EXPECTS(is_fixed_width(output_type), "Transforms only support fixed-width types");
+  CUDF_EXPECTS(
+    std::all_of(
+      inputs.begin(), inputs.end(), [](auto& input) { return is_fixed_width(input.type()); }),
+    "Transforms only support fixed-width types");
+
+  auto const base_column = std::max_element(
+    inputs.begin(), inputs.end(), [](auto& a, auto& b) { return a.size() < b.size(); });
+
+  CUDF_EXPECTS(std::all_of(inputs.begin(),
+                           inputs.end(),
+                           [&](auto const& input) {
+                             return (input.size() == 1) || (input.size() == base_column->size());
+                           }),
+               "All transform input columns must have the same size or be scalar (have size 1)");
+
+  CUDF_EXPECTS(std::all_of(inputs.begin(),
+                           inputs.end(),
+                           [&](auto const& input) {
+                             return (input.size() == 1 && input.null_count() == 0) ||
+                                    (input.null_count() == base_column->null_count());
+                           }),
+               "All transform input columns must have the same null-count");
+
+  auto output = make_fixed_width_column(output_type,
+                                        base_column->size(),
+                                        copy_bitmask(*base_column, stream, mr),
+                                        base_column->null_count(),
+                                        stream,
+                                        mr);
+
+  if (base_column->is_empty()) { return output; }

mutable_column_view const output_view = *output;

-  // transform
-  transformation::jit::unary_operation(output_view, input, unary_udf, output_type, is_ptx, stream);
+  transformation::jit::transform_operation(
+    base_column->size(), output_view, inputs, transform_udf, output_type, is_ptx, stream, mr);

return output;
}

} // namespace detail

-std::unique_ptr<column> transform(column_view const& input,
-                                  std::string const& unary_udf,
+std::unique_ptr<column> transform(std::vector<column_view> const& inputs,
+                                  std::string const& transform_udf,
data_type output_type,
bool is_ptx,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
-  return detail::transform(input, unary_udf, output_type, is_ptx, stream, mr);
+  return detail::transform(inputs, transform_udf, output_type, is_ptx, stream, mr);
}

} // namespace cudf
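To make the JIT plumbing concrete, here is a hedged sketch (not part of the diff) of what the helpers above would produce for a FLOAT32 output of size N and two FLOAT32 inputs, a full column col and a size-1 scalar scl:

// build_jit_typenames(output, {col, scl}) -> roughly:
//   "cudf::transformation::jit::strided<float, 1>"  // output: always column stride
//   "cudf::transformation::jit::strided<float, 1>"  // col: size matches the output
//   "cudf::transformation::jit::strided<float, 0>"  // scl: size mismatch => scalar stride
//
// build_ptx_params(output, {col, scl}) -> {{0, "float*"}, {1, "float"}, {2, "float"}}
//   (parameter 0 is the output pointer; inputs follow by position)
//
// build_device_data and build_launch_args then pack, in order,
//   { &size, &out_ptr, &col_ptr, &scl_ptr }
// which lines up with the JIT kernel's (size, out, ins...) parameter list.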
9 changes: 6 additions & 3 deletions cpp/tests/streams/transform_test.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,8 +32,11 @@ void test_udf(char const* udf, Data data_init, cudf::size_type size, bool is_ptx
auto data_iter = cudf::detail::make_counting_transform_iterator(0, data_init);
cudf::test::fixed_width_column_wrapper<dtype, typename decltype(data_iter)::value_type> in(
data_iter, data_iter + size, all_valid);
-  cudf::transform(
-    in, udf, cudf::data_type(cudf::type_to_id<dtype>()), is_ptx, cudf::test::get_default_stream());
+  cudf::transform({in},
+                  udf,
+                  cudf::data_type(cudf::type_to_id<dtype>()),
+                  is_ptx,
+                  cudf::test::get_default_stream());
}

TEST_F(TransformTest, Transform)
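For comparison, a hedged sketch of a genuinely multi-input call in the same harness style (the add UDF and columns are illustrative, not part of this PR):

char const* add_udf = R"***(
__device__ inline void add(int32_t* out, int32_t a, int32_t b) { *out = a + b; }
)***";

cudf::test::fixed_width_column_wrapper<int32_t> a{{1, 2, 3}};
cudf::test::fixed_width_column_wrapper<int32_t> b{{10, 20, 30}};

// Both inputs have equal size, so neither is treated as a scalar.
cudf::transform({a, b},
                add_udf,
                cudf::data_type(cudf::type_id::INT32),
                /*is_ptx=*/false,
                cudf::test::get_default_stream());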
