From 3a0545199ac5d75b59877d4601e3771eb2774dc0 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 20 Feb 2025 03:23:24 +0000 Subject: [PATCH] GPU algorithm to conver per-page col to per-row col --- .../parquet/experimental/page_index_filter.cu | 233 ++++++++++++++---- 1 file changed, 190 insertions(+), 43 deletions(-) diff --git a/cpp/src/io/parquet/experimental/page_index_filter.cu b/cpp/src/io/parquet/experimental/page_index_filter.cu index 6b10c9afff6..f336439c2a1 100644 --- a/cpp/src/io/parquet/experimental/page_index_filter.cu +++ b/cpp/src/io/parquet/experimental/page_index_filter.cu @@ -23,10 +23,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -37,6 +39,7 @@ #include #include +#include #include #include @@ -83,54 +86,198 @@ struct page_stats_caster : public cudf::io::parquet::detail::stats_caster_base { if constexpr (cudf::is_compound() && !std::is_same_v) { CUDF_FAIL("Compound types do not have statistics"); } else { - host_column min(total_rows, stream); - host_column max(total_rows, stream); - size_type stats_idx = 0; - - for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) { - for (auto const rg_idx : row_group_indices[src_idx]) { - auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; - auto col = std::find_if(row_group.columns.begin(), - row_group.columns.end(), - [schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) { - return col.schema_idx == schema_idx; - }); - if (col != std::end(row_group.columns) and col->column_index.has_value()) { - auto const& colchunk = *col; - auto const& column_index = colchunk.column_index.value(); - auto const& offset_index = colchunk.offset_index.value(); - - CUDF_EXPECTS(column_index.min_values.size() == column_index.max_values.size(), - "page min and max values should be of same size"); - CUDF_EXPECTS( - column_index.min_values.size() == offset_index.page_locations.size(), - "mismatch between size of min/max page values and the size of page locations"); - for (size_t page_idx = 0; page_idx < column_index.min_values.size(); ++page_idx) { - // To support deprecated min, max fields. - auto const& min_value = column_index.min_values[page_idx]; - auto const& max_value = column_index.min_values[page_idx]; - int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index; - int64_t const last_row_idx = - (page_idx < column_index.min_values.size() - 1) - ? offset_index.page_locations[page_idx + 1].first_row_index - : row_group.num_rows; - for (int64_t i = first_row_idx; i < last_row_idx; ++i) { - // translate binary data to Type then to - min.set_index(stats_idx + i, min_value, colchunk.meta_data.type); - max.set_index(stats_idx + i, max_value, colchunk.meta_data.type); + // TODO: For non string columns, we can do this with GPU. + // We can for string columns as well but the algorithm needs modification + if constexpr (not std::is_same_v) { + // Compute page offsets + auto const [page_counts, page_offsets] = [&]() { + std::vector page_counts; + std::vector page_offsets{0}; + std::for_each( + thrust::counting_iterator(0), + thrust::counting_iterator(row_group_indices.size()), + [&](auto src_idx) { + auto const& rg_indices = row_group_indices[src_idx]; + std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) { + auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; + auto col = + std::find_if(row_group.columns.begin(), + row_group.columns.end(), + [schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) { + return col.schema_idx == schema_idx; + }); + if (col != std::end(row_group.columns) and col->column_index.has_value()) { + CUDF_EXPECTS(col->offset_index.has_value(), + "Both column and offset index must be present"); + CUDF_EXPECTS(col->column_index.value().min_values.size() == + col->column_index.value().max_values.size(), + "page min and max values should be of same size"); + CUDF_EXPECTS(col->column_index.value().min_values.size() == + col->offset_index.value().page_locations.size(), + "mismatch between size of min/max page values and the size of page " + "locations"); + page_counts.emplace_back(col->offset_index.value().page_locations.size()); + page_offsets.emplace_back(page_offsets.back() + page_counts.back()); + } + }); + }); + return std::pair{page_counts, page_offsets}; + }(); + + auto const num_pages = page_offsets.back(); + CUDF_EXPECTS(num_pages > 0, "No pages with PageIndex found for the column"); + + host_column min(num_pages, stream); + host_column max(num_pages, stream); + + // For each source + std::for_each( + thrust::counting_iterator(0), + thrust::counting_iterator(row_group_indices.size()), + [&, page_offsets = page_offsets](auto src_idx) { + // For all row groups in this source + auto const& rg_indices = row_group_indices[src_idx]; + std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) { + auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; + auto col = + std::find_if(row_group.columns.begin(), + row_group.columns.end(), + [schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) { + return col.schema_idx == schema_idx; + }); + if (col != std::end(row_group.columns) and col->column_index.has_value()) { + auto const& colchunk = *col; + auto const& column_index = colchunk.column_index.value(); + auto const& offset_index = colchunk.offset_index.value(); + auto const num_pages_in_colchunk = column_index.min_values.size(); + auto const page_offset_in_colchunk = + page_offsets[src_idx * row_group_indices.size() + rg_idx]; + + // For all pages in this chunk + std::for_each( + thrust::counting_iterator(0), + thrust::counting_iterator(num_pages_in_colchunk), + [&](auto page_idx) { + // To support deprecated min, max fields. + auto const& min_value = column_index.min_values[page_idx]; + auto const& max_value = column_index.min_values[page_idx]; + // translate binary data to Type then to + min.set_index( + page_offset_in_colchunk + page_idx, min_value, colchunk.meta_data.type); + max.set_index( + page_offset_in_colchunk + page_idx, max_value, colchunk.meta_data.type); + }); + } + }); + }); + + auto mincol = min.to_device(dtype, stream, mr); + auto maxcol = max.to_device(dtype, stream, mr); + + // auto null_mask = std::move(contents.null_mask); + auto counts = cudf::detail::make_device_uvector_async(page_counts, stream, mr); + auto offsets = cudf::detail::make_device_uvector_async(page_offsets, stream, mr); + + // Generate index mapping + auto indices = + cudf::detail::make_zeroed_device_uvector_async(num_pages, stream, mr); + thrust::scatter_if(rmm::exec_policy_nosync(stream), + thrust::counting_iterator(0), + thrust::counting_iterator(counts.size()), + offsets.begin(), + counts.begin(), + offsets.begin()); + + // Fill gaps with previous values + thrust::inclusive_scan( + indices.begin(), indices.end(), indices.begin(), thrust::maximum()); + + // Gather values based on computed indices + // Column contents + auto const gather_data_and_nullmask = [&](mutable_column_view column, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + auto output_data = rmm::device_buffer(cudf::size_of(dtype) * num_pages, stream, mr); + auto data_ptr = reinterpret_cast(output_data.data()); + thrust::gather(rmm::exec_policy_nosync(stream), + indices.begin(), + indices.end(), + column.template begin(), + data_ptr); + + // Gather values based on computed indices + auto bitmask = column.null_mask(); + auto output_bitmask = rmm::device_buffer(sizeof(bitmask_type) * num_pages, stream, mr); + auto bitmask_ptr = reinterpret_cast(output_data.data()); + thrust::gather( + rmm::exec_policy_nosync(stream), indices.begin(), indices.end(), bitmask, bitmask_ptr); + + return std::pair{std::move(output_data), std::move(output_bitmask)}; + }; + + auto [min_data, min_bitmask] = gather_data_and_nullmask(mincol->mutable_view(), stream, mr); + auto min_nulls = cudf::detail::null_count( + reinterpret_cast(min_bitmask.data()), 0, num_pages, stream); + + auto [max_data, max_bitmask] = gather_data_and_nullmask(maxcol->mutable_view(), stream, mr); + auto max_nulls = cudf::detail::null_count( + reinterpret_cast(max_bitmask.data()), 0, num_pages, stream); + + return {std::make_unique( + dtype, num_pages, std::move(min_data), std::move(min_bitmask), min_nulls), + std::make_unique( + dtype, num_pages, std::move(max_data), std::move(max_bitmask), max_nulls)}; + } else { + host_column min(total_rows, stream); + host_column max(total_rows, stream); + size_type stats_idx = 0; + + for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) { + for (auto const rg_idx : row_group_indices[src_idx]) { + auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; + auto col = + std::find_if(row_group.columns.begin(), + row_group.columns.end(), + [schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) { + return col.schema_idx == schema_idx; + }); + if (col != std::end(row_group.columns) and col->column_index.has_value()) { + auto const& colchunk = *col; + auto const& column_index = colchunk.column_index.value(); + auto const& offset_index = colchunk.offset_index.value(); + + CUDF_EXPECTS(column_index.min_values.size() == column_index.max_values.size(), + "page min and max values should be of same size"); + CUDF_EXPECTS( + column_index.min_values.size() == offset_index.page_locations.size(), + "mismatch between size of min/max page values and the size of page locations"); + for (size_t page_idx = 0; page_idx < column_index.min_values.size(); ++page_idx) { + // To support deprecated min, max fields. + auto const& min_value = column_index.min_values[page_idx]; + auto const& max_value = column_index.min_values[page_idx]; + int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index; + int64_t const last_row_idx = + (page_idx < column_index.min_values.size() - 1) + ? offset_index.page_locations[page_idx + 1].first_row_index + : row_group.num_rows; + for (int64_t i = first_row_idx; i < last_row_idx; ++i) { + // translate binary data to Type then to + min.set_index(stats_idx + i, min_value, colchunk.meta_data.type); + max.set_index(stats_idx + i, max_value, colchunk.meta_data.type); + } + } + } else { + for (int64_t i = 0; i < row_group.num_rows; ++i) { + // Marking it null, if column present in row group + min.set_index(stats_idx + i, std::nullopt, {}); + max.set_index(stats_idx + i, std::nullopt, {}); } } - } else { - for (int64_t i = 0; i < row_group.num_rows; ++i) { - // Marking it null, if column present in row group - min.set_index(stats_idx + i, std::nullopt, {}); - max.set_index(stats_idx + i, std::nullopt, {}); - } + stats_idx += row_group.num_rows; } - stats_idx += row_group.num_rows; } + return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)}; } - return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)}; } } };