From c4d45e6f970961e2cfb77cfaf4702acd255b6d77 Mon Sep 17 00:00:00 2001 From: satanson Date: Tue, 30 Jul 2024 20:41:13 +0800 Subject: [PATCH] [BugFix] Capture query context when use its mem_tracker out of its lifetime (#49123) Signed-off-by: satanson (cherry picked from commit e2c37dbc8ea16d07399e7b626ba168237ecc67ce) # Conflicts: # be/src/runtime/runtime_filter_worker.cpp --- be/src/runtime/runtime_filter_worker.cpp | 25 +++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index 02b061221f985..641b1322592e3 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -3,6 +3,7 @@ #include "runtime/runtime_filter_worker.h" #include +#include #include "exec/pipeline/query_context.h" #include "exprs/vectorized/runtime_filter_bank.h" @@ -23,15 +24,23 @@ namespace starrocks { -static inline std::shared_ptr get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) { +// Using a query-level mem_tracker beyond QueryContext's lifetime may access already destructed parent mem_tracker. +// mem_trackers has a hierarchy: process->query_pool->resource_group->query, so when resource_group is dropped or +// altered, resource_group-level mem_tracker would be destructed, such a dangling query-level mem_tracker would cause +// BE's crash when it accesses its parent mem_tracker(i.e. resource_group-level mem_tracker). so we need capture +// query context to prevent it from being destructed, and when a dropping resource_group is used by outstanding query +// contexts, it would be delayed to be dropped until all its outstanding query contexts are destructed. +static inline std::pair> get_mem_tracker( + const PUniqueId& query_id, bool is_pipeline) { if (is_pipeline) { TUniqueId tquery_id; tquery_id.lo = query_id.lo(); tquery_id.hi = query_id.hi(); auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id); - return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + return std::make_pair(query_ctx, mem_tracker); } else { - return nullptr; + return std::make_pair(nullptr, nullptr); } } @@ -199,7 +208,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) { } void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); DCHECK(params.is_partial()); @@ -628,7 +637,7 @@ static inline Status receive_total_runtime_filter_pipeline( } void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) { - auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); // deserialize once, and all fragment instance shared that runtime filter. vectorized::JoinRuntimeFilter* rf = nullptr; @@ -700,8 +709,14 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa } void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event( +<<<<<<< HEAD PTransmitRuntimeFilterParams&& params, std::vector&& destinations, int timeout_ms) { auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); +======= + PTransmitRuntimeFilterParams&& params, std::vector&& destinations, int timeout_ms, + int64_t rpc_http_min_size) { + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); +>>>>>>> e2c37dbc8e ([BugFix] Capture query context when use its mem_tracker out of its lifetime (#49123)) SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); std::random_device rd;