Skip to content

Commit

Permalink
[test](join) Fuzzy disable runtime filters in BE
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Dec 20, 2024
1 parent 62a6360 commit d8ec9fd
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 21 deletions.
46 changes: 38 additions & 8 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,15 @@ class RuntimePredicateWrapper {
const TExpr& probe_expr);

Status merge(const RuntimePredicateWrapper* wrapper) {
if (wrapper->is_ignored()) {
if (wrapper->is_disabled()) {
set_disabled();
return Status::OK();
}

if (wrapper->is_ignored() || is_disabled()) {
return Status::OK();
}

_context->ignored = false;

bool can_not_merge_in_or_bloom =
Expand Down Expand Up @@ -938,6 +944,10 @@ class RuntimePredicateWrapper {

void set_ignored() { _context->ignored = true; }

bool is_disabled() const { return _context->disabled; }

void set_disabled() { _context->disabled = true; }

void batch_assign(const PInFilter* filter,
void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set,
PColumnValue&)) {
Expand Down Expand Up @@ -1216,9 +1226,10 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
merge_filter_callback->cntl_->ignore_eovercrowded();
}

if (get_ignored()) {
if (get_ignored() || get_disabled()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
merge_filter_request->set_ignored(true);
merge_filter_request->set_ignored(get_ignored());
merge_filter_request->set_disabled(get_disabled());
} else {
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
}
Expand All @@ -1240,7 +1251,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
bool is_late_arrival) {
DCHECK(is_consumer());
auto origin_size = push_exprs.size();
if (!_wrapper->is_ignored()) {
if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) {
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
Expand Down Expand Up @@ -1289,9 +1300,9 @@ PrimitiveType IRuntimeFilter::column_type() const {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());

if (!_wrapper->is_ignored() && _wrapper->is_bloomfilter() &&
if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() &&
!_wrapper->get_bloomfilter()->inited()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored, rf: {}",
throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}",
debug_string());
}

Expand Down Expand Up @@ -1344,12 +1355,21 @@ bool IRuntimeFilter::get_ignored() {
return _wrapper->is_ignored();
}

void IRuntimeFilter::set_disabled() {
_wrapper->set_disabled();
}

bool IRuntimeFilter::get_disabled() const {
return _wrapper->is_disabled();
}

std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}, Ignored = {}]",
"HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]",
_filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target,
_has_local_target, _wrapper->_context->ignored);
_has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled,
_wrapper->get_real_type(), wait_time_ms());
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
Expand Down Expand Up @@ -1451,6 +1471,11 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param,
*wrapper = std::make_shared<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

if (param->request->has_disabled() && param->request->disabled()) {
(*wrapper)->set_disabled();
return Status::OK();
}

if (param->request->has_ignored() && param->request->ignored()) {
(*wrapper)->set_ignored();
return Status::OK();
Expand Down Expand Up @@ -1497,6 +1522,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param,
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

if (param->request->has_disabled() && param->request->disabled()) {
(*wrapper)->set_disabled();
return Status::OK();
}

if (param->request->has_ignored() && param->request->ignored()) {
(*wrapper)->set_ignored();
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ class IRuntimeFilter {

bool get_ignored();

void set_disabled();
bool get_disabled() const;

RuntimeFilterType get_real_type();

bool need_sync_filter_size();
Expand Down
25 changes: 19 additions & 6 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ class VRuntimeFilterSlots {
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
}

Status ignore_filters(RuntimeState* state) {
/**
Disable meaningless filters, such as filters:
RF1: col1 in (1, 3, 5)
RF2: col1 min: 1, max: 5
We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter.
*/
Status disable_meaningless_filters(RuntimeState* state) {
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
Expand All @@ -81,22 +87,22 @@ class VRuntimeFilterSlots {
continue;
}
if (has_in_filter.contains(filter->expr_order())) {
filter->set_ignored();
filter->set_disabled();
continue;
}
has_in_filter.insert(filter->expr_order());
}

// process ignore filter when it has IN_FILTER on same expr
for (auto filter : _runtime_filters) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
!has_in_filter.contains(filter->expr_order())) {
continue;
}
filter->set_ignored();
filter->set_disabled();
}
return Status::OK();
}
Expand All @@ -108,6 +114,13 @@ class VRuntimeFilterSlots {
return Status::OK();
}

Status disable_all_filters() {
for (auto filter : _runtime_filters) {
filter->set_disabled();
}
return Status::OK();
}

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
Expand Down Expand Up @@ -135,7 +148,7 @@ class VRuntimeFilterSlots {
int result_column_id = _build_expr_context[i]->get_last_result_column_id();
const auto& column = block->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
if (filter->get_ignored()) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
filter->insert_batch(column, 1);
Expand Down
51 changes: 46 additions & 5 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "hashjoin_build_sink.h"

#include <cstdlib>
#include <string>

#include "exprs/bloom_filter_func.h"
Expand Down Expand Up @@ -105,6 +106,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));

