Skip to content

Commit

Permalink
fix orc
Browse files Browse the repository at this point in the history
  • Loading branch information
suxiaogang223 committed Nov 26, 2024
1 parent 35a4152 commit 04a6609
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 127 deletions.
207 changes: 86 additions & 121 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,23 +626,38 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite

// check if the slot of expr can be pushed down to orc reader
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
if (!expr->children()[0]->is_slot_ref()) {
if (!expr->get_child(0)->is_slot_ref()) {
return false;
}
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
// check if the slot exists in orc file and not partition column
return _col_name_to_file_col_name.contains(slot_ref->expr_name()) &&
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
}

// check if the literal of expr can be pushed down to orc reader and make orc literal in _vliteral_to_orc_literal_and_type
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) {
if (!expr->get_child(child_id)->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
const auto* literal = static_cast<const VLiteral*>(expr->get_child(child_id).get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal_and_type[literal] = std::make_tuple(orc_literal, predicate_type);
}
return valid;
}

// check if there are rest children of expr can be pushed down to orc reader
bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
if (expr->children().size() < 2) {
if (expr->get_num_children() < 2) {
return false;
}

for (size_t i = 1; i < expr->children().size(); ++i) {
if (!expr->children()[i]->is_literal()) {
for (size_t i = 1; i < expr->get_num_children(); ++i) {
if (!_check_literal_can_push_down(expr, i)) {
return false;
}
}
Expand All @@ -651,7 +666,10 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
DCHECK(expr != nullptr);
if (expr == nullptr) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
Expand All @@ -664,8 +682,8 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
return _check_expr_can_push_down(child);
});
case TExprOpcode::COMPOUND_NOT:
DCHECK_EQ(expr->children().size(), 1);
return _check_expr_can_push_down(expr->children()[0]);
DCHECK_EQ(expr->get_num_children(), 1);
return _check_expr_can_push_down(expr->get_child(0));

case TExprOpcode::GE:
case TExprOpcode::GT:
Expand Down Expand Up @@ -693,199 +711,144 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
}
}

bool OrcReader::_build_less_than(const VExprSPtr& expr,
void OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(expr->get_num_children() == 2);
DCHECK(expr->get_child(0)->is_slot_ref());
DCHECK(expr->get_child(1)->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
const auto* literal = static_cast<const VLiteral*>(expr->get_child(1).get());
DCHECK(_vliteral_to_orc_literal_and_type.contains(literal));
auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal];
builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_less_than_equals(const VExprSPtr& expr,
void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(expr->get_num_children() == 2);
DCHECK(expr->get_child(0)->is_slot_ref());
DCHECK(expr->get_child(1)->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
const auto* literal = static_cast<const VLiteral*>(expr->get_child(1).get());
DCHECK(_vliteral_to_orc_literal_and_type.contains(literal));
auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal];
builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_equals(const VExprSPtr& expr,
void OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(expr->get_num_children() == 2);
DCHECK(expr->get_child(0)->is_slot_ref());
DCHECK(expr->get_child(1)->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
const auto* literal = static_cast<const VLiteral*>(expr->get_child(1).get());
DCHECK(_vliteral_to_orc_literal_and_type.contains(literal));
auto [orc_literal, predicate_type] = _vliteral_to_orc_literal_and_type[literal];
builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_filter_in(const VExprSPtr& expr,
void OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
DCHECK(expr->get_num_children() >= 2);
DCHECK(expr->get_child(0)->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
std::vector<orc::Literal> literals;
orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG;
for (size_t i = 1; i < expr->children().size(); ++i) {
for (size_t i = 1; i < expr->get_num_children(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vliteral_to_orc_literal_and_type.contains(literal));
auto [orc_literal, type] = _vliteral_to_orc_literal_and_type[literal];
literals.emplace_back(orc_literal);
predicate_type = type;
}
DCHECK(!literals.empty());
builder->in(slot_ref->expr_name(), predicate_type, literals);
return true;
}

bool OrcReader::_build_is_null(const VExprSPtr& expr,
void OrcReader::_build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 1);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
DCHECK(expr->get_num_children() == 1);
DCHECK(expr->get_child(0)->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->get_child(0).get());
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
builder->isNull(slot_ref->expr_name(), predicate_type);
return true;
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
void OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
if (expr == nullptr) {
return false;
}

// if expr can not be pushed down, skip it and continue to next expr
if (!_check_expr_can_push_down(expr)) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
bool at_least_one_can_push_down = false;
case TExprOpcode::COMPOUND_AND:
builder->startAnd();
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
if (!at_least_one_can_push_down) {
// if all exprs can not be pushed down, builder->end() will throw exception
return false;
_build_search_argument(child, builder);
}
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR:
builder->startOr();
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
return false;
}
_build_search_argument(child, builder);
}
builder->end();
break;
case TExprOpcode::COMPOUND_NOT:
builder->startNot();
DCHECK_EQ(expr->children().size(), 1);
if (!_build_search_argument(expr->children()[0], builder)) {
return false;
}
DCHECK_EQ(expr->get_num_children(), 1);
_build_search_argument(expr->get_child(0), builder);
builder->end();
break;
case TExprOpcode::GE:
builder->startNot();
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
builder->end();
break;
case TExprOpcode::LE:
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
break;
case TExprOpcode::LT:
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
break;
case TExprOpcode::EQ:
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
break;
case TExprOpcode::NE:
builder->startNot();
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
builder->end();
break;
case TExprOpcode::FILTER_IN:
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE: {
case TExprOpcode::INVALID_OPCODE:
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;
}
default: {

default:
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
}
return true;
}

bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
Expand All @@ -898,7 +861,8 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& expr_ctx : conjuncts) {
if (_build_search_argument(expr_ctx->root(), builder)) {
if (_check_expr_can_push_down(expr_ctx->root())) {
_build_search_argument(expr_ctx->root(), builder);
at_least_one_can_push_down = true;
}
}
Expand Down Expand Up @@ -944,7 +908,7 @@ Status OrcReader::set_fill_columns(
}
} else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) {
if (in_predicate->get_num_children() > 0) {
visit_slot(in_predicate->children()[0].get());
visit_slot(in_predicate->get_child(0).get());
}
} else {
for (auto& child : filter_impl->children()) {
Expand Down Expand Up @@ -1179,7 +1143,8 @@ Status OrcReader::_fill_partition_columns(
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually written : "
"Number of rows expected to be written : {}, number of rows actually "
"written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
Expand Down
Loading

0 comments on commit 04a6609

Please sign in to comment.