From fb61233245b32578a1329d47314bdb9bee60fb62 Mon Sep 17 00:00:00 2001 From: satanson Date: Tue, 30 Jul 2024 20:41:13 +0800 Subject: [PATCH] [BugFix] Capture query context when use its mem_tracker out of its lifetime (backport #49123) Signed-off-by: satanson --- be/src/runtime/runtime_filter_worker.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index f3c0b45d0eae6..c90a90052c7d1 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -15,6 +15,7 @@ #include "runtime/runtime_filter_worker.h" #include +#include #include "exec/pipeline/query_context.h" #include "exprs/runtime_filter_bank.h" @@ -35,15 +36,23 @@ namespace starrocks { -static inline std::shared_ptr get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) { +// Using a query-level mem_tracker beyond QueryContext's lifetime may access already destructed parent mem_tracker. +// mem_trackers has a hierarchy: process->query_pool->resource_group->query, so when resource_group is dropped or +// altered, resource_group-level mem_tracker would be destructed, such a dangling query-level mem_tracker would cause +// BE's crash when it accesses its parent mem_tracker(i.e. resource_group-level mem_tracker). so we need capture +// query context to prevent it from being destructed, and when a dropping resource_group is used by outstanding query +// contexts, it would be delayed to be dropped until all its outstanding query contexts are destructed. +static inline std::pair> get_mem_tracker( + const PUniqueId& query_id, bool is_pipeline) { if (is_pipeline) { TUniqueId tquery_id; tquery_id.lo = query_id.lo(); tquery_id.hi = query_id.hi(); auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id); - return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + return std::make_pair(query_ctx, mem_tracker); } else { - return nullptr; + return std::make_pair(nullptr, nullptr); } } @@ -211,7 +220,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) { } void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); DCHECK(params.is_partial()); @@ -640,7 +649,7 @@ static inline Status receive_total_runtime_filter_pipeline(PTransmitRuntimeFilte } void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) { - auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); // deserialize once, and all fragment instance shared that runtime filter. JoinRuntimeFilter* rf = nullptr; @@ -713,7 +722,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event( PTransmitRuntimeFilterParams&& params, std::vector&& destinations, int timeout_ms) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); std::random_device rd;