Skip to content

Commit

Permalink
Refactor predicate pushdown to reuse row group pruning in experimenta…
Browse files Browse the repository at this point in the history
…l PQ reader (#17946)

Related to #17896

This PR refactors the Parquet reader's predicate pushdown to separate row group pruning with stats, reading of bloom filters, and row group pruning with bloom filters. This allows the corresponding functionality to be reused, as needed, in the experimental PQ reader for highly selective queries (hybrid scan).

Note that no code has been added or removed in this PR; it has only been moved around.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Karthikeyan (https://github.com/karthikeyann)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17946
  • Loading branch information
mhaseeb123 authored Feb 13, 2025
1 parent 9ead47b commit 6c281fd
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 206 deletions.
252 changes: 100 additions & 152 deletions cpp/src/io/parquet/bloom_filter_reader.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/aligned_resource_adaptor.hpp>

#include <cuco/bloom_filter_policies.cuh>
#include <cuco/bloom_filter_ref.cuh>
Expand Down Expand Up @@ -163,108 +162,6 @@ struct bloom_filter_caster {
}
};

/**
 * @brief AST visitor that gathers the literals used in equality (`EQUAL`) predicates of an
 * expression, keeping one list of literals per input table column. The collected lists are
 * used for row group filtering based on bloom filters.
 */
class equality_literals_collector : public ast::detail::expression_transformer {
 public:
  equality_literals_collector() = default;

  /**
   * @brief Sizes the per-column literal lists and immediately traverses the expression
   *
   * @param expr AST expression to traverse
   * @param num_input_columns Number of columns in the input table
   */
  equality_literals_collector(ast::expression const& expr, cudf::size_type num_input_columns)
    : _num_input_columns{num_input_columns}
  {
    // One (initially empty) list of literals for every input column
    _equality_literals.resize(_num_input_columns);
    // Kick off the traversal; literals are recorded by the `visit` overloads below
    expr.accept(*this);
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override
  {
    // Literals are handed back untouched
    return expr;
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override
  {
    // Only references into the left table are accepted
    auto const is_left_table = expr.get_table_source() == ast::table_reference::LEFT;
    CUDF_EXPECTS(is_left_table, "BloomfilterAST supports only left table");

    // The referenced column must lie within the input table
    auto const column_index = expr.get_column_index();
    CUDF_EXPECTS(column_index < _num_input_columns,
                 "Column index cannot be more than number of columns in the table");
    return expr;
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& )
   */
  std::reference_wrapper<ast::expression const> visit(
    ast::column_name_reference const& expr) override
  {
    // Name-based column references are rejected unconditionally
    CUDF_FAIL("Column name reference is not supported in BloomfilterAST");
  }

  /**
   * @copydoc ast::detail::expression_transformer::visit(ast::operation const& )
   */
  std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override
  {
    using cudf::ast::ast_operator;
    auto const operands = expr.get_operands();
    auto const op       = expr.get_operator();

    auto const* const column_ref = dynamic_cast<ast::column_reference const*>(&operands[0].get());

    // First operand is not a column reference: recurse into the operands, discarding the result
    if (column_ref == nullptr) {
      std::ignore = visit_operands(operands);
      return expr;
    }

    // First operand is a column reference, so the second one must be a literal
    CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2,
                 "Only binary operations are supported on column reference");
    auto const* const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get());
    CUDF_EXPECTS(literal_ptr != nullptr,
                 "Second operand of binary operation with column reference must be a literal");

    // Validate the column reference
    column_ref->accept(*this);

    // Only equality predicates contribute a literal to the column's list
    if (op == ast_operator::EQUAL) {
      auto const column_index = column_ref->get_column_index();
      _equality_literals[column_index].emplace_back(const_cast<ast::literal*>(literal_ptr));
    }

    return expr;
  }

  /**
   * @brief Hands over the collected equality literals, one vector per input table column
   *
   * Callable on rvalues only; the internal storage is moved out.
   *
   * @return Vectors of equality literals, one per input table column
   */
  [[nodiscard]] std::vector<std::vector<ast::literal*>> get_equality_literals() &&
  {
    return std::move(_equality_literals);
  }

 private:
  // Equality literals seen so far, indexed by input column
  std::vector<std::vector<ast::literal*>> _equality_literals;

 protected:
  // Visits every operand in order and returns whatever each visit produced
  std::vector<std::reference_wrapper<ast::expression const>> visit_operands(
    cudf::host_span<std::reference_wrapper<ast::expression const> const> operands)
  {
    std::vector<std::reference_wrapper<ast::expression const>> visited;
    for (auto const& operand : operands) {
      visited.push_back(operand.get().accept(*this));
    }
    return visited;
  }

  // Number of columns in the input table
  size_type _num_input_columns;
};

/**
* @brief Converts AST expression to bloom filter membership (BloomfilterAST) expression.
* This is used in row group filtering based on equality predicate.
Expand Down Expand Up @@ -502,6 +399,17 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

} // namespace

size_t aggregate_reader_metadata::get_bloom_filter_alignment() const
{
// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
return alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);
}

std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
Expand Down Expand Up @@ -599,55 +507,19 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types(
return parquet_types;
}

std::pair<std::optional<std::vector<std::vector<size_type>>>, bool>
aggregate_reader_metadata::apply_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
std::vector<rmm::device_buffer>& bloom_filter_data,
host_span<std::vector<size_type> const> input_row_group_indices,
host_span<std::vector<ast::literal*> const> literals,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
host_span<int const> equality_col_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
// Number of input table columns
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size());

// Collect equality literals for each input table column
auto const equality_literals =
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals();

// Collect schema indices of columns with equality predicate(s)
std::vector<cudf::size_type> equality_col_schemas;
thrust::copy_if(thrust::host,
output_column_schemas.begin(),
output_column_schemas.end(),
equality_literals.begin(),
std::back_inserter(equality_col_schemas),
[](auto& eq_literals) { return not eq_literals.empty(); });

// Return early if no column with equality predicate(s)
if (equality_col_schemas.empty()) { return {std::nullopt, false}; }

// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr alignment = alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);

// Aligned resource adaptor to allocate bloom filter buffers with
auto aligned_mr =
rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment);

// Read a vector of bloom filter bitset device buffers for all columns with equality
// predicate(s) across all row groups
auto bloom_filter_data = read_bloom_filters(
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr);

// No bloom filter buffers, return early
if (bloom_filter_data.empty()) { return {std::nullopt, false}; }

// Get parquet types for the predicate columns
auto const parquet_types = get_parquet_types(input_row_group_indices, equality_col_schemas);

Expand Down Expand Up @@ -684,13 +556,13 @@ aggregate_reader_metadata::apply_bloom_filters(
auto const& dtype = output_dtypes[input_col_idx];

// Skip if no equality literals for this column
if (equality_literals[input_col_idx].empty()) { return; }
if (literals[input_col_idx].empty()) { return; }

// Skip if non-comparable (compound) type except string
if (cudf::is_compound(dtype) and dtype.id() != cudf::type_id::STRING) { return; }

// Add a column for all literals associated with an equality column
for (auto const& literal : equality_literals[input_col_idx]) {
for (auto const& literal : literals[input_col_idx]) {
bloom_filter_membership_columns.emplace_back(cudf::type_dispatcher<dispatch_storage_type>(
dtype, bloom_filter_col, equality_col_idx, dtype, literal, stream));
}
Expand All @@ -702,16 +574,92 @@ aggregate_reader_metadata::apply_bloom_filters(

// Convert AST to BloomfilterAST expression with reference to bloom filter membership
// in above `bloom_filter_membership_table`
bloom_filter_expression_converter bloom_filter_expr{
filter.get(), num_input_columns, {equality_literals}};
bloom_filter_expression_converter bloom_filter_expr{filter.get(), num_input_columns, {literals}};

// Filter bloom filter membership table with the BloomfilterAST expression and collect
// filtered row group indices
return {collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream),
true};
return collect_filtered_row_group_indices(bloom_filter_membership_table,
bloom_filter_expr.get_bloom_filter_expr(),
input_row_group_indices,
stream);
}

// Default constructor: performs no traversal.
equality_literals_collector::equality_literals_collector() = default;

// Sizes the per-column literal lists for `num_input_columns` columns and then traverses
// `expr` with this visitor.
equality_literals_collector::equality_literals_collector(ast::expression const& expr,
                                                         cudf::size_type num_input_columns)
  : _num_input_columns{num_input_columns}
{
  // One (initially empty) list of literals for every input column
  _literals.resize(_num_input_columns);

  // Kick off the traversal; literals are recorded by the `visit` overloads
  expr.accept(*this);
}

// Literal nodes are handed back untouched.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::literal const& expr)
{
  return std::reference_wrapper<ast::expression const>(expr);
}

// Validates a column reference (left table only, index below the number of input columns)
// and hands it back unchanged.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::column_reference const& expr)
{
  // Only references into the left table are accepted
  auto const is_left_table = expr.get_table_source() == ast::table_reference::LEFT;
  CUDF_EXPECTS(is_left_table, "BloomfilterAST supports only left table");

  // The referenced column must lie within the input table
  auto const column_index = expr.get_column_index();
  CUDF_EXPECTS(column_index < _num_input_columns,
               "Column index cannot be more than number of columns in the table");

  return expr;
}

// Name-based column references are rejected unconditionally via CUDF_FAIL.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::column_name_reference const& expr)
{
  CUDF_FAIL("Column name reference is not supported in BloomfilterAST");
}

// Visits an operation node. When the first operand is a column reference, the operation must
// be binary with a literal as second operand; the literal is recorded for that column iff the
// operator is EQUAL. Otherwise all operands are visited recursively.
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
  ast::operation const& expr)
{
  using cudf::ast::ast_operator;
  auto const operands = expr.get_operands();
  auto const op       = expr.get_operator();

  auto const* const column_ref = dynamic_cast<ast::column_reference const*>(&operands[0].get());

  // First operand is not a column reference: recurse into the operands, discarding the result
  if (column_ref == nullptr) {
    std::ignore = visit_operands(operands);
    return expr;
  }

  // First operand is a column reference, so the second one must be a literal
  CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2,
               "Only binary operations are supported on column reference");
  auto const* const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get());
  CUDF_EXPECTS(literal_ptr != nullptr,
               "Second operand of binary operation with column reference must be a literal");

  // Validate the column reference
  column_ref->accept(*this);

  // Only equality predicates contribute a literal to the column's list
  if (op == ast_operator::EQUAL) {
    auto const column_index = column_ref->get_column_index();
    _literals[column_index].emplace_back(const_cast<ast::literal*>(literal_ptr));
  }

  return expr;
}

// Hands over the collected literals, one vector per input table column. Callable on rvalues
// only; the internal storage is moved out.
std::vector<std::vector<ast::literal*>> equality_literals_collector::get_literals() &&
{
  return std::move(_literals);
}

// Visits every operand in order with this visitor and returns whatever each visit produced.
std::vector<std::reference_wrapper<ast::expression const>>
equality_literals_collector::visit_operands(
  cudf::host_span<std::reference_wrapper<ast::expression const> const> operands)
{
  std::vector<std::reference_wrapper<ast::expression const>> visited;
  for (auto const& operand : operands) {
    visited.push_back(operand.get().accept(*this));
  }
  return visited;
}

} // namespace cudf::io::parquet::detail
Loading

0 comments on commit 6c281fd

Please sign in to comment.