From 04a6609bea467e90834363112db196895cec4356 Mon Sep 17 00:00:00 2001 From: Socrates Date: Tue, 26 Nov 2024 16:58:57 +0800 Subject: [PATCH] fix orc --- be/src/vec/exec/format/orc/vorc_reader.cpp | 207 +++++++++------------ be/src/vec/exec/format/orc/vorc_reader.h | 19 +- 2 files changed, 99 insertions(+), 127 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 70f3f6f003f6117..d8cca4af7ae8964 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -626,23 +626,38 @@ std::tuple OrcReader::_make_orc_lite // check if the slot of expr can be pushed down to orc reader bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) { - if (!expr->children()[0]->is_slot_ref()) { + if (!expr->get_child(0)->is_slot_ref()) { return false; } - const auto* slot_ref = static_cast(expr->children()[0].get()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); // check if the slot exists in orc file and not partition column return _col_name_to_file_col_name.contains(slot_ref->expr_name()) && !_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name()); } +// check if the literal of expr can be pushed down to orc reader and make orc literal in _vliteral_to_orc_literal_and_type +bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) { + if (!expr->get_child(child_id)->is_literal()) { + return false; + } + // the slot has been checked in _check_slot_can_push_down before calling this function + const auto* slot_ref = static_cast(expr->get_child(0).get()); + const auto* literal = static_cast(expr->get_child(child_id).get()); + auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal); + if (valid) { + _vliteral_to_orc_literal_and_type[literal] = std::make_tuple(orc_literal, predicate_type); + } + return valid; +} + // check if there are rest children of expr can be pushed down to orc reader bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) { - if (expr->children().size() < 2) { + if (expr->get_num_children() < 2) { return false; } - for (size_t i = 1; i < expr->children().size(); ++i) { - if (!expr->children()[i]->is_literal()) { + for (size_t i = 1; i < expr->get_num_children(); ++i) { + if (!_check_literal_can_push_down(expr, i)) { return false; } } @@ -651,7 +666,10 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) { // check if the expr can be pushed down to orc reader bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) { - DCHECK(expr != nullptr); + if (expr == nullptr) { + return false; + } + switch (expr->op()) { case TExprOpcode::COMPOUND_AND: // at least one child can be pushed down @@ -664,8 +682,8 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) { return _check_expr_can_push_down(child); }); case TExprOpcode::COMPOUND_NOT: - DCHECK_EQ(expr->children().size(), 1); - return _check_expr_can_push_down(expr->children()[0]); + DCHECK_EQ(expr->get_num_children(), 1); + return _check_expr_can_push_down(expr->get_child(0)); case TExprOpcode::GE: case TExprOpcode::GT: @@ -693,199 +711,144 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) { } } -bool OrcReader::_build_less_than(const VExprSPtr& expr, +void OrcReader::_build_less_than(const VExprSPtr& expr, std::unique_ptr& builder) { - DCHECK(expr->children().size() == 2); - DCHECK(expr->children()[0]->is_slot_ref()); - DCHECK(expr->children()[1]->is_literal()); - const auto* slot_ref = static_cast(expr->children()[0].get()); - const auto* literal = static_cast(expr->children()[1].get()); - auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal); - if (!valid) { - return false; - } + DCHECK(expr->get_num_children() == 2); + DCHECK(expr->get_child(0)->is_slot_ref()); + DCHECK(expr->get_child(1)->is_literal()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); + const auto* literal = static_cast(expr->get_child(1).get()); + DCHECK(_vliteral_to_orc_literal_and_type.contains(literal)); + auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal]; builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal); - return true; } -bool OrcReader::_build_less_than_equals(const VExprSPtr& expr, +void OrcReader::_build_less_than_equals(const VExprSPtr& expr, std::unique_ptr& builder) { - DCHECK(expr->children().size() == 2); - DCHECK(expr->children()[0]->is_slot_ref()); - DCHECK(expr->children()[1]->is_literal()); - const auto* slot_ref = static_cast(expr->children()[0].get()); - const auto* literal = static_cast(expr->children()[1].get()); - auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal); - if (!valid) { - return false; - } + DCHECK(expr->get_num_children() == 2); + DCHECK(expr->get_child(0)->is_slot_ref()); + DCHECK(expr->get_child(1)->is_literal()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); + const auto* literal = static_cast(expr->get_child(1).get()); + DCHECK(_vliteral_to_orc_literal_and_type.contains(literal)); + auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal]; builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal); - return true; } -bool OrcReader::_build_equals(const VExprSPtr& expr, +void OrcReader::_build_equals(const VExprSPtr& expr, std::unique_ptr& builder) { - DCHECK(expr->children().size() == 2); - DCHECK(expr->children()[0]->is_slot_ref()); - DCHECK(expr->children()[1]->is_literal()); - const auto* slot_ref = static_cast(expr->children()[0].get()); - const auto* literal = static_cast(expr->children()[1].get()); - auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal); - if (!valid) { - return false; - } + DCHECK(expr->get_num_children() == 2); + DCHECK(expr->get_child(0)->is_slot_ref()); + DCHECK(expr->get_child(1)->is_literal()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); + const auto* literal = static_cast(expr->get_child(1).get()); + DCHECK(_vliteral_to_orc_literal_and_type.contains(literal)); + auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal]; builder->equals(slot_ref->expr_name(), predicate_type, orc_literal); - return true; } -bool OrcReader::_build_filter_in(const VExprSPtr& expr, +void OrcReader::_build_filter_in(const VExprSPtr& expr, std::unique_ptr& builder) { - DCHECK(expr->children().size() >= 2); - DCHECK(expr->children()[0]->is_slot_ref()); - const auto* slot_ref = static_cast(expr->children()[0].get()); + DCHECK(expr->get_num_children() >= 2); + DCHECK(expr->get_child(0)->is_slot_ref()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); std::vector literals; orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG; - for (size_t i = 1; i < expr->children().size(); ++i) { + for (size_t i = 1; i < expr->get_num_children(); ++i) { DCHECK(expr->children()[i]->is_literal()); const auto* literal = static_cast(expr->children()[i].get()); - auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal); - if (!valid) { - return false; - } + DCHECK(_vliteral_to_orc_literal_and_type.contains(literal)); + auto [orc_literal, type] = _vliteral_to_orc_literal_and_type[literal]; literals.emplace_back(orc_literal); predicate_type = type; } DCHECK(!literals.empty()); builder->in(slot_ref->expr_name(), predicate_type, literals); - return true; } -bool OrcReader::_build_is_null(const VExprSPtr& expr, +void OrcReader::_build_is_null(const VExprSPtr& expr, std::unique_ptr& builder) { - DCHECK(expr->children().size() == 1); - DCHECK(expr->children()[0]->is_slot_ref()); - const auto* slot_ref = static_cast(expr->children()[0].get()); + DCHECK(expr->get_num_children() == 1); + DCHECK(expr->get_child(0)->is_slot_ref()); + const auto* slot_ref = static_cast(expr->get_child(0).get()); auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr); builder->isNull(slot_ref->expr_name(), predicate_type); - return true; } -bool OrcReader::_build_search_argument(const VExprSPtr& expr, +void OrcReader::_build_search_argument(const VExprSPtr& expr, std::unique_ptr& builder) { - if (expr == nullptr) { - return false; - } - - // if expr can not be pushed down, skip it and continue to next expr - if (!_check_expr_can_push_down(expr)) { - return false; - } - switch (expr->op()) { - case TExprOpcode::COMPOUND_AND: { - bool at_least_one_can_push_down = false; + case TExprOpcode::COMPOUND_AND: builder->startAnd(); for (const auto& child : expr->children()) { - if (_build_search_argument(child, builder)) { - at_least_one_can_push_down = true; - } - } - if (!at_least_one_can_push_down) { - // if all exprs can not be pushed down, builder->end() will throw exception - return false; + _build_search_argument(child, builder); } builder->end(); break; - } case TExprOpcode::COMPOUND_OR: builder->startOr(); for (const auto& child : expr->children()) { - if (!_build_search_argument(child, builder)) { - return false; - } + _build_search_argument(child, builder); } builder->end(); break; case TExprOpcode::COMPOUND_NOT: builder->startNot(); - DCHECK_EQ(expr->children().size(), 1); - if (!_build_search_argument(expr->children()[0], builder)) { - return false; - } + DCHECK_EQ(expr->get_num_children(), 1); + _build_search_argument(expr->get_child(0), builder); builder->end(); break; case TExprOpcode::GE: builder->startNot(); - if (!_build_less_than(expr, builder)) { - return false; - } + _build_less_than(expr, builder); builder->end(); break; case TExprOpcode::GT: builder->startNot(); - if (!_build_less_than_equals(expr, builder)) { - return false; - } + _build_less_than_equals(expr, builder); builder->end(); break; case TExprOpcode::LE: - if (!_build_less_than_equals(expr, builder)) { - return false; - } + _build_less_than_equals(expr, builder); break; case TExprOpcode::LT: - if (!_build_less_than(expr, builder)) { - return false; - } + _build_less_than(expr, builder); break; case TExprOpcode::EQ: - if (!_build_equals(expr, builder)) { - return false; - } + _build_equals(expr, builder); break; case TExprOpcode::NE: builder->startNot(); - if (!_build_equals(expr, builder)) { - return false; - } + _build_equals(expr, builder); builder->end(); break; case TExprOpcode::FILTER_IN: - if (!_build_filter_in(expr, builder)) { - return false; - } + _build_filter_in(expr, builder); break; case TExprOpcode::FILTER_NOT_IN: builder->startNot(); - if (!_build_filter_in(expr, builder)) { - return false; - } + _build_filter_in(expr, builder); builder->end(); break; // is null and is not null is represented as function call - case TExprOpcode::INVALID_OPCODE: { + case TExprOpcode::INVALID_OPCODE: DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL); if (expr->fn().name.function_name == "is_null_pred") { - if (!_build_is_null(expr, builder)) { - return false; - } + _build_is_null(expr, builder); } else if (expr->fn().name.function_name == "is_not_null_pred") { builder->startNot(); - if (!_build_is_null(expr, builder)) { - return false; - } + _build_is_null(expr, builder); builder->end(); } else { + // should not reach here, because _check_expr_can_push_down has already checked __builtin_unreachable(); } break; - } - default: { + + default: // should not reach here, because _check_expr_can_push_down has already checked __builtin_unreachable(); } - } - return true; } bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) { @@ -898,7 +861,8 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) { bool at_least_one_can_push_down = false; builder->startAnd(); for (const auto& expr_ctx : conjuncts) { - if (_build_search_argument(expr_ctx->root(), builder)) { + if (_check_expr_can_push_down(expr_ctx->root())) { + _build_search_argument(expr_ctx->root(), builder); at_least_one_can_push_down = true; } } @@ -944,7 +908,7 @@ Status OrcReader::set_fill_columns( } } else if (VInPredicate* in_predicate = typeid_cast(filter_impl)) { if (in_predicate->get_num_children() > 0) { - visit_slot(in_predicate->children()[0].get()); + visit_slot(in_predicate->get_child(0).get()); } } else { for (auto& child : filter_impl->children()) { @@ -1179,7 +1143,8 @@ Status OrcReader::_fill_partition_columns( if (num_deserialized != rows) { return Status::InternalError( "Failed to fill partition column: {}={} ." - "Number of rows expected to be written : {}, number of rows actually written : " + "Number of rows expected to be written : {}, number of rows actually " + "written : " "{}", slot_desc->col_name(), value, num_deserialized, rows); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 8c73957e79e4e0e..d30a2d9f7445d51 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -41,6 +41,7 @@ #include "orc/Reader.hh" #include "orc/Type.hh" #include "orc/Vector.hh" +#include "orc/sargs/Literal.hh" #include "runtime/types.h" #include "util/runtime_profile.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -288,23 +289,27 @@ class OrcReader : public GenericReader { bool* is_hive1_orc); static bool _check_acid_schema(const orc::Type& type); static const orc::Type& _remove_acid(const orc::Type& type); + + // functions for building search argument until _init_search_argument std::tuple _make_orc_literal( const VSlotRef* slot_ref, const VLiteral* literal); bool _check_slot_can_push_down(const VExprSPtr& expr); + bool _check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id); bool _check_rest_children_can_push_down(const VExprSPtr& expr); bool _check_expr_can_push_down(const VExprSPtr& expr); - bool _build_less_than(const VExprSPtr& expr, + void _build_less_than(const VExprSPtr& expr, std::unique_ptr& builder); - bool _build_less_than_equals(const VExprSPtr& expr, + void _build_less_than_equals(const VExprSPtr& expr, std::unique_ptr& builder); - bool _build_equals(const VExprSPtr& expr, std::unique_ptr& builder); - bool _build_filter_in(const VExprSPtr& expr, + void _build_equals(const VExprSPtr& expr, std::unique_ptr& builder); + void _build_filter_in(const VExprSPtr& expr, std::unique_ptr& builder); - bool _build_is_null(const VExprSPtr& expr, + void _build_is_null(const VExprSPtr& expr, std::unique_ptr& builder); - bool _build_search_argument(const VExprSPtr& expr, + void _build_search_argument(const VExprSPtr& expr, std::unique_ptr& builder); bool _init_search_argument(const VExprContextSPtrs& conjuncts); + void _init_bloom_filter( std::unordered_map* colname_to_value_range); void _init_system_properties(); @@ -644,6 +649,8 @@ class OrcReader : public GenericReader { std::unordered_map _table_col_to_file_col; //support iceberg position delete . std::vector* _position_delete_ordered_rowids = nullptr; + std::unordered_map> + _vliteral_to_orc_literal_and_type; }; class ORCFileInputStream : public orc::InputStream, public ProfileCollector {