diff --git a/be/src/runtime/runtime_filter_worker.cpp b/be/src/runtime/runtime_filter_worker.cpp index 933ac9d06e3a5..d444ff2b432ae 100644 --- a/be/src/runtime/runtime_filter_worker.cpp +++ b/be/src/runtime/runtime_filter_worker.cpp @@ -15,6 +15,7 @@ #include "runtime/runtime_filter_worker.h" #include +#include #include "exec/pipeline/query_context.h" #include "exprs/runtime_filter_bank.h" @@ -35,15 +36,23 @@ namespace starrocks { -static inline std::shared_ptr get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) { +// Using a query-level mem_tracker beyond its QueryContext's lifetime may access an already-destructed parent mem_tracker. +// mem_trackers form a hierarchy: process->query_pool->resource_group->query, so when a resource_group is dropped or +// altered, its resource_group-level mem_tracker is destructed; a dangling query-level mem_tracker would then crash the +// BE when it accesses its parent mem_tracker (i.e. the resource_group-level mem_tracker). So we need to capture the +// query context to prevent it from being destructed; when a resource_group being dropped is still used by outstanding query +// contexts, the drop is delayed until all of its outstanding query contexts are destructed. +static inline std::pair> get_mem_tracker( + const PUniqueId& query_id, bool is_pipeline) { if (is_pipeline) { TUniqueId tquery_id; tquery_id.lo = query_id.lo(); tquery_id.hi = query_id.hi(); auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id); - return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker(); + auto mem_tracker = query_ctx == nullptr ? 
nullptr : query_ctx->mem_tracker(); + return std::make_pair(query_ctx, mem_tracker); } else { - return nullptr; + return std::make_pair(nullptr, nullptr); } } @@ -237,7 +246,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) { } void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); DCHECK(params.is_partial()); @@ -672,7 +681,7 @@ static inline void receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterP } void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) { - auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); // deserialize once, and all fragment instance shared that runtime filter. JoinRuntimeFilter* rf = nullptr; @@ -743,7 +752,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event( PTransmitRuntimeFilterParams&& params, std::vector&& destinations, int timeout_ms, int64_t rpc_http_min_size) { - auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline()); + auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline()); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get()); std::random_device rd;