diff --git a/be/src/connector/connector.h b/be/src/connector/connector.h index 42fea2cd97e3b..30682452ae950 100644 --- a/be/src/connector/connector.h +++ b/be/src/connector/connector.h @@ -18,6 +18,7 @@ #include #include +#include "connector/connector_chunk_sink.h" #include "exec/pipeline/scan/morsel.h" #include "exprs/runtime_filter_bank.h" #include "gen_cpp/InternalService_types.h" @@ -118,7 +119,7 @@ class DataSourceProvider { public: static constexpr int64_t MIN_DATA_SOURCE_MEM_BYTES = 16 * 1024 * 1024; // 16MB static constexpr int64_t MAX_DATA_SOURCE_MEM_BYTES = 256 * 1024 * 1024; // 256MB - static constexpr int64_t PER_FIELD_MEM_BYTES = 4 * 1024 * 1024; // 4MB + static constexpr int64_t PER_FIELD_MEM_BYTES = 1 * 1024 * 1024; // 1MB virtual ~DataSourceProvider() = default; diff --git a/be/src/exec/pipeline/fragment_executor.cpp b/be/src/exec/pipeline/fragment_executor.cpp index eda89a38307af..73d18d87b1057 100644 --- a/be/src/exec/pipeline/fragment_executor.cpp +++ b/be/src/exec/pipeline/fragment_executor.cpp @@ -241,8 +241,12 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified spill_mem_limit_ratio = query_options.spill_mem_limit_threshold; } + int scan_node_number = 1; + if (query_globals.__isset.scan_node_number) { + scan_node_number = query_globals.scan_node_number; + } _query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_ratio, - wg.get(), runtime_state); + wg.get(), runtime_state, scan_node_number); auto query_mem_tracker = _query_ctx->mem_tracker(); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(query_mem_tracker.get()); diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp index 
382962b89e139..4ff6fe7e1bf9b 100644 --- a/be/src/exec/pipeline/query_context.cpp +++ b/be/src/exec/pipeline/query_context.cpp @@ -106,7 +106,7 @@ void QueryContext::cancel(const Status& status) { void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit, std::optional spill_mem_reserve_ratio, workgroup::WorkGroup* wg, - RuntimeState* runtime_state) { + RuntimeState* runtime_state, int scan_node_number) { std::call_once(_init_mem_tracker_once, [=]() { _profile = std::make_shared("Query" + print_id(_query_id)); auto* mem_tracker_counter = @@ -137,8 +137,8 @@ void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, if (big_query_mem_limit > 0) { _static_query_mem_limit = std::min(big_query_mem_limit, _static_query_mem_limit); } - _connector_scan_operator_mem_share_arbitrator = - _object_pool.add(new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit)); + _connector_scan_operator_mem_share_arbitrator = _object_pool.add( + new ConnectorScanOperatorMemShareArbitrator(_static_query_mem_limit, scan_node_number)); { MemTracker* connector_scan_parent = GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker(); diff --git a/be/src/exec/pipeline/query_context.h b/be/src/exec/pipeline/query_context.h index 7c31e67e6a6b8..6dcdc2fb27ad9 100644 --- a/be/src/exec/pipeline/query_context.h +++ b/be/src/exec/pipeline/query_context.h @@ -154,7 +154,7 @@ class QueryContext : public std::enable_shared_from_this { /// that there is a big query memory limit of this resource group. 
void init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit = -1, std::optional spill_mem_limit = std::nullopt, workgroup::WorkGroup* wg = nullptr, - RuntimeState* state = nullptr); + RuntimeState* state = nullptr, int scan_node_number = 1); std::shared_ptr mem_tracker() { return _mem_tracker; } MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); } diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp index 6e3a41725a9c2..509c3d3f59b52 100644 --- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp @@ -24,6 +24,12 @@ namespace starrocks::pipeline { // ==================== ConnectorScanOperatorFactory ==================== +ConnectorScanOperatorMemShareArbitrator::ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, + int scan_node_number) + : query_mem_limit(query_mem_limit), + scan_mem_limit(query_mem_limit), + total_chunk_source_mem_bytes(scan_node_number * connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES) {} + int64_t ConnectorScanOperatorMemShareArbitrator::update_chunk_source_mem_bytes(int64_t old_value, int64_t new_value) { int64_t diff = new_value - old_value; int64_t total = total_chunk_source_mem_bytes.fetch_add(diff) + diff; @@ -292,7 +298,8 @@ ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int3 _adaptive_processor->started_running = true; int64_t c = L->update_active_scan_operator_count(1); if (c == 0) { - _adjust_scan_mem_limit(0, L->get_arb_chunk_source_mem_bytes()); + _adjust_scan_mem_limit(connector::DataSourceProvider::MAX_DATA_SOURCE_MEM_BYTES, + L->get_arb_chunk_source_mem_bytes()); } } diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.h b/be/src/exec/pipeline/scan/connector_scan_operator.h index b41fc879a3451..f9351ffbca3b3 100644 --- a/be/src/exec/pipeline/scan/connector_scan_operator.h 
+++ b/be/src/exec/pipeline/scan/connector_scan_operator.h @@ -35,8 +35,7 @@ struct ConnectorScanOperatorMemShareArbitrator { int64_t scan_mem_limit = 0; std::atomic total_chunk_source_mem_bytes = 0; - ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit) - : query_mem_limit(query_mem_limit), scan_mem_limit(query_mem_limit) {} + ConnectorScanOperatorMemShareArbitrator(int64_t query_mem_limit, int scan_node_number); int64_t set_scan_mem_ratio(double mem_ratio) { scan_mem_limit = std::max(1, query_mem_limit * mem_ratio); diff --git a/be/src/exec/workgroup/work_group.cpp b/be/src/exec/workgroup/work_group.cpp index 0289f9c081ca0..b2601ea89d019 100644 --- a/be/src/exec/workgroup/work_group.cpp +++ b/be/src/exec/workgroup/work_group.cpp @@ -177,9 +177,9 @@ void WorkGroup::init() { _scan_sched_entity.set_queue(workgroup::create_scan_task_queue()); _connector_scan_sched_entity.set_queue(workgroup::create_scan_task_queue()); - _connector_scan_mem_tracker = std::make_shared( - MemTracker::RESOURCE_GROUP, _memory_limit_bytes * config::connector_scan_use_query_mem_ratio, - _name + "/connector_scan", GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker()); + _connector_scan_mem_tracker = + std::make_shared(MemTracker::RESOURCE_GROUP, _memory_limit_bytes, _name + "/connector_scan", + GlobalEnv::GetInstance()->connector_scan_pool_mem_tracker()); } std::string WorkGroup::to_string() const { diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 0434b4521c3c1..622531180224d 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -200,8 +200,7 @@ Status GlobalEnv::_init_mem_tracker() { int64_t query_pool_spill_limit = query_pool_mem_limit * config::query_pool_spill_mem_limit_threshold; _query_pool_mem_tracker->set_reserve_limit(query_pool_spill_limit); _connector_scan_pool_mem_tracker = - regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit * config::connector_scan_use_query_mem_ratio, - "query_pool/connector_scan", 
nullptr); + regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit, "query_pool/connector_scan", nullptr); int64_t load_mem_limit = calc_max_load_memory(_process_mem_tracker->limit()); _load_mem_tracker = regist_tracker(MemTracker::LOAD, load_mem_limit, "load", process_mem_tracker()); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java index 34ef45991018b..28a4282ebf74c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java @@ -101,6 +101,7 @@ public static JobSpec fromQuerySpec(ConnectContext context, if (context.getLastQueryId() != null) { queryGlobals.setLast_query_id(context.getLastQueryId().toString()); } + queryGlobals.setScan_node_number(scanNodes.size()); return new Builder() .queryId(context.getExecutionId()) @@ -129,6 +130,7 @@ public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context, if (context.getLastQueryId() != null) { queryGlobals.setLast_query_id(context.getLastQueryId().toString()); } + queryGlobals.setScan_node_number(scanNodes.size()); return new Builder() .queryId(context.getExecutionId()) diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift index 4383ea55d2b29..aaa3c7dfdb7e6 100644 --- a/gensrc/thrift/InternalService.thrift +++ b/gensrc/thrift/InternalService.thrift @@ -140,7 +140,7 @@ struct TQueryQueueOptions { struct TQueryOptions { 2: optional i32 max_errors = 0 4: optional i32 batch_size = 0 - + 12: optional i64 mem_limit = 2147483648 13: optional bool abort_on_default_limit_exceeded = 0 14: optional i32 query_timeout = 3600 @@ -180,7 +180,7 @@ struct TQueryOptions { 59: optional bool enable_tablet_internal_parallel; 60: optional i32 query_delivery_timeout; - + 61: optional bool enable_query_debug_trace; 62: optional Types.TCompressionType load_transmission_compression_type; @@ -194,7 
+194,7 @@ struct TQueryOptions { 67: optional bool enable_pipeline_query_statistic = false; 68: optional i32 transmission_encode_level; - + 69: optional bool enable_populate_datacache; 70: optional bool allow_throw_exception = 0; @@ -212,8 +212,13 @@ struct TQueryOptions { 78: optional i32 spill_encode_level; 79: optional i64 spill_revocable_max_bytes; +<<<<<<< HEAD 85: optional TSpillMode spill_mode; +======= + 82: optional TSpillOptions spill_options; + +>>>>>>> 15a6518fae ([Enhancement] reduce mem alloc failed because unfair memory sharing (#50686)) 86: optional i32 io_tasks_per_scan_operator = 4; 87: optional i32 connector_io_tasks_per_scan_operator = 16; 88: optional double runtime_filter_early_return_selectivity = 0.05; @@ -341,6 +346,8 @@ struct TQueryGlobals { 30: optional string last_query_id 31: optional i64 timestamp_us + + 32: optional i64 scan_node_number } @@ -406,7 +413,7 @@ struct TExecPlanFragmentParams { 53: optional WorkGroup.TWorkGroup workgroup 54: optional bool enable_resource_group 55: optional i32 func_version - + // Sharing data between drivers of same scan operator 56: optional bool enable_shared_scan