Skip to content

Commit

Permalink
[BugFix] Capture query context when using its mem_tracker outside of its lifetime (backport #49123)
Browse files Browse the repository at this point in the history

Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Jul 31, 2024
1 parent 44a3ca7 commit 2151d66
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "runtime/runtime_filter_worker.h"

#include <random>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/vectorized/runtime_filter_bank.h"
Expand All @@ -23,15 +24,23 @@

namespace starrocks {

static inline std::shared_ptr<MemTracker> get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) {
// Using a query-level mem_tracker beyond QueryContext's lifetime may access already destructed parent mem_tracker.
// mem_trackers has a hierarchy: process->query_pool->resource_group->query, so when resource_group is dropped or
// altered, resource_group-level mem_tracker would be destructed, such a dangling query-level mem_tracker would cause
// BE's crash when it accesses its parent mem_tracker(i.e. resource_group-level mem_tracker). so we need capture
// query context to prevent it from being destructed, and when a dropping resource_group is used by outstanding query
// contexts, it would be delayed to be dropped until all its outstanding query contexts are destructed.
static inline std::pair<pipeline::QueryContextPtr, std::shared_ptr<MemTracker>> get_mem_tracker(
const PUniqueId& query_id, bool is_pipeline) {
if (is_pipeline) {
TUniqueId tquery_id;
tquery_id.lo = query_id.lo();
tquery_id.hi = query_id.hi();
auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id);
return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
return std::make_pair(query_ctx, mem_tracker);
} else {
return nullptr;
return std::make_pair(nullptr, nullptr);
}
}

Expand Down Expand Up @@ -199,7 +208,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) {
}

void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

DCHECK(params.is_partial());
Expand Down Expand Up @@ -628,7 +637,7 @@ static inline Status receive_total_runtime_filter_pipeline(
}

void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) {
auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());
// deserialize once, and all fragment instance shared that runtime filter.
vectorized::JoinRuntimeFilter* rf = nullptr;
Expand Down Expand Up @@ -701,7 +710,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa

void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event(
PTransmitRuntimeFilterParams&& params, std::vector<TRuntimeFilterDestination>&& destinations, int timeout_ms) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

std::random_device rd;
Expand Down

0 comments on commit 2151d66

Please sign in to comment.