Skip to content

Commit

Permalink
[BugFix] Capture query context when use its mem_tracker out of its li…
Browse files Browse the repository at this point in the history
…fetime

Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Jul 30, 2024
1 parent 07b9ee1 commit 66e10bc
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions be/src/runtime/runtime_filter_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstddef>
#include <random>
#include <utility>

#include "exec/pipeline/query_context.h"
#include "exprs/runtime_filter_bank.h"
Expand All @@ -37,15 +38,17 @@

namespace starrocks {

static inline std::shared_ptr<MemTracker> get_mem_tracker(const PUniqueId& query_id, bool is_pipeline) {
static inline std::pair<pipeline::QueryContextPtr, std::shared_ptr<MemTracker>> get_mem_tracker(
const PUniqueId& query_id, bool is_pipeline) {
if (is_pipeline) {
TUniqueId tquery_id;
tquery_id.lo = query_id.lo();
tquery_id.hi = query_id.hi();
auto query_ctx = ExecEnv::GetInstance()->query_context_mgr()->get(tquery_id);
return query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
auto mem_tracker = query_ctx == nullptr ? nullptr : query_ctx->mem_tracker();
return std::make_pair(query_ctx, mem_tracker);
} else {
return nullptr;
return std::make_pair(nullptr, nullptr);
}
}

Expand Down Expand Up @@ -239,7 +242,7 @@ Status RuntimeFilterMerger::init(const TRuntimeFilterParams& params) {
}

void RuntimeFilterMerger::merge_runtime_filter(PTransmitRuntimeFilterParams& params) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

DCHECK(params.is_partial());
Expand Down Expand Up @@ -674,7 +677,7 @@ static inline void receive_total_runtime_filter_pipeline(PTransmitRuntimeFilterP
}

void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterParams& request) {
auto mem_tracker = get_mem_tracker(request.query_id(), request.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(request.query_id(), request.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());
// deserialize once, and all fragment instance shared that runtime filter.
JoinRuntimeFilter* rf = nullptr;
Expand Down Expand Up @@ -745,7 +748,7 @@ void RuntimeFilterWorker::_receive_total_runtime_filter(PTransmitRuntimeFilterPa
void RuntimeFilterWorker::_process_send_broadcast_runtime_filter_event(
PTransmitRuntimeFilterParams&& params, std::vector<TRuntimeFilterDestination>&& destinations, int timeout_ms,
int64_t rpc_http_min_size) {
auto mem_tracker = get_mem_tracker(params.query_id(), params.is_pipeline());
auto [query_ctx, mem_tracker] = get_mem_tracker(params.query_id(), params.is_pipeline());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker.get());

std::random_device rd;
Expand Down

0 comments on commit 66e10bc

Please sign in to comment.