Skip to content

Commit

Permalink
[feat] support rf in partitioned hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Dec 19, 2024
1 parent 9a81b8d commit 65dd428
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 20 deletions.
50 changes: 40 additions & 10 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ class RuntimePredicateWrapper {
BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); }

void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
if (is_ignored()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf");
if (is_ignored() || is_disabled()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored/disabled rf");
}
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
Expand Down Expand Up @@ -476,6 +476,18 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (wrapper->_context->disabled) {
_context->disabled = true;
_context->ignored = true;

LOG(INFO) << "Runtime filter is disabled: " << _filter_id;
return Status::OK();
}

if (_context->disabled) {
return Status::OK();
}

if (wrapper->is_ignored()) {
return Status::OK();
}
Expand Down Expand Up @@ -932,6 +944,10 @@ class RuntimePredicateWrapper {

// Sets the `ignored` flag on this wrapper's context.
void set_ignored() {
    _context->ignored = true;
}

// Reports the current value of the `disabled` flag held in this wrapper's context.
bool is_disabled() const {
    const bool disabled_flag = _context->disabled;
    return disabled_flag;
}

// Sets the `disabled` flag on this wrapper's context.
void set_disabled() {
    _context->disabled = true;
}

void batch_assign(const PInFilter* filter,
void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set,
PColumnValue&)) {
Expand Down Expand Up @@ -1211,9 +1227,10 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
merge_filter_callback->cntl_->ignore_eovercrowded();
}

if (get_ignored()) {
if (get_ignored() || get_disabled()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
merge_filter_request->set_ignored(true);
merge_filter_request->set_disabled(get_disabled());
merge_filter_request->set_ignored(get_ignored());
} else {
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
}
Expand All @@ -1235,7 +1252,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
bool is_late_arrival) {
DCHECK(is_consumer());
auto origin_size = push_exprs.size();
if (!_wrapper->is_ignored()) {
if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) {
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
Expand Down Expand Up @@ -1284,10 +1301,10 @@ PrimitiveType IRuntimeFilter::column_type() const {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());

if (!_wrapper->is_ignored() && _wrapper->is_bloomfilter() &&
if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() &&
!_wrapper->get_bloomfilter()->inited()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored, rf: {}",
debug_string());
throw Exception(ErrorCode::INTERNAL_ERROR,
"bf not inited and not ignored and not disabled, rf: {}", debug_string());
}

COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
Expand Down Expand Up @@ -1339,12 +1356,21 @@ bool IRuntimeFilter::get_ignored() {
return _wrapper->is_ignored();
}

// Marks this runtime filter as disabled.
//
// Delegates to RuntimePredicateWrapper::set_disabled() instead of writing
// `_wrapper->_context->disabled` directly, so the flag is written through the
// same wrapper accessor that get_disabled() reads it through.
void IRuntimeFilter::set_disabled() {
    _wrapper->set_disabled();
}

// Returns whether the underlying wrapper reports this runtime filter as disabled.
bool IRuntimeFilter::get_disabled() const {
    const bool disabled = _wrapper->is_disabled();
    return disabled;
}

std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}]",
"HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]",
_filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target,
_has_local_target, _wrapper->_context->ignored);
_has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled,
_wrapper->get_real_type(), wait_time_ms());
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
Expand Down Expand Up @@ -1497,6 +1523,10 @@ Status IRuntimeFilter::_create_wrapper(const T* param,
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

if (param->request->has_disabled() && param->request->disabled()) {
(*wrapper)->set_disabled();
}

if (param->request->has_ignored() && param->request->ignored()) {
(*wrapper)->set_ignored();
return Status::OK();
Expand Down
4 changes: 4 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ class IRuntimeFilter {

bool get_ignored();

void set_disabled();

bool get_disabled() const;

RuntimeFilterType get_real_type();

bool need_sync_filter_size();
Expand Down
13 changes: 10 additions & 3 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ class VRuntimeFilterSlots {
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
}

// Marks every runtime filter held in `_runtime_filters` as disabled.
//
// `state` is not read by this function. Always returns Status::OK().
Status disable_filters(RuntimeState* state) {
    for (const auto& runtime_filter : _runtime_filters) {
        runtime_filter->set_disabled();
    }

    return Status::OK();
}

Status ignore_filters(RuntimeState* state) {
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
Expand All @@ -89,7 +96,7 @@ class VRuntimeFilterSlots {

// process ignore filter when it has IN_FILTER on same expr
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
Expand Down Expand Up @@ -135,7 +142,7 @@ class VRuntimeFilterSlots {
int result_column_id = _build_expr_context[i]->get_last_result_column_id();
const auto& column = block->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
filter->insert_batch(column, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
while (!state->is_cancelled() && !has_agg_data &&
!_shared_state->spill_partitions.empty()) {
while (!_shared_state->spill_partitions[0]->spill_streams_.empty() &&
!state->is_cancelled()) {
!state->is_cancelled() && !has_agg_data) {
auto& stream = _shared_state->spill_partitions[0]->spill_streams_[0];
stream->set_read_counters(profile());
vectorized::Block block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
local_state._shared_state->inner_runtime_state.get(), block, eos));
if (*eos) {
_update_profile_from_internal_states(local_state);
local_state._shared_state->inner_runtime_state.reset();
}
}

Expand Down
Loading

0 comments on commit 65dd428

Please sign in to comment.