Skip to content

Commit

Permalink
[Enhancement] reduce mem alloc failed because unfair memory sharing (#50686)
Browse files Browse the repository at this point in the history

Signed-off-by: yanz <[email protected]>
(cherry picked from commit 15a6518)

# Conflicts:
#	be/src/connector/connector.h
#	be/src/exec/pipeline/fragment_executor.cpp
#	gensrc/thrift/InternalService.thrift
  • Loading branch information
dirtysalt authored and mergify[bot] committed Sep 5, 2024
1 parent c5af6d0 commit 840f699
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 18 deletions.
6 changes: 5 additions & 1 deletion be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include <string>
#include <unordered_map>

<<<<<<< HEAD
=======
#include "connector/connector_chunk_sink.h"
>>>>>>> 15a6518fae ([Enhancement] reduce mem alloc failed because unfair memory sharing (#50686))
#include "exec/pipeline/scan/morsel.h"
#include "exprs/runtime_filter_bank.h"
#include "gen_cpp/InternalService_types.h"
Expand Down Expand Up @@ -118,7 +122,7 @@ class DataSourceProvider {
public:
static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB
static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 4 * 1024 * 1024; // 4MB
static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB

virtual ~DataSourceProvider() = default;

Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,15 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
spill_mem_limit_ratio = query_options.spill_mem_limit_threshold;
}

<<<<<<< HEAD
=======
int scan_node_number = 1;
if (query_globals.__isset.scan_node_number) {
scan_node_number = query_globals.scan_node_number;
}
>>>>>>> 15a6518fae ([Enhancement] reduce mem alloc failed because unfair memory sharing (#50686))
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio,
wg.get(), runtime_state);
wg.get(), runtime_state, scan_node_number);

auto query_mem_tracker = _query_ctx->mem_tracker();
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get());
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void QueryContext::cancel(const Status& status) {

void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
std::optional<double> spill_mem_reserve_ratio, workgroup::WorkGroup* wg,
RuntimeState* runtime_state) {
RuntimeState* runtime_state, int scan_node_number) {
std::call_once(_init_mem_tracker_once, [=]() {
_profile = std::make_shared<RuntimeProfile>("Query" + print_id(_query_id));
auto* mem_tracker_counter =
Expand Down Expand Up @@ -137,8 +137,8 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent,
if (big_query_mem_limit > 0) {
_static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit);
}
_connector_scan_operator_mem_share_arbitrator =
_object_pool.add(new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit));
_connector_scan_operator_mem_share_arbitrator = _object_pool.add(
new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number));

{
MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
/// that there is a big query memory limit of this resource group.
void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1,
std::optional<double> spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr,
RuntimeState* state = nullptr);
RuntimeState* state = nullptr, int scan_node_number = 1);
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }

Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
namespace starrocks::pipeline {

// ==================== ConnectorScanOperatorFactory ====================
ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit,
int scan_node_number)
: query_mem_limit(query_mem_limit),
scan_mem_limit(query_mem_limit),
total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {}

int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) {
int64_t diff = new_value - old_value;
int64_t total = total_chunk_source_mem_bytes.fetch_add(diff) + diff;
Expand Down Expand Up @@ -292,7 +298,8 @@ ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int3
_adaptive_processor->started_running = true;
int64_t c = L->update_active_scan_operator_count(1);
if (c == 0) {
_adjust_scan_mem_limit(0, L->get_arb_chunk_source_mem_bytes());
_adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES,
L->get_arb_chunk_source_mem_bytes());
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator {
int64_t scan_mem_limit = 0;
std::atomic<int64_t> total_chunk_source_mem_bytes = 0;

ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit)
: query_mem_limit(query_mem_limit), scan_mem_limit(query_mem_limit) {}
ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number);

int64_t set_scan_mem_ratio(double mem_ratio) {
scan_mem_limit = std::max<int64_t>(1, query_mem_limit * mem_ratio);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ void WorkGroup::init() {
_scan_sched_entity.set_queue(workgroup::create_scan_task_queue());
_connector_scan_sched_entity.set_queue(workgroup::create_scan_task_queue());

_connector_scan_mem_tracker = std::make_shared<MemTracker>(
MemTracker::RESOURCE_GROUP, _memory_limit_bytes * config::connector_scan_use_query_mem_ratio,
_name + "/connector_scan", GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker());
_connector_scan_mem_tracker =
std::make_shared<MemTracker>(MemTracker::RESOURCE_GROUP, _memory_limit_bytes, _name + "/connector_scan",
GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker());
}

std::string WorkGroup::to_string() const {
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ Status GlobalEnv::_init_mem_tracker() {
int64_t query_pool_spill_limit = query_pool_mem_limit * config::query_pool_spill_mem_limit_threshold;
_query_pool_mem_tracker->set_reserve_limit(query_pool_spill_limit);
_connector_scan_pool_mem_tracker =
regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit * config::connector_scan_use_query_mem_ratio,
"query_pool/connector_scan", nullptr);
regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit, "query_pool/connector_scan", nullptr);

int64_t load_mem_limit = calc_max_load_memory(_process_mem_tracker->limit());
_load_mem_tracker = regist_tracker(MemTracker::LOAD, load_mem_limit, "load", process_mem_tracker());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());

return new Builder()
.queryId(context.getExecutionId())
Expand Down Expand Up @@ -129,6 +130,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context,
if (context.getLastQueryId() != null) {
queryGlobals.setLast_query_id(context.getLastQueryId().toString());
}
queryGlobals.setScan_node_number(scanNodes.size());

return new Builder()
.queryId(context.getExecutionId())
Expand Down
15 changes: 11 additions & 4 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ struct TQueryQueueOptions {
struct TQueryOptions {
2: optional i32 max_errors = 0
4: optional i32 batch_size = 0

12: optional i64 mem_limit = 2147483648
13: optional bool abort_on_default_limit_exceeded = 0
14: optional i32 query_timeout = 3600
Expand Down Expand Up @@ -180,7 +180,7 @@ struct TQueryOptions {
59: optional bool enable_tablet_internal_parallel;

60: optional i32 query_delivery_timeout;

61: optional bool enable_query_debug_trace;

62: optional Types.TCompressionType load_transmission_compression_type;
Expand All @@ -194,7 +194,7 @@ struct TQueryOptions {
67: optional bool enable_pipeline_query_statistic = false;

68: optional i32 transmission_encode_level;

69: optional bool enable_populate_datacache;

70: optional bool allow_throw_exception = 0;
Expand All @@ -212,8 +212,13 @@ struct TQueryOptions {
78: optional i32 spill_encode_level;
79: optional i64 spill_revocable_max_bytes;

<<<<<<< HEAD
85: optional TSpillMode spill_mode;

=======
82: optional TSpillOptions spill_options;

>>>>>>> 15a6518fae ([Enhancement] reduce mem alloc failed because unfair memory sharing (#50686))
86: optional i32 io_tasks_per_scan_operator = 4;
87: optional i32 connector_io_tasks_per_scan_operator = 16;
88: optional double runtime_filter_early_return_selectivity = 0.05;
Expand Down Expand Up @@ -341,6 +346,8 @@ struct TQueryGlobals {
30: optional string last_query_id

31: optional i64 timestamp_us

32: optional i64 scan_node_number
}


Expand Down Expand Up @@ -406,7 +413,7 @@ struct TExecPlanFragmentParams {
53: optional WorkGroup.TWorkGroup workgroup
54: optional bool enable_resource_group
55: optional i32 func_version

// Sharing data between drivers of same scan operator
56: optional bool enable_shared_scan

Expand Down

0 comments on commit 840f699

Please sign in to comment.