diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index 02b061221f985..b9a052e997b92 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -3,6 +3,7 @@ #include "runtime/runtime_filter_worker.h" #include +#include #include "exec/pipeline/query_context.h" #include "exprs/vectorized/runtime_filter_bank.h" @@ -23,15 +24,23 @@ namespace starrocks { -static inline std::shared_ptr get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) { +// Using a query-level mem_tracker beyond QueryContext's lifetime may access already destructed parent mem_tracker. +// mem_trackers has a hierarchy: process->query_pool->resource_group->query, so when resource_group is dropped or +// altered, resource_group-level mem_tracker would be destructed, such a dangling query-level mem_tracker would cause +// BE's crash when it accesses its parent mem_tracker(i.e. resource_group-level mem_tracker). so we need capture +// query context to prevent it from being destructed, and when a dropping resource_group is used by outstanding query +// contexts, it would be delayed to be dropped until all its outstanding query contexts are destructed. +static inline std::pair> get_mem_tracker( + const PUniqueId& query_id, bool is_pipeline) { if (is_pipeline) { TUniqueId tquery_id; tquery_id.lo = query_id.lo(); tquery_id.hi = query_id.hi(); auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id); - return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + return std::make_pair(query_ctx, mem_tracker); } else { - return nullptr; + return std::make_pair(nullptr, nullptr); } } @@ -199,7 +208,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) { } void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); DCHECK(params.is_partial()); @@ -628,7 +637,7 @@ static inline Status receive_total_runtime_filter_pipeline( } void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) { - auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); // deserialize once, and all fragment instance shared that runtime filter. vectorized::JoinRuntimeFilter* rf = nullptr; @@ -701,7 +710,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event( PTransmitRuntimeFilterParams&& params, std::vector&& destinations, int timeout_ms) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); std::random_device rd;