Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Capture query context when use its mem_tracker out of its lifetime #49123

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstddef>
#include <random>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
Expand All @@ -37,15 +38,23 @@

namespace starrocks {

static inline std::shared_ptr<MemTracker> get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) {
// Using a query-level mem_tracker beyond QueryContext's lifetime may access already destructed parent mem_tracker.
// mem_trackers has a hierarchy: process->query_pool->resource_group->query, so when resource_group is dropped or
// altered, resource_group-level mem_tracker would be destructed, such a dangling query-level mem_tracker would cause
// BE's crash when it accesses its parent mem_tracker(i.e. resource_group-level mem_tracker). so we need capture
// query context to prevent it from being destructed, and when a dropping resource_group is used by outstanding query
// contexts, it would be delayed to be dropped until all its outstanding query contexts are destructed.
static inline std::pair<pipeline::QueryContextPtr, std::shared_ptr<MemTracker>> get_mem_tracker(
const PUniqueId& query_id, bool is_pipeline) {
if (is_pipeline) {
TUniqueId tquery_id;
tquery_id.lo = query_id.lo();
tquery_id.hi = query_id.hi();
auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id);
return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
return std::make_pair(query_ctx, mem_tracker);
} else {
return nullptr;
return std::make_pair(nullptr, nullptr);
}
}

Expand Down Expand Up @@ -239,7 +248,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) {
}

void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

DCHECK(params.is_partial());
Expand Down Expand Up @@ -674,7 +683,7 @@ static inline void receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterP
}

void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) {
auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());
// deserialize once, and all fragment instance shared that runtime filter.
JoinRuntimeFilter* rf = nullptr;
Expand Down Expand Up @@ -745,7 +754,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa
void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event(
PTransmitRuntimeFilterParams&& params, std::vector<TRuntimeFilterDestination>&& destinations, int timeout_ms,
int64_t rpc_http_min_size) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

std::random_device rd;
Expand Down
Loading