GPU algorithm to convert per-page col to per-row col
mhaseeb123 committed Feb 20, 2025
1 parent 3451cb8 commit 3a05451
Showing 1 changed file with 190 additions and 43 deletions.
233 changes: 190 additions & 43 deletions cpp/src/io/parquet/experimental/page_index_filter.cu
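For context, the change is built around expanding per-page values into per-position values on the device with a scatter, an inclusive max-scan, and a gather. Below is a minimal, standalone Thrust sketch of that general pattern; the data, sizes, and variable names are illustrative only and are not taken from the change itself.

#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/scatter.h>
#include <thrust/scan.h>
#include <thrust/gather.h>
#include <thrust/sequence.h>
#include <thrust/functional.h>
#include <cstdio>
#include <vector>

int main()
{
  // Example data: 3 pages starting at rows 0, 4 and 7, 10 rows total
  std::vector<int> h_first_row{0, 4, 7};
  std::vector<int> h_page_min{5, 2, 9};  // one min statistic per page
  thrust::device_vector<int> page_first_row(h_first_row.begin(), h_first_row.end());
  thrust::device_vector<int> page_min(h_page_min.begin(), h_page_min.end());
  int const num_rows = 10;

  // 1. Scatter each page's index at that page's first row index
  thrust::device_vector<int> row_to_page(num_rows, 0);
  thrust::device_vector<int> page_idx(page_first_row.size());
  thrust::sequence(page_idx.begin(), page_idx.end());
  thrust::scatter(page_idx.begin(), page_idx.end(), page_first_row.begin(), row_to_page.begin());

  // 2. Inclusive max-scan propagates page indices into the gaps:
  //    row_to_page becomes {0,0,0,0,1,1,1,2,2,2}
  thrust::inclusive_scan(
    row_to_page.begin(), row_to_page.end(), row_to_page.begin(), thrust::maximum<int>());

  // 3. Gather the per-page statistic for every row: row_min becomes {5,5,5,5,2,2,2,9,9,9}
  thrust::device_vector<int> row_min(num_rows);
  thrust::gather(row_to_page.begin(), row_to_page.end(), page_min.begin(), row_min.begin());

  thrust::host_vector<int> h_row_min = row_min;
  for (int v : h_row_min) { std::printf("%d ", v); }
  std::printf("\n");
  return 0;
}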
@@ -23,10 +23,12 @@
#include <cudf/ast/expressions.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/transform.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/types.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>
@@ -37,6 +39,7 @@
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/functional.h>
#include <thrust/gather.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/scan.h>
#include <thrust/scatter.h>
#include <thrust/tabulate.h>

@@ -83,54 +86,198 @@ struct page_stats_caster : public cudf::io::parquet::detail::stats_caster_base {
if constexpr (cudf::is_compound<T>() && !std::is_same_v<T, string_view>) {
CUDF_FAIL("Compound types do not have statistics");
} else {
host_column<T> min(total_rows, stream);
host_column<T> max(total_rows, stream);
size_type stats_idx = 0;

for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto col = std::find_if(row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) {
return col.schema_idx == schema_idx;
});
if (col != std::end(row_group.columns) and col->column_index.has_value()) {
auto const& colchunk = *col;
auto const& column_index = colchunk.column_index.value();
auto const& offset_index = colchunk.offset_index.value();

CUDF_EXPECTS(column_index.min_values.size() == column_index.max_values.size(),
"page min and max values should be of same size");
CUDF_EXPECTS(
column_index.min_values.size() == offset_index.page_locations.size(),
"mismatch between size of min/max page values and the size of page locations");
for (size_t page_idx = 0; page_idx < column_index.min_values.size(); ++page_idx) {
// To support deprecated min, max fields.
auto const& min_value = column_index.min_values[page_idx];
auto const& max_value = column_index.max_values[page_idx];
int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index;
int64_t const last_row_idx =
(page_idx < column_index.min_values.size() - 1)
? offset_index.page_locations[page_idx + 1].first_row_index
: row_group.num_rows;
for (int64_t i = first_row_idx; i < last_row_idx; ++i) {
// translate binary data to Type then to <T>
min.set_index(stats_idx + i, min_value, colchunk.meta_data.type);
max.set_index(stats_idx + i, max_value, colchunk.meta_data.type);
// TODO: For non-string columns, we can do this on the GPU.
// We could do the same for string columns, but the algorithm needs modification.
if constexpr (not std::is_same_v<T, string_view>) {
// Compute page offsets
auto const [page_counts, page_offsets] = [&]() {
std::vector<size_type> page_counts;
std::vector<size_type> page_offsets{0};
std::for_each(
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(row_group_indices.size()),
[&](auto src_idx) {
auto const& rg_indices = row_group_indices[src_idx];
std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto col =
std::find_if(row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) {
return col.schema_idx == schema_idx;
});
if (col != std::end(row_group.columns) and col->column_index.has_value()) {
CUDF_EXPECTS(col->offset_index.has_value(),
"Both column and offset index must be present");
CUDF_EXPECTS(col->column_index.value().min_values.size() ==
col->column_index.value().max_values.size(),
"page min and max values should be of same size");
CUDF_EXPECTS(col->column_index.value().min_values.size() ==
col->offset_index.value().page_locations.size(),
"mismatch between size of min/max page values and the size of page "
"locations");
page_counts.emplace_back(col->offset_index.value().page_locations.size());
page_offsets.emplace_back(page_offsets.back() + page_counts.back());
}
});
});
return std::pair{page_counts, page_offsets};
}();
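// page_counts holds the number of pages in each selected column chunk and page_offsets is
// the running prefix sum starting at 0, e.g. page_counts = {3, 2, 4} gives
// page_offsets = {0, 3, 5, 9}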

auto const num_pages = page_offsets.back();
CUDF_EXPECTS(num_pages > 0, "No pages with PageIndex found for the column");

host_column<T> min(num_pages, stream);
host_column<T> max(num_pages, stream);

// Running index of the column chunk across all sources and row groups, used to look up
// this chunk's starting offset in `page_offsets`
size_type chunk_idx = 0;

// For each source
std::for_each(
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(row_group_indices.size()),
[&, page_offsets = page_offsets](auto src_idx) {
// For all row groups in this source
auto const& rg_indices = row_group_indices[src_idx];
std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto col =
std::find_if(row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) {
return col.schema_idx == schema_idx;
});
if (col != std::end(row_group.columns) and col->column_index.has_value()) {
auto const& colchunk = *col;
auto const& column_index = colchunk.column_index.value();
auto const& offset_index = colchunk.offset_index.value();
auto const num_pages_in_colchunk = column_index.min_values.size();
auto const page_offset_in_colchunk = page_offsets[chunk_idx++];

// For all pages in this chunk
std::for_each(
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(num_pages_in_colchunk),
[&](auto page_idx) {
// To support deprecated min, max fields.
auto const& min_value = column_index.min_values[page_idx];
auto const& max_value = column_index.max_values[page_idx];
// translate binary data to Type then to <T>
min.set_index(
page_offset_in_colchunk + page_idx, min_value, colchunk.meta_data.type);
max.set_index(
page_offset_in_colchunk + page_idx, max_value, colchunk.meta_data.type);
});
}
});
});

auto mincol = min.to_device(dtype, stream, mr);
auto maxcol = max.to_device(dtype, stream, mr);

auto counts = cudf::detail::make_device_uvector_async(page_counts, stream, mr);
auto offsets = cudf::detail::make_device_uvector_async(page_offsets, stream, mr);

// Generate index mapping
auto indices =
cudf::detail::make_zeroed_device_uvector_async<cudf::size_type>(num_pages, stream, mr);
thrust::scatter_if(rmm::exec_policy_nosync(stream),
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(counts.size()),
offsets.begin(),
counts.begin(),
indices.begin());

// Fill gaps with previous values
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
indices.begin(),
indices.end(),
indices.begin(),
thrust::maximum<size_type>());
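// Example: with offsets = {0, 3, 5, 9} and non-zero counts, the scatter writes
// indices = {0, 0, 0, 1, 0, 2, 0, 0, 0} and the max-scan fills the gaps to give
// {0, 0, 0, 1, 1, 2, 2, 2, 2}, i.e. the column chunk index for each page slot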

// Helper to gather a column's fixed-width data and its null mask based on the computed
// indices
auto const gather_data_and_nullmask = [&](mutable_column_view column,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) {
auto output_data = rmm::device_buffer(cudf::size_of(dtype) * num_pages, stream, mr);
auto data_ptr = reinterpret_cast<T*>(output_data.data());
thrust::gather(rmm::exec_policy_nosync(stream),
indices.begin(),
indices.end(),
column.template begin<T>(),
data_ptr);

// Gather the null mask words using the same computed indices
auto bitmask = column.null_mask();
auto output_bitmask = rmm::device_buffer(sizeof(bitmask_type) * num_pages, stream, mr);
auto bitmask_ptr = reinterpret_cast<bitmask_type*>(output_bitmask.data());
thrust::gather(
rmm::exec_policy_nosync(stream), indices.begin(), indices.end(), bitmask, bitmask_ptr);

return std::pair{std::move(output_data), std::move(output_bitmask)};
};

auto [min_data, min_bitmask] = gather_data_and_nullmask(mincol->mutable_view(), stream, mr);
auto min_nulls = cudf::detail::null_count(
reinterpret_cast<bitmask_type*>(min_bitmask.data()), 0, num_pages, stream);

auto [max_data, max_bitmask] = gather_data_and_nullmask(maxcol->mutable_view(), stream, mr);
auto max_nulls = cudf::detail::null_count(
reinterpret_cast<bitmask_type*>(max_bitmask.data()), 0, num_pages, stream);

return {std::make_unique<column>(
dtype, num_pages, std::move(min_data), std::move(min_bitmask), min_nulls),
std::make_unique<column>(
dtype, num_pages, std::move(max_data), std::move(max_bitmask), max_nulls)};
} else {
host_column<T> min(total_rows, stream);
host_column<T> max(total_rows, stream);
size_type stats_idx = 0;

for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto col =
std::find_if(row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](cudf::io::parquet::detail::ColumnChunk const& col) {
return col.schema_idx == schema_idx;
});
if (col != std::end(row_group.columns) and col->column_index.has_value()) {
auto const& colchunk = *col;
auto const& column_index = colchunk.column_index.value();
auto const& offset_index = colchunk.offset_index.value();

CUDF_EXPECTS(column_index.min_values.size() == column_index.max_values.size(),
"page min and max values should be of same size");
CUDF_EXPECTS(
column_index.min_values.size() == offset_index.page_locations.size(),
"mismatch between size of min/max page values and the size of page locations");
for (size_t page_idx = 0; page_idx < column_index.min_values.size(); ++page_idx) {
// To support deprecated min, max fields.
auto const& min_value = column_index.min_values[page_idx];
auto const& max_value = column_index.max_values[page_idx];
int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index;
int64_t const last_row_idx =
(page_idx < column_index.min_values.size() - 1)
? offset_index.page_locations[page_idx + 1].first_row_index
: row_group.num_rows;
for (int64_t i = first_row_idx; i < last_row_idx; ++i) {
// translate binary data to Type then to <T>
min.set_index(stats_idx + i, min_value, colchunk.meta_data.type);
max.set_index(stats_idx + i, max_value, colchunk.meta_data.type);
}
}
} else {
for (int64_t i = 0; i < row_group.num_rows; ++i) {
// Mark rows as null if the column or its page index is not present in this row group
min.set_index(stats_idx + i, std::nullopt, {});
max.set_index(stats_idx + i, std::nullopt, {});
}
}
} else {
for (int64_t i = 0; i < row_group.num_rows; ++i) {
// Mark rows as null if the column or its page index is not present in this row group
min.set_index(stats_idx + i, std::nullopt, {});
max.set_index(stats_idx + i, std::nullopt, {});
}
stats_idx += row_group.num_rows;
}
stats_idx += row_group.num_rows;
}
return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)};
}
return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)};
}
}
};
