Skip to content

Commit

Permalink
[Enhancement] Support to pushdown compound predicates in orc (#50613)
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <[email protected]>
  • Loading branch information
Smith-Cruise authored Sep 4, 2024
1 parent 1def993 commit 8bf333e
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 23 deletions.
2 changes: 1 addition & 1 deletion be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
scanner_params.partition_index_in_chunk = _partition_index_in_chunk;
scanner_params._partition_index_in_hdfs_partition_columns = _partition_index_in_hdfs_partition_columns;
scanner_params.partition_values = _partition_values;
scanner_params.conjunct_ctxs = _scanner_conjunct_ctxs;
scanner_params.scanner_conjunct_ctxs = _scanner_conjunct_ctxs;
scanner_params.conjunct_ctxs_by_slot = _conjunct_ctxs_by_slot;
scanner_params.slots_in_conjunct = _slots_in_conjunct;
scanner_params.slots_of_mutli_slot_conjunct = _slots_of_mutli_slot_conjunct;
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/hdfs_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CountedSeekableInputStream final : public io::SeekableInputStreamWrapper {
bool HdfsScannerParams::is_lazy_materialization_slot(SlotId slot_id) const {
// if there are no conjuncts, then there is no lazy materialization slot;
// we have to read all fields.
if (conjunct_ctxs_by_slot.size() == 0 && conjunct_ctxs.size() == 0) {
if (conjunct_ctxs_by_slot.size() == 0 && scanner_conjunct_ctxs.size() == 0) {
return false;
}
if (conjunct_ctxs_by_slot.find(slot_id) != conjunct_ctxs_by_slot.end()) {
Expand Down Expand Up @@ -171,9 +171,9 @@ Status HdfsScanner::get_next(RuntimeState* runtime_state, ChunkPtr* chunk) {
RETURN_IF_ERROR(_runtime_state->check_mem_limit("get chunk from scanner"));
Status status = do_get_next(runtime_state, chunk);
if (status.ok()) {
if (!_scanner_params.conjunct_ctxs.empty()) {
if (!_scanner_params.scanner_conjunct_ctxs.empty()) {
SCOPED_RAW_TIMER(&_app_stats.expr_filter_ns);
RETURN_IF_ERROR(ExecNode::eval_conjuncts(_scanner_params.conjunct_ctxs, (*chunk).get()));
RETURN_IF_ERROR(ExecNode::eval_conjuncts(_scanner_params.scanner_conjunct_ctxs, (*chunk).get()));
}
RETURN_IF_ERROR(_mor_processor->get_next(runtime_state, chunk));
} else if (status.is_end_of_file()) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ struct HdfsScannerParams {
// runtime bloom filter.
const RuntimeFilterProbeCollector* runtime_filter_collector = nullptr;

// all conjuncts except `conjunct_ctxs_by_slot`
std::vector<ExprContext*> conjunct_ctxs;
// all conjuncts except `conjunct_ctxs_by_slot`, like compound predicates
std::vector<ExprContext*> scanner_conjunct_ctxs;
std::unordered_set<SlotId> slots_in_conjunct;
// slots used by scanner_conjunct_ctxs
std::unordered_set<SlotId> slots_of_mutli_slot_conjunct;
Expand Down
9 changes: 6 additions & 3 deletions be/src/exec/hdfs_scanner_orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,8 @@ bool OrcRowReaderFilter::filterOnPickStringDictionary(
Status HdfsOrcScanner::build_iceberg_delete_builder() {
if (_scanner_params.deletes.empty()) return Status::OK();
SCOPED_RAW_TIMER(&_app_stats.iceberg_delete_file_build_ns);
const IcebergDeleteBuilder iceberg_delete_builder(_scanner_params.fs, _scanner_params.path,
_scanner_params.conjunct_ctxs, _scanner_params.materialize_slots,
&_need_skip_rowids, _scanner_params.datacache_options);
const IcebergDeleteBuilder iceberg_delete_builder(_scanner_params.fs, _scanner_params.path, &_need_skip_rowids,
_scanner_params.datacache_options);

for (const auto& tdelete_file : _scanner_params.deletes) {
RETURN_IF_ERROR(iceberg_delete_builder.build_orc(_runtime_state->timezone(), *tdelete_file,
Expand Down Expand Up @@ -536,6 +535,10 @@ Status HdfsOrcScanner::do_open(RuntimeState* runtime_state) {
conjuncts.push_back(it2->root());
}
}
// also add the scanner's conjuncts, because SearchArgumentBuilder supports them
for (const auto& it : _scanner_params.scanner_conjunct_ctxs) {
conjuncts.push_back(it->root());
}
}
const OrcPredicates orc_predicates{&conjuncts, _scanner_ctx.runtime_filter_collector};
RETURN_IF_ERROR(_orc_reader->init(std::move(reader), &orc_predicates));
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/hdfs_scanner_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ Status HdfsParquetScanner::do_init(RuntimeState* runtime_state, const HdfsScanne
if (!scanner_params.deletes.empty()) {
SCOPED_RAW_TIMER(&_app_stats.iceberg_delete_file_build_ns);
std::unique_ptr<IcebergDeleteBuilder> iceberg_delete_builder(new IcebergDeleteBuilder(
scanner_params.fs, scanner_params.path, scanner_params.conjunct_ctxs, scanner_params.materialize_slots,
&_need_skip_rowids, scanner_params.datacache_options));
scanner_params.fs, scanner_params.path, &_need_skip_rowids, scanner_params.datacache_options));
for (const auto& tdelete_file : scanner_params.deletes) {
RETURN_IF_ERROR(iceberg_delete_builder->build_parquet(
runtime_state->timezone(), *tdelete_file, scanner_params.mor_params.equality_slots,
Expand Down
7 changes: 1 addition & 6 deletions be/src/exec/iceberg/iceberg_delete_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,10 @@ class ParquetPositionDeleteBuilder final : public PositionDeleteBuilder {

class IcebergDeleteBuilder {
public:
IcebergDeleteBuilder(FileSystem* fs, std::string datafile_path, std::vector<ExprContext*> conjunct_ctxs,
std::vector<SlotDescriptor*> materialize_slots, std::set<int64_t>* need_skip_rowids,
IcebergDeleteBuilder(FileSystem* fs, std::string datafile_path, std::set<int64_t>* need_skip_rowids,
const DataCacheOptions& datacache_options = DataCacheOptions())
: _fs(fs),
_datafile_path(std::move(datafile_path)),
_conjunct_ctxs(std::move(conjunct_ctxs)),
_materialize_slots(std::move(materialize_slots)),
_need_skip_rowids(need_skip_rowids),
_datacache_options(datacache_options) {}
~IcebergDeleteBuilder() = default;
Expand Down Expand Up @@ -168,8 +165,6 @@ class IcebergDeleteBuilder {
private:
FileSystem* _fs;
std::string _datafile_path;
std::vector<ExprContext*> _conjunct_ctxs;
std::vector<SlotDescriptor*> _materialize_slots;
std::set<int64_t>* _need_skip_rowids;
const DataCacheOptions _datacache_options;
};
Expand Down
105 changes: 99 additions & 6 deletions be/test/exec/hdfs_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,99 @@ TEST_F(HdfsScannerTest, TestOrcBooleanConjunct) {
scanner->close();
}

// Verifies that a compound predicate placed in `scanner_conjunct_ctxs`
// (here: `col_tinyint = 1 OR col_smallint = 1`) is handed to the ORC reader
// and shows up in its search argument, i.e. that it is pushed down.
TEST_F(HdfsScannerTest, TestOrcCompoundConjunct) {
    static const std::string input_orc_file = "./be/test/exec/test_data/orc_scanner/scalar_types.orc";

    SlotDesc c0{"col_tinyint", TypeDescriptor::from_logical_type(LogicalType::TYPE_TINYINT)};
    SlotDesc c1{"col_smallint", TypeDescriptor::from_logical_type(LogicalType::TYPE_SMALLINT)};

    SlotDesc slot_descs[] = {c0, c1, {""}};

    auto scanner = std::make_shared<HdfsOrcScanner>();

    auto* range = _create_scan_range(input_orc_file, 0, 0);
    auto* tuple_desc = _create_tuple_desc(slot_descs);
    auto* param = _create_param(input_orc_file, range, tuple_desc);

    {
        // The expression tree is serialized in pre-order:
        //   OR, EQ, SLOT_REF, LITERAL, EQ, SLOT_REF, LITERAL
        std::vector<TExprNode> nodes;
        TExprNode compound_node;
        compound_node.__set_opcode(TExprOpcode::COMPOUND_OR);
        compound_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
        compound_node.__set_type(create_primitive_type_desc(TPrimitiveType::BOOLEAN));
        compound_node.__set_num_children(2);
        nodes.emplace_back(compound_node);

        // Appends the three nodes of `<slot> = 1` to `nodes`. The slot
        // reference, the literal and the EQ child type all share
        // `primitive_type`, so they cannot drift apart per branch.
        const auto append_eq_one = [&nodes](int slot_id, auto primitive_type) {
            TExprNode eq_node;
            eq_node.__set_opcode(TExprOpcode::EQ);
            eq_node.__set_node_type(TExprNodeType::BINARY_PRED);
            eq_node.__set_type(create_primitive_type_desc(TPrimitiveType::BOOLEAN));
            eq_node.__set_child_type(primitive_type);
            eq_node.__set_num_children(2);

            TExprNode slot_ref_node;
            slot_ref_node.__set_node_type(TExprNodeType::SLOT_REF);
            slot_ref_node.__set_num_children(0);
            slot_ref_node.__set_type(create_primitive_type_desc(primitive_type));
            TSlotRef t_slot_ref = TSlotRef();
            t_slot_ref.slot_id = slot_id;
            t_slot_ref.tuple_id = 0;
            slot_ref_node.__set_slot_ref(t_slot_ref);

            TExprNode literal_node = create_int_literal_node(primitive_type, 1);

            nodes.emplace_back(eq_node);
            nodes.emplace_back(slot_ref_node);
            nodes.emplace_back(literal_node);
        };

        // NOTE(review): slot ids 0 and 1 are presumed to map to c0/c1 in
        // declaration order -- confirm against _create_tuple_desc.
        append_eq_one(0, TPrimitiveType::TINYINT);
        // The right-hand slot is col_smallint, so its SLOT_REF must be typed
        // SMALLINT (it was previously mistyped as TINYINT).
        append_eq_one(1, TPrimitiveType::SMALLINT);

        ExprContext* ctx = create_expr_context(&_pool, nodes);
        std::cout << ctx->root()->debug_string() << std::endl;
        param->scanner_conjunct_ctxs.push_back(ctx);
    }

    ASSERT_OK(Expr::prepare(param->scanner_conjunct_ctxs, _runtime_state));
    ASSERT_OK(Expr::open(param->scanner_conjunct_ctxs, _runtime_state));

    // init/open must succeed before `_orc_reader` is dereferenced below, so
    // use fatal assertions rather than EXPECT_* here.
    ASSERT_OK(scanner->init(_runtime_state, *param));
    ASSERT_OK(scanner->open(_runtime_state));

    EXPECT_EQ("leaf-0 = (column(id=2) = 1), leaf-1 = (column(id=3) = 1), expr = (or leaf-0 leaf-1)",
              scanner->_orc_reader->get_search_argument_string());

    // The test expects the first read to report end-of-file.
    ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 0);
    Status status = scanner->get_next(_runtime_state, &chunk);
    EXPECT_TRUE(status.is_end_of_file());
    scanner->close();
}

// =============================================================================

/*
Expand Down Expand Up @@ -2369,7 +2462,7 @@ TEST_F(HdfsScannerTest, TestParquetUppercaseFiledPredicate) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}
{
std::vector<TExprNode> nodes;
Expand All @@ -2378,7 +2471,7 @@ TEST_F(HdfsScannerTest, TestParquetUppercaseFiledPredicate) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}

ASSERT_OK(Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state));
Expand Down Expand Up @@ -2535,7 +2628,7 @@ TEST_F(HdfsScannerTest, TestParquetDictTwoPage) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}
{
std::vector<TExprNode> nodes;
Expand All @@ -2544,7 +2637,7 @@ TEST_F(HdfsScannerTest, TestParquetDictTwoPage) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}

ASSERT_OK(Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state));
Expand Down Expand Up @@ -2580,7 +2673,7 @@ TEST_F(HdfsScannerTest, TestMinMaxFilterWhenContainsComplexTypes) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}
{
std::vector<TExprNode> nodes;
Expand All @@ -2589,7 +2682,7 @@ TEST_F(HdfsScannerTest, TestMinMaxFilterWhenContainsComplexTypes) {
lit_node);
ExprContext* ctx = create_expr_context(&_pool, nodes);
param->min_max_conjunct_ctxs.push_back(ctx);
param->conjunct_ctxs.push_back(ctx);
param->scanner_conjunct_ctxs.push_back(ctx);
}

ASSERT_OK(Expr::prepare(param->min_max_conjunct_ctxs, _runtime_state));
Expand Down

0 comments on commit 8bf333e

Please sign in to comment.