Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jan 6, 2025
1 parent a355a18 commit ca4c07e
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 631 deletions.
6 changes: 3 additions & 3 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class RowIdConversion {
"consuming "
"tracker:<{}>, peak used {}, current used {}.",
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str(),
doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker()->consumption()));
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->consumption()));
}

uint32_t id = _segments_rowid_map.size();
Expand Down
18 changes: 6 additions & 12 deletions be/src/runtime/memory/memory_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,27 @@ class MemoryProfile {
void make_memory_profile(RuntimeProfile* profile) const;

std::string print_memory_overview_profile() const {
return return_memory_profile_str(_memory_overview_profile.get());
return _memory_overview_profile->pretty_print();
}

std::string print_global_memory_profile() const {
return return_memory_profile_str(_global_memory_profile.get().get());
return _global_memory_profile.get()->pretty_print();
}

std::string print_metadata_memory_profile() const {
return return_memory_profile_str(_metadata_memory_profile.get().get());
return _metadata_memory_profile.get()->pretty_print();
}

std::string print_cache_memory_profile() const {
return return_memory_profile_str(_cache_memory_profile.get().get());
return _cache_memory_profile.get()->pretty_print();
}

std::string print_top_memory_tasks_profile() const {
return return_memory_profile_str(_top_memory_tasks_profile.get().get());
return _top_memory_tasks_profile.get()->pretty_print();
}

std::string print_tasks_memory_profile() const {
return return_memory_profile_str(_tasks_memory_profile.get().get());
return _tasks_memory_profile.get()->pretty_print();
}

static int64_t query_current_usage();
Expand All @@ -67,12 +67,6 @@ class MemoryProfile {
void print_log_process_usage();

private:
std::string return_memory_profile_str(const RuntimeProfile* profile) const {
std::stringstream ss;
profile->pretty_print(&ss);
return ss.str();
}

void init_memory_overview_counter();

std::unique_ptr<RuntimeProfile> _memory_overview_profile;
Expand Down
34 changes: 0 additions & 34 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,9 @@
#include <gen_cpp/types.pb.h>

#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"

namespace doris {

// Runnable that, when executed, asks the FragmentMgr to cancel one query with a
// MemoryLimitExceeded status carrying the stored message.
class AsyncCancelQueryTask : public Runnable {
    ENABLE_FACTORY_CREATOR(AsyncCancelQueryTask);

public:
    // Keeps its own copies of the query id and the message for use in run().
    AsyncCancelQueryTask(TUniqueId query_id, const std::string& exceed_msg)
            : _query_id(query_id), _exceed_msg(exceed_msg) {}

    ~AsyncCancelQueryTask() override = default;

    // Cancels `_query_id` through the FragmentMgr owned by the global ExecEnv.
    void run() override {
        auto* fragment_mgr = ExecEnv::GetInstance()->fragment_mgr();
        fragment_mgr->cancel_query(_query_id, Status::MemoryLimitExceeded(_exceed_msg));
    }

private:
    TUniqueId _query_id;
    const std::string _exceed_msg;
};

void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
Expand Down Expand Up @@ -71,20 +53,4 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
_limiter_tracker = old_mem_tracker;
}

// Submits an AsyncCancelQueryTask for the attached query to the lazy-release pool.
// Does nothing when no query is attached or a cancel task was already submitted.
void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
    if (!is_attach_query() || _is_query_cancelled) {
        return;
    }

    Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
            AsyncCancelQueryTask::create_shared(_query_id, exceed_msg));
    if (!submit_st.ok()) {
        LOG(WARNING) << "Failed to submit cancel query task to pool, query_id "
                     << print_id(_query_id) << ", error st " << submit_st;
        return;
    }

    // Remember that the cancel request was submitted so it is not submitted again and again:
    // even after a successful cancel, the application may not check `state.is_cancelled` and
    // exit quickly, so it may keep allocating memory, keep failing, and fill up the pool.
    _is_query_cancelled = true;
}

} // namespace doris
79 changes: 48 additions & 31 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <stdint.h>

#include <algorithm>
#include <memory>
Expand All @@ -35,11 +34,17 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "util/stack_util.h"
#include "util/uid_util.h"

namespace doris {

constexpr size_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1ULL << 20); // 1M
// Diagnostic text streamed when the orphan-tracker check in ThreadMemTrackerMgr fails.
// Declared `inline const` rather than `static`: this lives in a header, and a non-const
// `static std::string` would give every including translation unit its own mutable copy
// (and its own dynamic initializer). An inline variable is a single, read-only object
// shared by the whole program.
inline const std::string memory_orphan_check_msg =
        "If you crash here, it means that SCOPED_ATTACH_TASK and "
        "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of "
        "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging "
        "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the "
        "thread will crash. If you want to switch MemTrackerLimiter during thread execution, "
        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach.";

// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
Expand All @@ -61,38 +66,42 @@ class ThreadMemTrackerMgr {
void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker =
ExecEnv::GetInstance()->orphan_mem_tracker());

// Binds a task to the current thread: attaches `mem_tracker` as the limiter tracker,
// stores `wg_wptr` in `_wg_wptr`, and turns on waiting for GC via enable_wait_gc().
// `mem_tracker` must be non-null (checked with DCHECK).
void attach_task(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const std::weak_ptr<WorkloadGroup>& wg_wptr) {
DCHECK(mem_tracker);
// Orphan is the thread's default tracker.
// attach_task is only expected at the beginning of the thread function, so the current
// tracker should still be Orphan here; a different label means a duplicate attach_task.
DCHECK(_limiter_tracker->label() == "Orphan")
<< ", thread mem tracker label: " << _limiter_tracker->label()
<< ", attach mem tracker label: " << mem_tracker->label();
attach_limiter_tracker(mem_tracker);
_wg_wptr = wg_wptr;
enable_wait_gc();
}
// Undoes attach_task: calls detach_limiter_tracker() with its default argument
// (the ExecEnv orphan tracker), clears `_wg_wptr`, and turns off waiting for GC
// via disable_wait_gc().
void detach_task() {
detach_limiter_tracker();
_wg_wptr.reset();
disable_wait_gc();
}

// Must be fast enough! Thread update_tracker may be called very frequently.
bool push_consumer_tracker(MemTracker* mem_tracker);
void pop_consumer_tracker();
// Label of the tracker at the back of `_consumer_tracker_stack`,
// or an empty string when the stack is empty.
std::string last_consumer_tracker_label() {
    if (_consumer_tracker_stack.empty()) {
        return "";
    }
    return _consumer_tracker_stack.back()->label();
}

void set_query_id(const TUniqueId& query_id) { _query_id = query_id; }

TUniqueId query_id() { return _query_id; }

void set_wg_wptr(const std::weak_ptr<WorkloadGroup>& wg_wptr) { _wg_wptr = wg_wptr; }

void reset_wg_wptr() { _wg_wptr.reset(); }

// Note that, If call the memory allocation operation in Memory Hook,
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem tracker no matter whether the limit is exceeded.
void consume(int64_t size, int skip_large_memory_check = 0);
void consume(int64_t size);
void flush_untracked_mem();

doris::Status try_reserve(int64_t size);

void release_reserved();

bool is_attach_query() { return _query_id != TUniqueId(); }

bool is_query_cancelled() const { return _is_query_cancelled; }

