From 44b2e798bd6c280985d052634c0c1e495f57a609 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Sun, 24 Nov 2024 15:11:32 -0800 Subject: [PATCH 1/4] Remove the unused detail `int_fastdiv.h` header (#17426) This PR removes the unused detail `int_fastdiv.h` header. Authors: - Yunsong Wang (https://github.com/PointKernel) Approvers: - Nghia Truong (https://github.com/ttnghia) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17426 --- .../cudf/detail/utilities/int_fastdiv.h | 175 ------------------ 1 file changed, 175 deletions(-) delete mode 100644 cpp/include/cudf/detail/utilities/int_fastdiv.h diff --git a/cpp/include/cudf/detail/utilities/int_fastdiv.h b/cpp/include/cudf/detail/utilities/int_fastdiv.h deleted file mode 100644 index ff442af5194..00000000000 --- a/cpp/include/cudf/detail/utilities/int_fastdiv.h +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. - * - * Copyright 2014 Maxim Milakov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -class int_fastdiv { - public: - // divisor != 0 - __host__ __device__ __forceinline__ int_fastdiv(int divisor = 0) : d(divisor) - { - update_magic_numbers(); - } - - __host__ __device__ __forceinline__ int_fastdiv& operator=(int divisor) - { - this->d = divisor; - update_magic_numbers(); - return *this; - } - - __host__ __device__ __forceinline__ operator int() const { return d; } - - private: - int d; - int M; - int s; - int n_add_sign; - - // Hacker's Delight, Second Edition, Chapter 10, Integer Division By Constants - __host__ __device__ __forceinline__ void update_magic_numbers() - { - if (d == 1) { - M = 0; - s = -1; - n_add_sign = 1; - return; - } else if (d == -1) { - M = 0; - s = -1; - n_add_sign = -1; - return; - } - - int p; - unsigned int ad, anc, delta, q1, r1, q2, r2, t; - unsigned const two31 = 0x8000'0000u; - ad = (d == 0) ? 
1 : abs(d); - t = two31 + ((unsigned int)d >> 31); - anc = t - 1 - t % ad; - p = 31; - q1 = two31 / anc; - r1 = two31 - q1 * anc; - q2 = two31 / ad; - r2 = two31 - q2 * ad; - do { - ++p; - q1 = 2 * q1; - r1 = 2 * r1; - if (r1 >= anc) { - ++q1; - r1 -= anc; - } - q2 = 2 * q2; - r2 = 2 * r2; - if (r2 >= ad) { - ++q2; - r2 -= ad; - } - delta = ad - r2; - } while (q1 < delta || (q1 == delta && r1 == 0)); - this->M = q2 + 1; - if (d < 0) this->M = -this->M; - this->s = p - 32; - - if ((d > 0) && (M < 0)) - n_add_sign = 1; - else if ((d < 0) && (M > 0)) - n_add_sign = -1; - else - n_add_sign = 0; - } - - __host__ __device__ __forceinline__ friend int operator/(int const divident, - int_fastdiv const& divisor); -}; - -__host__ __device__ __forceinline__ int operator/(int const n, int_fastdiv const& divisor) -{ - int q; -#ifdef __CUDA_ARCH__ - asm("mul.hi.s32 %0, %1, %2;" : "=r"(q) : "r"(divisor.M), "r"(n)); -#else - q = (((unsigned long long)((long long)divisor.M * (long long)n)) >> 32); -#endif - q += n * divisor.n_add_sign; - if (divisor.s >= 0) { - q >>= divisor.s; // we rely on this to be implemented as arithmetic shift - q += (((unsigned int)q) >> 31); - } - return q; -} - -__host__ __device__ __forceinline__ int operator%(int const n, int_fastdiv const& divisor) -{ - int quotient = n / divisor; - int remainder = n - quotient * divisor; - return remainder; -} - -__host__ __device__ __forceinline__ int operator/(unsigned int const n, int_fastdiv const& divisor) -{ - return ((int)n) / divisor; -} - -__host__ __device__ __forceinline__ int operator%(unsigned int const n, int_fastdiv const& divisor) -{ - return ((int)n) % divisor; -} - -__host__ __device__ __forceinline__ int operator/(short const n, int_fastdiv const& divisor) -{ - return ((int)n) / divisor; -} - -__host__ __device__ __forceinline__ int operator%(short const n, int_fastdiv const& divisor) -{ - return ((int)n) % divisor; -} - -__host__ __device__ __forceinline__ int operator/(unsigned short const n, - int_fastdiv const& divisor) -{ - return ((int)n) / divisor; -} - -__host__ __device__ __forceinline__ int operator%(unsigned short const n, - int_fastdiv const& divisor) -{ - return ((int)n) % divisor; -} - -__host__ __device__ __forceinline__ int operator/(char const n, int_fastdiv const& divisor) -{ - return ((int)n) / divisor; -} - -__host__ __device__ __forceinline__ int operator%(char const n, int_fastdiv const& divisor) -{ - return ((int)n) % divisor; -} - -__host__ __device__ __forceinline__ int operator/(unsigned char const n, int_fastdiv const& divisor) -{ - return ((int)n) / divisor; -} - -__host__ __device__ __forceinline__ int operator%(unsigned char const n, int_fastdiv const& divisor) -{ - return ((int)n) % divisor; -} From 8d8cd7818b310845f4c2ad3ffc1521a267df3973 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 25 Nov 2024 05:57:47 -0500 Subject: [PATCH 2/4] Expose stream-ordering to groupby APIs (#17324) Adds stream parameter to ``` cudf::groupby::scan cudf::groupby::aggregate cudf::groupby::shift cudf::groupby::get_groups cudf::groupby::replace_nulls ``` Added stream gtests to verify correct stream forwarding. 
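For example, callers can now pass their own stream (an illustrative sketch, not part of this PR; the request setup mirrors the existing gtests, and the hypothetical helper `sum_by_key_on_stream` only exists for this example — the `stream` argument is the new part):

```
// Sketch: run a grouped sum on a caller-owned stream.
#include <cudf/aggregation.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/groupby.hpp>

#include <rmm/cuda_stream.hpp>

#include <vector>

void sum_by_key_on_stream(cudf::column_view keys, cudf::column_view vals)
{
  rmm::cuda_stream stream;  // caller-owned, non-default stream

  std::vector<cudf::groupby::aggregation_request> requests;
  requests.emplace_back();
  requests[0].values = vals;
  requests[0].aggregations.push_back(
    cudf::make_sum_aggregation<cudf::groupby_aggregation>());

  cudf::groupby::groupby gb_obj(cudf::table_view({keys}));
  // All device work is now ordered on `stream` instead of cudf::get_default_stream().
  auto [unique_keys, results] = gb_obj.aggregate(requests, stream.view());
}
```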
Reference: https://github.com/rapidsai/cudf/issues/13744 Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - Paul Mattione (https://github.com/pmattione-nvidia) - Nghia Truong (https://github.com/ttnghia) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17324 --- cpp/include/cudf/groupby.hpp | 20 +++++------ cpp/src/groupby/groupby.cu | 22 +++++------- cpp/tests/streams/groupby_test.cpp | 57 +++++++++++++++++++++++++++++- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/cpp/include/cudf/groupby.hpp b/cpp/include/cudf/groupby.hpp index c9df02f167a..ca3c97880df 100644 --- a/cpp/include/cudf/groupby.hpp +++ b/cpp/include/cudf/groupby.hpp @@ -178,6 +178,7 @@ class groupby { * * @param requests The set of columns to aggregate and the aggregations to * perform + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned table and columns' device memory * @return Pair containing the table with each group's unique key and * a vector of aggregation_results for each request in the same order as @@ -185,16 +186,7 @@ class groupby { */ std::pair, std::vector> aggregate( host_span requests, - rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); - - /** - * @copydoc aggregate(host_span, rmm::device_async_resource_ref) - * - * @param stream CUDA stream used for device memory operations and kernel launches. - */ - std::pair, std::vector> aggregate( - host_span requests, - rmm::cuda_stream_view stream, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** * @brief Performs grouped scans on the specified values. @@ -242,6 +234,7 @@ class groupby { * ``` * * @param requests The set of columns to scan and the scans to perform + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned table and columns' device memory * @return Pair containing the table with each group's key and * a vector of aggregation_results for each request in the same order as @@ -249,6 +242,7 @@ class groupby { */ std::pair, std::vector> scan( host_span requests, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -295,6 +289,7 @@ class groupby { * @param values Table whose columns to be shifted * @param offsets The offsets by which to shift the input * @param fill_values Fill values for indeterminable outputs + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned table and columns' device memory * @return Pair containing the tables with each group's key and the columns shifted * @@ -305,6 +300,7 @@ class groupby { table_view const& values, host_span offsets, std::vector> const& fill_values, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -329,11 +325,13 @@ class groupby { * and the `values` of the `groups` object will be `nullptr`. * * @param values Table representing values on which a groupby operation is to be performed + * @param stream CUDA stream used for device memory operations and kernel launches. 
   * @param mr Device memory resource used to allocate the returned tables's device memory in the
   * returned groups
   * @return A `groups` object representing grouped keys and values
   */
  groups get_groups(cudf::table_view values = {},
+                   rmm::cuda_stream_view stream      = cudf::get_default_stream(),
                    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
 
   /**
@@ -367,6 +365,7 @@ class groupby {
   * @param[in] values A table whose column null values will be replaced
   * @param[in] replace_policies Specify the position of replacement values relative to null values,
   * one for each column
+  * @param[in] stream CUDA stream used for device memory operations and kernel launches.
   * @param[in] mr Device memory resource used to allocate device memory of the returned column
   *
   * @return Pair that contains a table with the sorted keys and the result column
   */
  std::pair<std::unique_ptr<table>, std::unique_ptr<column>> replace_nulls(
    table_view const& values,
    host_span<cudf::replace_policy const> replace_policies,
+   rmm::cuda_stream_view stream      = cudf::get_default_stream(),
    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
 
  private:
diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu
index 6eb82618e2a..c42038026e5 100644
--- a/cpp/src/groupby/groupby.cu
+++ b/cpp/src/groupby/groupby.cu
@@ -191,13 +191,6 @@ void verify_valid_requests(host_span<RequestType const> requests)
 }  // namespace
 
-// Compute aggregation requests
-std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::aggregate(
-  host_span<aggregation_request const> requests, rmm::device_async_resource_ref mr)
-{
-  return aggregate(requests, cudf::get_default_stream(), mr);
-}
-
 // Compute aggregation requests
 std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::aggregate(
   host_span<aggregation_request const> requests,
@@ -220,7 +213,9 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::aggr
 
 // Compute scan requests
 std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::scan(
-  host_span<scan_request const> requests, rmm::device_async_resource_ref mr)
+  host_span<scan_request const> requests,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
 {
   CUDF_FUNC_RANGE();
   CUDF_EXPECTS(
@@ -233,13 +228,14 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::scan
 
   if (_keys.num_rows() == 0) { return std::pair(empty_like(_keys), empty_results(requests)); }
 
-  return sort_scan(requests, cudf::get_default_stream(), mr);
+  return sort_scan(requests, stream, mr);
 }
 
-groupby::groups groupby::get_groups(table_view values, rmm::device_async_resource_ref mr)
+groupby::groups groupby::get_groups(table_view values,
+                                    rmm::cuda_stream_view stream,
+                                    rmm::device_async_resource_ref mr)
 {
   CUDF_FUNC_RANGE();
-  auto const stream = cudf::get_default_stream();
   auto grouped_keys = helper().sorted_keys(stream, mr);
 
   auto const& group_offsets = helper().group_offsets(stream);
@@ -262,6 +258,7 @@ groupby::groups groupby::get_groups(table_view values, rmm::device_async_resourc
 std::pair<std::unique_ptr<table>, std::unique_ptr<column>> groupby::replace_nulls(
   table_view const& values,
   host_span<cudf::replace_policy const> replace_policies,
+  rmm::cuda_stream_view stream,
   rmm::device_async_resource_ref mr)
 {
   CUDF_FUNC_RANGE();
@@ -271,7 +268,6 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<column>> groupby::replace_nulls
                "Size mismatch between num_columns and replace_policies.");
 
   if (values.is_empty()) { return std::pair(empty_like(_keys), empty_like(values)); }
 
-  auto const stream = cudf::get_default_stream();
   auto const& group_labels = helper().group_labels(stream);
 
   std::vector<std::unique_ptr<column>> results;
@@ -306,6 +302,7 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<column>> groupby::shift(
   table_view const& values,
   host_span<size_type const> offsets,
   std::vector<std::reference_wrapper<const scalar>> const& fill_values,
+  rmm::cuda_stream_view stream,
   rmm::device_async_resource_ref mr)
 {
   CUDF_FUNC_RANGE();
@@ -320,7 +317,6 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<column>
> groupby::shift( }), "values and fill_value should have the same type.", cudf::data_type_error); - auto stream = cudf::get_default_stream(); std::vector> results; auto const& group_offsets = helper().group_offsets(stream); std::transform( diff --git a/cpp/tests/streams/groupby_test.cpp b/cpp/tests/streams/groupby_test.cpp index 03cabbc4de0..73d6d31b282 100644 --- a/cpp/tests/streams/groupby_test.cpp +++ b/cpp/tests/streams/groupby_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,9 @@ #include #include +#include #include +#include using K = int32_t; // Key type. @@ -65,3 +67,56 @@ TYPED_TEST(groupby_stream_test, test_count) this->test_groupby(make_count_agg(), force_use_sort_impl::YES); this->test_groupby(make_count_agg(cudf::null_policy::INCLUDE)); } + +struct GroupbyTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupbyTest, Scan) +{ + using key_wrapper = cudf::test::fixed_width_column_wrapper; + using value_wrapper = cudf::test::fixed_width_column_wrapper; + + key_wrapper keys{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}; + value_wrapper vals({5, 6, 7, 8, 9, 0, 1, 2, 3, 4}); + + auto agg = cudf::make_min_aggregation(); + std::vector requests; + requests.emplace_back(); + requests[0].values = vals; + requests[0].aggregations.push_back(std::move(agg)); + + cudf::groupby::groupby gb_obj(cudf::table_view({keys})); + // cudf::groupby scan uses sort implementation + auto result = gb_obj.scan(requests, cudf::test::get_default_stream()); +} + +TEST_F(GroupbyTest, Shift) +{ + cudf::test::fixed_width_column_wrapper key{1, 2, 1, 2, 2, 1, 1}; + cudf::test::fixed_width_column_wrapper val{3, 4, 5, 6, 7, 8, 9}; + cudf::size_type offset = 2; + auto slr = cudf::make_default_constructed_scalar(cudf::column_view(val).type(), + cudf::test::get_default_stream()); + + cudf::groupby::groupby gb_obj(cudf::table_view({key})); + std::vector offsets{offset}; + auto got = + gb_obj.shift(cudf::table_view{{val}}, offsets, {*slr}, cudf::test::get_default_stream()); +} + +TEST_F(GroupbyTest, GetGroups) +{ + cudf::test::fixed_width_column_wrapper keys{1, 1, 2, 1, 2, 3}; + cudf::test::fixed_width_column_wrapper values({0, 0, 1, 1, 2, 2}); + cudf::groupby::groupby gb(cudf::table_view({keys})); + auto gb_groups = gb.get_groups(cudf::table_view({values}), cudf::test::get_default_stream()); +} + +TEST_F(GroupbyTest, ReplaceNullsTest) +{ + cudf::test::fixed_width_column_wrapper key{0, 1, 0, 1, 0, 1}; + cudf::test::fixed_width_column_wrapper val({42, 7, 24, 10, 1, 1000}, {1, 1, 1, 0, 0, 0}); + cudf::groupby::groupby gb_obj(cudf::table_view({key})); + std::vector policies{cudf::replace_policy::PRECEDING}; + auto p = + gb_obj.replace_nulls(cudf::table_view({val}), policies, cudf::test::get_default_stream()); +} From d93e9c267ac7a1a8792d9fc77d2ba8ab7be2683c Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 25 Nov 2024 05:58:16 -0500 Subject: [PATCH 3/4] Expose stream-ordering to strings attribute APIs (#17398) Adds stream parameter to ``` cudf::strings::count_characters cudf::strings::count_bytes cudf::strings::code_points ``` Added stream gtests to verify correct stream forwarding. 
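For example (an illustrative sketch, not part of this PR; the hypothetical helper `char_counts_on_stream` exists only for this example, and `input` is assumed to be an existing strings column view):

```
// Sketch: compute per-row character counts on a caller-owned stream.
#include <cudf/strings/attributes.hpp>
#include <cudf/strings/strings_column_view.hpp>

#include <rmm/cuda_stream.hpp>

#include <memory>

std::unique_ptr<cudf::column> char_counts_on_stream(cudf::strings_column_view const& input)
{
  rmm::cuda_stream stream;  // caller-owned, non-default stream
  // Kernels and allocations now run on `stream` instead of cudf::get_default_stream().
  return cudf::strings::count_characters(input, stream.view());
}
```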
Reference: https://github.com/rapidsai/cudf/issues/13744 Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - David Wendt (https://github.com/davidwendt) - Nghia Truong (https://github.com/ttnghia) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17398 --- cpp/include/cudf/strings/attributes.hpp | 6 ++ cpp/src/strings/attributes.cu | 9 ++- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/strings/attributes_test.cpp | 59 +++++++++++++++++++ 4 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 cpp/tests/streams/strings/attributes_test.cpp diff --git a/cpp/include/cudf/strings/attributes.hpp b/cpp/include/cudf/strings/attributes.hpp index 5f2eda8fa5b..0de57b556ad 100644 --- a/cpp/include/cudf/strings/attributes.hpp +++ b/cpp/include/cudf/strings/attributes.hpp @@ -41,11 +41,13 @@ namespace strings { * Any null string will result in a null entry for that row in the output column. * * @param input Strings instance for this operation + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned column's device memory * @return New column with lengths for each string */ std::unique_ptr count_characters( strings_column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -59,11 +61,13 @@ std::unique_ptr count_characters( * Any null string will result in a null entry for that row in the output column. * * @param input Strings instance for this operation + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned column's device memory * @return New column with the number of bytes for each string */ std::unique_ptr count_bytes( strings_column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -79,11 +83,13 @@ std::unique_ptr count_bytes( * Any null string is ignored. No null entries will appear in the output column. 
* * @param input Strings instance for this operation + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned column's device memory * @return New INT32 column with code point integer values for each character */ std::unique_ptr code_points( strings_column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @} */ // end of strings_apis group diff --git a/cpp/src/strings/attributes.cu b/cpp/src/strings/attributes.cu index c56d25fde2b..1c14cab4b1f 100644 --- a/cpp/src/strings/attributes.cu +++ b/cpp/src/strings/attributes.cu @@ -264,24 +264,27 @@ std::unique_ptr code_points(strings_column_view const& input, // external APIS std::unique_ptr count_characters(strings_column_view const& input, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::count_characters(input, cudf::get_default_stream(), mr); + return detail::count_characters(input, stream, mr); } std::unique_ptr count_bytes(strings_column_view const& input, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::count_bytes(input, cudf::get_default_stream(), mr); + return detail::count_bytes(input, stream, mr); } std::unique_ptr code_points(strings_column_view const& input, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::code_points(input, cudf::get_default_stream(), mr); + return detail::code_points(input, stream, mr); } } // namespace strings diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 91c00d6af34..8928d27a871 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -724,6 +724,7 @@ ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_STREAM_COMPACTION_TEST streams/stream_compaction_test.cpp STREAM_MODE testing) ConfigureTest( STREAM_STRINGS_TEST + streams/strings/attributes_test.cpp streams/strings/case_test.cpp streams/strings/combine_test.cpp streams/strings/contains_test.cpp diff --git a/cpp/tests/streams/strings/attributes_test.cpp b/cpp/tests/streams/strings/attributes_test.cpp new file mode 100644 index 00000000000..e15681eb8a7 --- /dev/null +++ b/cpp/tests/streams/strings/attributes_test.cpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include + +#include +#include + +struct StringsAttributesTest : public cudf::test::BaseFixture {}; + +TEST_F(StringsAttributesTest, CodePoints) +{ + std::vector h_strings{"eee", "bb", nullptr, "", "aa", "bbb", "ééé"}; + cudf::test::strings_column_wrapper strings( + h_strings.begin(), + h_strings.end(), + thrust::make_transform_iterator(h_strings.begin(), [](auto str) { return str != nullptr; })); + auto strings_view = cudf::strings_column_view(strings); + + auto results = cudf::strings::code_points(strings_view, cudf::test::get_default_stream()); +} + +TEST_F(StringsAttributesTest, CountCharacters) +{ + std::vector h_strings( + 40000, "something a bit longer than 32 bytes ééé ééé ééé ééé ééé ééé ééé"); + cudf::test::strings_column_wrapper strings(h_strings.begin(), h_strings.end()); + auto strings_view = cudf::strings_column_view(strings); + + auto results = cudf::strings::count_characters(strings_view, cudf::test::get_default_stream()); +} + +TEST_F(StringsAttributesTest, CountBytes) +{ + std::vector h_strings{ + "eee", "bb", nullptr, "", "aa", "ééé", "something a bit longer than 32 bytes"}; + cudf::test::strings_column_wrapper strings( + h_strings.begin(), + h_strings.end(), + thrust::make_transform_iterator(h_strings.begin(), [](auto str) { return str != nullptr; })); + auto strings_view = cudf::strings_column_view(strings); + + auto results = cudf::strings::count_bytes(strings_view, cudf::test::get_default_stream()); +} From f05e89db8f6750232a452d072fa9f9ea988a6b34 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Mon, 25 Nov 2024 13:03:54 -0600 Subject: [PATCH 4/4] Single-partition Dask executor for cuDF-Polars (#17262) The goal here is to lay down the initial foundation for dask-based evaluation of `IR` graphs in cudf-polars. The first pass will only support single-partition workloads. This functionality could be achieved with much less-complicated changes to cudf-polars. However, we **do** want to build multi-partition support on top of this. 
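As exercised by the new tests below, the executor is selected through the `GPUEngine` configuration (a minimal sketch; the frame and query are illustrative):

```
import polars as pl

q = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5]}).select(pl.col("a") + pl.col("b"))

# Opt in to the experimental, single-partition Dask-based evaluation of the IR graph.
result = q.collect(engine=pl.GPUEngine(raise_on_fail=True, executor="dask-experimental"))
```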
Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Lawrence Mitchell (https://github.com/wence-) - James Lamb (https://github.com/jameslamb) URL: https://github.com/rapidsai/cudf/pull/17262 --- ci/run_cudf_polars_pytests.sh | 4 + python/cudf_polars/cudf_polars/callback.py | 18 +- python/cudf_polars/cudf_polars/dsl/ir.py | 25 +- .../cudf_polars/cudf_polars/dsl/translate.py | 3 +- .../cudf_polars/experimental/parallel.py | 236 ++++++++++++++++++ .../cudf_polars/testing/asserts.py | 11 +- python/cudf_polars/tests/conftest.py | 16 ++ .../tests/experimental/test_parallel.py | 21 ++ python/cudf_polars/tests/test_executors.py | 68 +++++ 9 files changed, 388 insertions(+), 14 deletions(-) create mode 100644 python/cudf_polars/cudf_polars/experimental/parallel.py create mode 100644 python/cudf_polars/tests/experimental/test_parallel.py create mode 100644 python/cudf_polars/tests/test_executors.py diff --git a/ci/run_cudf_polars_pytests.sh b/ci/run_cudf_polars_pytests.sh index c10612a065a..bf5a3ccee8e 100755 --- a/ci/run_cudf_polars_pytests.sh +++ b/ci/run_cudf_polars_pytests.sh @@ -8,4 +8,8 @@ set -euo pipefail # Support invoking run_cudf_polars_pytests.sh outside the script directory cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/ +# Test the default "cudf" executor python -m pytest --cache-clear "$@" tests + +# Test the "dask-experimental" executor +python -m pytest --cache-clear "$@" tests --executor dask-experimental diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 8dc5715195d..95527028aa9 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -9,7 +9,7 @@ import os import warnings from functools import cache, partial -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal import nvtx @@ -181,6 +181,7 @@ def _callback( *, device: int | None, memory_resource: int | None, + executor: Literal["pylibcudf", "dask-experimental"] | None, ) -> pl.DataFrame: assert with_columns is None assert pyarrow_predicate is None @@ -191,7 +192,14 @@ def _callback( set_device(device), set_memory_resource(memory_resource), ): - return ir.evaluate(cache={}).to_polars() + if executor is None or executor == "pylibcudf": + return ir.evaluate(cache={}).to_polars() + elif executor == "dask-experimental": + from cudf_polars.experimental.parallel import evaluate_dask + + return evaluate_dask(ir).to_polars() + else: + raise ValueError(f"Unknown executor '{executor}'") def validate_config_options(config: dict) -> None: @@ -208,7 +216,9 @@ def validate_config_options(config: dict) -> None: ValueError If the configuration contains unsupported options. 
""" - if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}): + if unsupported := ( + config.keys() - {"raise_on_fail", "parquet_options", "executor"} + ): raise ValueError( f"Engine configuration contains unsupported settings: {unsupported}" ) @@ -243,6 +253,7 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: device = config.device memory_resource = config.memory_resource raise_on_fail = config.config.get("raise_on_fail", False) + executor = config.config.get("executor", None) validate_config_options(config.config) with nvtx.annotate(message="ConvertIR", domain="cudf_polars"): @@ -272,5 +283,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: ir, device=device, memory_resource=memory_resource, + executor=executor, ) ) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 62a2da9dcea..6617b71be81 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1599,13 +1599,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): # polars requires that all to-explode columns have the # same sub-shapes raise NotImplementedError("Explode with more than one column") + self.options = (tuple(to_explode),) elif self.name == "rename": - old, new, _ = self.options + old, new, strict = self.options # TODO: perhaps polars should validate renaming in the IR? if len(new) != len(set(new)) or ( set(new) & (set(df.schema.keys()) - set(old)) ): raise NotImplementedError("Duplicate new names in rename.") + self.options = (tuple(old), tuple(new), strict) elif self.name == "unpivot": indices, pivotees, variable_name, value_name = self.options value_name = "value" if value_name is None else value_name @@ -1623,13 +1625,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): self.options = ( tuple(indices), tuple(pivotees), - (variable_name, schema[variable_name]), - (value_name, schema[value_name]), + variable_name, + value_name, ) - self._non_child_args = (name, self.options) + self._non_child_args = (schema, name, self.options) @classmethod - def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: + def do_evaluate( + cls, schema: Schema, name: str, options: Any, df: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" if name == "rechunk": # No-op in our data model @@ -1651,8 +1655,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: ( indices, pivotees, - (variable_name, variable_dtype), - (value_name, value_dtype), + variable_name, + value_name, ) = options npiv = len(pivotees) index_columns = [ @@ -1669,7 +1673,7 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: plc.interop.from_arrow( pa.array( pivotees, - type=plc.interop.to_arrow(variable_dtype), + type=plc.interop.to_arrow(schema[variable_name]), ), ) ] @@ -1677,7 +1681,10 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: df.num_rows, ).columns() value_column = plc.concatenate.concatenate( - [df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees] + [ + df.column_map[pivotee].astype(schema[value_name]).obj + for pivotee in pivotees + ] ) return DataFrame( [ diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 12fc2a196cd..9480ce6e535 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -633,9 +633,10 @@ 
def _(node: pl_expr.Sort, translator: Translator, dtype: plc.DataType) -> expr.E @_translate_expr.register def _(node: pl_expr.SortBy, translator: Translator, dtype: plc.DataType) -> expr.Expr: + options = node.sort_options return expr.SortBy( dtype, - node.sort_options, + (options[0], tuple(options[1]), tuple(options[2])), translator.translate_expr(n=node.expr), *(translator.translate_expr(n=n) for n in node.by), ) diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py new file mode 100644 index 00000000000..6518dd60c7d --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/parallel.py @@ -0,0 +1,236 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +"""Partitioned LogicalPlan nodes.""" + +from __future__ import annotations + +import operator +from functools import reduce, singledispatch +from typing import TYPE_CHECKING, Any + +from cudf_polars.dsl.ir import IR +from cudf_polars.dsl.traversal import traversal + +if TYPE_CHECKING: + from collections.abc import MutableMapping + from typing import TypeAlias + + from cudf_polars.containers import DataFrame + from cudf_polars.dsl.nodebase import Node + from cudf_polars.typing import GenericTransformer + + +class PartitionInfo: + """ + Partitioning information. + + This class only tracks the partition count (for now). + """ + + __slots__ = ("count",) + + def __init__(self, count: int): + self.count = count + + +LowerIRTransformer: TypeAlias = ( + "GenericTransformer[IR, MutableMapping[IR, PartitionInfo]]" +) +"""Protocol for Lowering IR nodes.""" + + +def get_key_name(node: Node) -> str: + """Generate the key name for a Node.""" + return f"{type(node).__name__.lower()}-{hash(node)}" + + +@singledispatch +def lower_ir_node( + ir: IR, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """ + Rewrite an IR node and extract partitioning information. + + Parameters + ---------- + ir + IR node to rewrite. + rec + Recursive LowerIRTransformer callable. + + Returns + ------- + new_ir, partition_info + The rewritten node, and a mapping from unique nodes in + the full IR graph to associated partitioning information. + + Notes + ----- + This function is used by `lower_ir_graph`. + + See Also + -------- + lower_ir_graph + """ + raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover + + +@lower_ir_node.register(IR) +def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + if len(ir.children) == 0: + # Default leaf node has single partition + return ir, {ir: PartitionInfo(count=1)} + + # Lower children + children, _partition_info = zip(*(rec(c) for c in ir.children), strict=False) + partition_info = reduce(operator.or_, _partition_info) + + # Check that child partitioning is supported + count = max(partition_info[c].count for c in children) + if count > 1: + raise NotImplementedError( + f"Class {type(ir)} does not support multiple partitions." + ) # pragma: no cover + + # Return reconstructed node and partition-info dict + partition = PartitionInfo(count=1) + new_node = ir.reconstruct(children) + partition_info[new_node] = partition + return new_node, partition_info + + +def lower_ir_graph(ir: IR) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """ + Rewrite an IR graph and extract partitioning information. + + Parameters + ---------- + ir + Root of the graph to rewrite. 
+ + Returns + ------- + new_ir, partition_info + The rewritten graph, and a mapping from unique nodes + in the new graph to associated partitioning information. + + Notes + ----- + This function traverses the unique nodes of the graph with + root `ir`, and applies :func:`lower_ir_node` to each node. + + See Also + -------- + lower_ir_node + """ + from cudf_polars.dsl.traversal import CachingVisitor + + mapper = CachingVisitor(lower_ir_node) + return mapper(ir) + + +@singledispatch +def generate_ir_tasks( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + """ + Generate a task graph for evaluation of an IR node. + + Parameters + ---------- + ir + IR node to generate tasks for. + partition_info + Partitioning information, obtained from :func:`lower_ir_graph`. + + Returns + ------- + mapping + A (partial) dask task graph for the evaluation of an ir node. + + Notes + ----- + Task generation should only produce the tasks for the current node, + referring to child tasks by name. + + See Also + -------- + task_graph + """ + raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover + + +@generate_ir_tasks.register(IR) +def _( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # Single-partition default behavior. + # This is used by `generate_ir_tasks` for all unregistered IR sub-types. + if partition_info[ir].count > 1: + raise NotImplementedError( + f"Failed to generate multiple output tasks for {ir}." + ) # pragma: no cover + + child_names = [] + for child in ir.children: + child_names.append(get_key_name(child)) + if partition_info[child].count > 1: + raise NotImplementedError( + f"Failed to generate tasks for {ir} with child {child}." + ) # pragma: no cover + + key_name = get_key_name(ir) + return { + (key_name, 0): ( + ir.do_evaluate, + *ir._non_child_args, + *((child_name, 0) for child_name in child_names), + ) + } + + +def task_graph( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> tuple[MutableMapping[Any, Any], str | tuple[str, int]]: + """ + Construct a task graph for evaluation of an IR graph. + + Parameters + ---------- + ir + Root of the graph to rewrite. + partition_info + A mapping from all unique IR nodes to the + associated partitioning information. + + Returns + ------- + graph + A Dask-compatible task graph for the entire + IR graph with root `ir`. + + Notes + ----- + This function traverses the unique nodes of the + graph with root `ir`, and extracts the tasks for + each node with :func:`generate_ir_tasks`. 
+ + See Also + -------- + generate_ir_tasks + """ + graph = reduce( + operator.or_, + (generate_ir_tasks(node, partition_info) for node in traversal(ir)), + ) + return graph, (get_key_name(ir), 0) + + +def evaluate_dask(ir: IR) -> DataFrame: + """Evaluate an IR graph with Dask.""" + from dask import get + + ir, partition_info = lower_ir_graph(ir) + + graph, key = task_graph(ir, partition_info) + return get(graph, key) diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index ba0bb12a0fb..d986f150b2e 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -20,6 +20,11 @@ __all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"] +# Will be overriden by `conftest.py` with the value from the `--executor` +# command-line argument +Executor = None + + def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, @@ -34,6 +39,7 @@ def assert_gpu_result_equal( rtol: float = 1e-05, atol: float = 1e-08, categorical_as_str: bool = False, + executor: str | None = None, ) -> None: """ Assert that collection of a lazyframe on GPU produces correct results. @@ -71,6 +77,9 @@ def assert_gpu_result_equal( Absolute tolerance for float comparisons categorical_as_str Decat categoricals to strings before comparing + executor + The executor configuration to pass to `GPUEngine`. If not specified + uses the module level `Executor` attribute. Raises ------ @@ -80,7 +89,7 @@ def assert_gpu_result_equal( If GPU collection failed in some way. """ if engine is None: - engine = GPUEngine(raise_on_fail=True) + engine = GPUEngine(raise_on_fail=True, executor=executor or Executor) final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs( collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index 9bbce6bc080..6338bf0cae1 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -8,3 +8,19 @@ @pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session") def with_nulls(request): return request.param + + +def pytest_addoption(parser): + parser.addoption( + "--executor", + action="store", + default="pylibcudf", + choices=("pylibcudf", "dask-experimental"), + help="Executor to use for GPUEngine.", + ) + + +def pytest_configure(config): + import cudf_polars.testing.asserts + + cudf_polars.testing.asserts.Executor = config.getoption("--executor") diff --git a/python/cudf_polars/tests/experimental/test_parallel.py b/python/cudf_polars/tests/experimental/test_parallel.py new file mode 100644 index 00000000000..d46ab88eebf --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_parallel.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import polars as pl +from polars import GPUEngine +from polars.testing import assert_frame_equal + + +def test_evaluate_dask(): + df = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": [5, 6, 7], "d": [7, 9, 8]}) + q = df.select(pl.col("a") - (pl.col("b") + pl.col("c") * 2), pl.col("d")).sort("d") + + expected = q.collect(engine="cpu") + got_gpu = q.collect(engine=GPUEngine(raise_on_fail=True)) + got_dask = q.collect( + engine=GPUEngine(raise_on_fail=True, executor="dask-experimental") + ) + assert_frame_equal(expected, got_gpu) + assert_frame_equal(expected, got_dask) diff --git a/python/cudf_polars/tests/test_executors.py b/python/cudf_polars/tests/test_executors.py new file mode 100644 index 00000000000..3eaea2ec9ea --- /dev/null +++ b/python/cudf_polars/tests/test_executors.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.mark.parametrize("executor", [None, "pylibcudf", "dask-experimental"]) +def test_executor_basics(executor): + if executor == "dask-experimental": + pytest.importorskip("dask") + + df = pl.LazyFrame( + { + "a": pl.Series([[1, 2], [3]], dtype=pl.List(pl.Int8())), + "b": pl.Series([[1], [2]], dtype=pl.List(pl.UInt16())), + "c": pl.Series( + [ + [["1", "2", "3"], ["4", "567"]], + [["8", "9"], []], + ], + dtype=pl.List(pl.List(pl.String())), + ), + "d": pl.Series([[[1, 2]], []], dtype=pl.List(pl.List(pl.UInt16()))), + } + ) + + assert_gpu_result_equal(df, executor=executor) + + +def test_cudf_cache_evaluate(): + ldf = pl.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7], + "b": [1, 1, 1, 1, 1, 1, 1], + } + ).lazy() + ldf2 = ldf.select((pl.col("a") + pl.col("b")).alias("c"), pl.col("a")) + query = pl.concat([ldf, ldf2], how="diagonal") + assert_gpu_result_equal(query, executor="pylibcudf") + + +def test_dask_experimental_map_function_get_hashable(): + df = pl.LazyFrame( + { + "a": pl.Series([11, 12, 13], dtype=pl.UInt16), + "b": pl.Series([1, 3, 5], dtype=pl.Int16), + "c": pl.Series([2, 4, 6], dtype=pl.Float32), + "d": ["a", "b", "c"], + } + ) + q = df.unpivot(index="d") + assert_gpu_result_equal(q, executor="dask-experimental") + + +def test_unknown_executor(): + df = pl.LazyFrame({}) + + with pytest.raises( + pl.exceptions.ComputeError, + match="ValueError: Unknown executor 'unknown-executor'", + ): + assert_gpu_result_equal(df, executor="unknown-executor")