#ifndef NDEBUG
if (state->fuzzy_disable_runtime_filter_in_be()) {
if ((_parent->operator_id() + random()) % 2 == 0) {
RETURN_IF_ERROR(disable_runtime_filters(state));
}
}
#endif

return Status::OK();
}

Expand Down Expand Up @@ -135,7 +145,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos ||
_runtime_filters_disabled) {
return Base::close(state, exec_status);
}

Expand All @@ -150,7 +161,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
Expand Down Expand Up @@ -179,6 +190,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
return Base::close(state, exec_status);
}

Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState* state) {
if (_runtime_filters_disabled) {
return Status::OK();
}

if (_runtime_filters.empty()) {
return Status::OK();
}

if (!_should_build_hash_table) {
return Status::OK();
}

if (_runtime_filters.empty()) {
return Status::OK();
}

DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be initialized";

_runtime_filters_disabled = true;
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters());

SCOPED_TIMER(_publish_runtime_filter_timer);
return _runtime_filter_slots->publish(state, !_should_build_hash_table);
}

bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
Expand Down Expand Up @@ -509,9 +547,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());

RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
state, local_state._shared_state->build_block->rows(),
local_state._finish_dependency));
if (!local_state._runtime_filters_disabled) {
RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
state, local_state._shared_state->build_block->rows(),
local_state._finish_dependency));
}

RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class HashJoinBuildSinkLocalState final

Status close(RuntimeState* state, Status exec_status) override;

Status disable_runtime_filters(RuntimeState* state);

protected:
Status _hash_table_init(RuntimeState* state);
void _set_build_side_has_external_nullmap(vectorized::Block& block,
Expand All @@ -76,6 +78,8 @@ class HashJoinBuildSinkLocalState final

bool _should_build_hash_table = true;

bool _runtime_filters_disabled = false;

size_t _build_side_rows = 0;

vectorized::MutableBlock _build_side_mutable_block;
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,11 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
void* data = nullptr;
int len = 0;
bool has_attachment = false;
if (!cnt_val->filter->get_ignored()) {
if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) {
RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len));
} else {
apply_request.set_ignored(true);
apply_request.set_ignored(cnt_val->filter->get_ignored());
apply_request.set_disabled(cnt_val->filter->get_disabled());
apply_request.set_filter_type(PFilterType::UNKNOW_FILTER);
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ class RuntimeState {
_query_options.enable_shared_exchange_sink_buffer;
}

bool fuzzy_disable_runtime_filter_in_be() const {
return _query_options.__isset.fuzzy_disable_runtime_filter_in_be &&
_query_options.fuzzy_disable_runtime_filter_in_be;
}

int64_t min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
return std::max(_query_options.min_revocable_mem, (int64_t)1);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct RuntimeFilterContext {
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
bool disabled = false;
std::string err_msg;
};

Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";

public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = "fuzzy_disable_runtime_filter_in_be";

public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";

public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
Expand Down Expand Up @@ -2250,6 +2252,13 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
needForward = true, fuzzy = true)
public long dataQueueMaxBlocks = 1;

@VariableMgr.VarAttr(
name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE,
description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试",
"Disable the runtime filter on the BE for testing purposes."},
needForward = true, fuzzy = false)
public boolean fuzzyDisableRuntimeFilterInBE = false;

// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
Expand Down Expand Up @@ -2499,6 +2508,8 @@ public void initFuzzyModeVariables() {
this.batchSize = 1024;
this.enableFoldConstantByBe = false;
}

this.fuzzyDisableRuntimeFilterInBE = true;
}
}

Expand Down Expand Up @@ -3979,6 +3990,7 @@ public TQueryOptions toThrift() {
tResult.setEnableForceSpill(enableForceSpill);
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE);

tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnableSharedExchangeSinkBuffer(enableSharedExchangeSinkBuffer);
Expand Down
3 changes: 3 additions & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ message PMergeFilterRequest {
optional bool contain_null = 11;
optional bool ignored = 12;
optional uint64 local_merge_time = 13;
optional bool disabled = 14;
};

message PMergeFilterResponse {
Expand All @@ -615,6 +616,7 @@ message PPublishFilterRequest {
optional PColumnType column_type = 10;
optional bool contain_null = 11;
optional bool ignored = 12;
optional bool disabled = 13;
};

message PPublishFilterRequestV2 {
Expand All @@ -631,6 +633,7 @@ message PPublishFilterRequestV2 {
optional bool ignored = 11;
repeated int32 fragment_ids = 12;
optional uint64 local_merge_time = 13;
optional bool disabled = 14;
};

message PPublishFilterResponse {
Expand Down
Loading

0 comments on commit d8ec9fd

Please sign in to comment.