void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
CHECK(init());
return _limiter_tracker;
Expand All @@ -101,7 +110,6 @@ class ThreadMemTrackerMgr {
void enable_wait_gc() { _wait_gc = true; }
void disable_wait_gc() { _wait_gc = false; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
void cancel_query(const std::string& exceed_msg);

std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;
Expand All @@ -118,6 +126,17 @@ class ThreadMemTrackerMgr {
int64_t untracked_mem() const { return _untracked_mem; }
int64_t reserved_mem() const { return _reserved_mem; }

int skip_memory_check = 0;
int skip_large_memory_check = 0;

// Debug-only sanity check that this thread is not still on the "Orphan" limiter tracker.
// Only compiled in when USE_MEM_TRACKER is defined; otherwise this function is a no-op.
// The DCHECK passes when any of the following holds:
//   - doris::k_doris_exit is true (presumably set during process shutdown; confirm),
//   - config::enable_memory_orphan_check is false,
//   - the current limiter tracker's label is not "Orphan".
// On failure the message in doris::memory_orphan_check_msg is streamed to the check output.
void memory_orphan_check() {
#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
_limiter_tracker->label() != "Orphan")
<< doris::memory_orphan_check_msg;
#endif
}

private:
struct LastAttachSnapshot {
int64_t reserved_mem = 0;
Expand All @@ -136,7 +155,7 @@ class ThreadMemTrackerMgr {
// so `attach_limiter_tracker` may be nested.
std::vector<LastAttachSnapshot> _last_attach_snapshots_stack;

std::string _failed_consume_msg = std::string();
std::string _failed_consume_msg;
// If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit.
// A thread of query/load will only wait once during execution.
bool _wait_gc = false;
Expand All @@ -147,14 +166,13 @@ class ThreadMemTrackerMgr {

// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
TUniqueId _query_id = TUniqueId();
bool _is_query_cancelled = false;
};

inline bool ThreadMemTrackerMgr::init() {
// 1. Initialize in the thread context when the thread starts
// 2. ExecEnv not initialized when thread start, initialized in limiter_mem_tracker().
if (_init) return true;
if (_init) { return true;
}
if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) {
_limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
_wait_gc = true;
Expand All @@ -178,7 +196,8 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
_consumer_tracker_stack.pop_back();
}

inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
inline void ThreadMemTrackerMgr::consume(int64_t size) {
memory_orphan_check();
// `consumer_tracker` not support reserve memory and not require use `_untracked_mem` to batch consume,
// because `consumer_tracker` will not be bound by many threads, so there is no performance problem.
for (auto* tracker : _consumer_tracker_stack) {
Expand Down Expand Up @@ -236,22 +255,18 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
"alloc large memory: {}, {}, this is just a warning, not prevent memory alloc, "
"alloc large memory: {}, consume tracker: {}, this is just a warning, not prevent memory alloc, "
"stacktrace:\n{}",
size,
is_attach_query() ? "in query or load: " + print_id(_query_id)
: "not in query or load",
size, _limiter_tracker->label(),
get_stack_trace());
_stop_consume = false;
}
if (doris::config::crash_in_alloc_large_memory_bytes > 0 &&
size > doris::config::crash_in_alloc_large_memory_bytes) {
throw Exception(Status::FatalError(
"alloc large memory: {}, {}, crash generate core dumpsto help analyze, "
"alloc large memory: {}, consume tracker: {}, crash generate core dumpsto help analyze, "
"stacktrace:\n{}",
size,
is_attach_query() ? "in query or load: " + print_id(_query_id)
: "not in query or load",
size, _limiter_tracker->label(),
get_stack_trace()));
}
}
Expand Down Expand Up @@ -282,6 +297,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
DCHECK(_limiter_tracker);
DCHECK(size >= 0);
CHECK(init());
memory_orphan_check();
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
Expand Down Expand Up @@ -317,6 +333,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
}

inline void ThreadMemTrackerMgr::release_reserved() {
memory_orphan_check();
if (_reserved_mem != 0) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
Expand Down
26 changes: 24 additions & 2 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/workload_management/cpu_context.h"
#include "runtime/workload_management/io_context.h"
#include "runtime/workload_management/memory_context.h"
#include "runtime/workload_management/task_controller.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down Expand Up @@ -82,6 +86,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
_query_source(query_source) {
_init_query_mem_tracker();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
_init_resource_context();
_query_watcher.start();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
Expand Down Expand Up @@ -128,16 +133,33 @@ void QueryContext::_init_query_mem_tracker() {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
_bytes_limit);
} else { // EXTERNAL
} else if (_query_options.query_type == TQueryType::EXTERNAL) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)),
_bytes_limit);
} else {
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}
}

void QueryContext::_init_resource_context() {
if (_query_options.query_type == TQueryType::SELECT) {
resource_context = ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, QueryIOContext, WorkloadGroupContext, QueryTaskController>();
} else if (_query_options.query_type == TQueryType::LOAD) {
resource_context = ResourceContext::CreateResourceContext<LoadCPUContext, LoadMemoryContext, LoadIOContext, WorkloadGroupContext, LoadTaskController>();
} else if (_query_options.query_type == TQueryType::EXTERNAL) {
resource_context = ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, QueryIOContext, WorkloadGroupContext, QueryTaskController>();
} else {
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
resource_context->memory_context()->set_memtracker_limiter(query_mem_tracker);
}

QueryContext::~QueryContext() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
// query mem tracker consumption is equal to 0, it means that after QueryContext is created,
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_statistics.h"
#include "runtime/workload_management/resource_context.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "util/hash_util.hpp"
Expand Down Expand Up @@ -235,6 +236,8 @@ class QueryContext {
// MemTracker that is shared by all fragment instances running on this host.
std::shared_ptr<MemTrackerLimiter> query_mem_tracker;

std::shared_ptr<ResourceContext> resource_context;

std::vector<TUniqueId> fragment_instance_ids;

// plan node id -> TFileScanRangeParams
Expand Down Expand Up @@ -283,6 +286,7 @@ class QueryContext {
std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};

void _init_query_mem_tracker();
void _init_resource_context();

std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
Expand Down
Loading

0 comments on commit ca4c07e

Please sign in to comment.