From b295da5e1882c66fd1b39f7a6f792c2563ad8cca Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Sat, 7 Sep 2024 09:51:16 +0800 Subject: [PATCH] [Enhancement] Show resource group in global and local current_queries and fix check bind_cpu (backport #50763 #50809) (#50808) Signed-off-by: zihe.liu --- .../exec/workgroup/pipeline_executor_set.cpp | 16 +++++++++-- be/src/exec/workgroup/pipeline_executor_set.h | 4 +-- .../pipeline_executor_set_manager.cpp | 5 ++-- .../pipeline_blocking_drivers_action.cpp | 7 ++--- be/src/runtime/exec_env.cpp | 1 + .../starrocks/catalog/ResourceGroupMgr.java | 4 +++ .../proc/CurrentQueryStatisticsProcDir.java | 1 + .../com/starrocks/qe/DefaultCoordinator.java | 7 +++++ .../com/starrocks/qe/QeProcessorImpl.java | 4 ++- .../com/starrocks/qe/QueryStatisticsInfo.java | 28 +++++++++++++++---- .../com/starrocks/qe/QueryStatisticsItem.java | 12 ++++++++ .../starrocks/qe/scheduler/Coordinator.java | 2 ++ .../qe/scheduler/FeExecuteCoordinator.java | 5 ++++ .../analysis/ResourceGroupStmtTest.java | 12 ++++---- ...rrentGlobalQueryStatisticsProcDirTest.java | 12 +++++--- .../CurrentQueryStatisticsProcDirTest.java | 6 ++++ .../starrocks/qe/QueryStatisticsInfoTest.java | 3 +- .../starrocks/qe/scheduler/JobSpecTest.java | 9 ++++++ gensrc/thrift/FrontendService.thrift | 1 + 19 files changed, 109 insertions(+), 30 deletions(-) diff --git a/be/src/exec/workgroup/pipeline_executor_set.cpp b/be/src/exec/workgroup/pipeline_executor_set.cpp index 15c5788b235e7..9bf9a4ca22c96 100644 --- a/be/src/exec/workgroup/pipeline_executor_set.cpp +++ b/be/src/exec/workgroup/pipeline_executor_set.cpp @@ -26,6 +26,16 @@ namespace starrocks::workgroup { // PipelineExecutorSetConfig // ------------------------------------------------------------------------------------ +static CpuUtil::CpuIds limit_total_cpuids(CpuUtil::CpuIds&& total_cpuids, uint32_t num_total_cores) { + if (total_cpuids.empty() || total_cpuids.size() <= num_total_cores) { + return std::move(total_cpuids); + } + + CpuUtil::CpuIds cpuids; + std::copy_n(total_cpuids.begin(), num_total_cores, std::back_inserter(cpuids)); + return cpuids; +} + PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, uint32_t num_total_driver_threads, uint32_t num_total_scan_threads, uint32_t num_total_connector_scan_threads, @@ -35,9 +45,9 @@ PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, u num_total_driver_threads(num_total_driver_threads), num_total_scan_threads(num_total_scan_threads), num_total_connector_scan_threads(num_total_connector_scan_threads), - total_cpuids(std::move(total_cpuids)), + total_cpuids(limit_total_cpuids(std::move(total_cpuids), num_total_cores)), enable_bind_cpus(enable_bind_cpus), - enable_cpu_borrowing(enable_cpu_borrowing) {} + enable_cpu_borrowing(enable_cpu_borrowing && enable_bind_cpus) {} std::string PipelineExecutorSetConfig::to_string() const { return fmt::format( @@ -172,7 +182,7 @@ void PipelineExecutorSet::notify_config_changed() const { } uint32_t PipelineExecutorSet::calculate_num_threads(uint32_t num_total_threads) const { - if (!_borrowed_cpu_ids.empty() || _cpuids.empty()) { + if (!_borrowed_cpu_ids.empty()) { return num_total_threads; } return std::max(1, num_total_threads * _cpuids.size() / _conf.num_total_cores); diff --git a/be/src/exec/workgroup/pipeline_executor_set.h b/be/src/exec/workgroup/pipeline_executor_set.h index da59411d92fc9..0f0a49aa54703 100644 --- a/be/src/exec/workgroup/pipeline_executor_set.h +++ b/be/src/exec/workgroup/pipeline_executor_set.h @@ -33,9 +33,9 @@ struct PipelineExecutorSetConfig { const uint32_t num_total_scan_threads; uint32_t num_total_connector_scan_threads; - CpuUtil::CpuIds total_cpuids; + const CpuUtil::CpuIds total_cpuids; - bool enable_bind_cpus; + const bool enable_bind_cpus; bool enable_cpu_borrowing; }; diff --git a/be/src/exec/workgroup/pipeline_executor_set_manager.cpp b/be/src/exec/workgroup/pipeline_executor_set_manager.cpp index 6ec0abf47929e..ed7af7a06d86f 100644 --- a/be/src/exec/workgroup/pipeline_executor_set_manager.cpp +++ b/be/src/exec/workgroup/pipeline_executor_set_manager.cpp @@ -147,10 +147,11 @@ void ExecutorsManager::change_num_connector_scan_threads(uint32_t num_connector_ } void ExecutorsManager::change_enable_resource_group_cpu_borrowing(bool val) { - if (_conf.enable_cpu_borrowing == val) { + const bool new_val = val && _conf.enable_bind_cpus; + if (_conf.enable_cpu_borrowing == new_val) { return; } - _conf.enable_cpu_borrowing = val; + _conf.enable_cpu_borrowing = new_val; update_shared_executors(); } diff --git a/be/src/http/action/pipeline_blocking_drivers_action.cpp b/be/src/http/action/pipeline_blocking_drivers_action.cpp index 0d1abb8a961f7..fecb782f1d24b 100644 --- a/be/src/http/action/pipeline_blocking_drivers_action.cpp +++ b/be/src/http/action/pipeline_blocking_drivers_action.cpp @@ -161,11 +161,8 @@ void PipelineBlockingDriversAction::_handle_stat(HttpRequest* req) { }; QueryMap query_map_in_wg; - _exec_env->workgroup_manager()->for_each_workgroup([&](const workgroup::WorkGroup& wg) { - if (wg.exclusive_executors() != nullptr) { - wg.exclusive_executors()->driver_executor()->iterate_immutable_blocking_driver( - iterate_func_generator(query_map_in_wg)); - } + _exec_env->workgroup_manager()->for_each_executors([&](const workgroup::PipelineExecutorSet& executor) { + executor.driver_executor()->iterate_immutable_blocking_driver(iterate_func_generator(query_map_in_wg)); }); rapidjson::Document queries_in_wg_obj = query_map_to_doc_func(query_map_in_wg); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index f94186f01d2d3..c40c68736417d 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -435,6 +435,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { // Disable bind cpus when cgroup has cpu quota but no cpuset. const bool enable_bind_cpus = config::enable_resource_group_bind_cpus && (!CpuInfo::is_cgroup_with_cpu_quota() || CpuInfo::is_cgroup_with_cpuset()); + config::enable_resource_group_bind_cpus = enable_bind_cpus; workgroup::PipelineExecutorSetConfig executors_manager_opts( CpuInfo::num_cores(), _max_executor_threads, num_io_threads, connector_num_io_threads, CpuInfo::get_core_ids(), enable_bind_cpus, config::enable_resource_group_cpu_borrowing); diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java index fe11635a5925f..19c57df8e0bed 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroupMgr.java @@ -150,6 +150,10 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio dropResourceGroupUnlocked(wg.getName()); } + if (wg.getCpuWeight() == null) { + wg.setCpuWeight(0); + } + if (ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME.equals(wg.getName())) { wg.setId(ResourceGroup.DEFAULT_WG_ID); } else if (ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME.equals(wg.getName())) { diff --git a/fe/fe-core/src/main/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDir.java b/fe/fe-core/src/main/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDir.java index 922bff3d89423..2469c9264ff7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDir.java @@ -66,6 +66,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { .add("CPUTime") .add("ExecTime") .add("Warehouse") + .add("ResourceGroup") .build(); @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java index 1c2be92557a2e..5c95a86674f1a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java @@ -41,6 +41,7 @@ import com.starrocks.analysis.DescriptorTable; import com.starrocks.authentication.AuthenticationMgr; import com.starrocks.catalog.FsBroker; +import com.starrocks.catalog.ResourceGroup; import com.starrocks.common.AnalysisException; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; @@ -1173,6 +1174,12 @@ public String getWarehouseName() { return connectContext.getSessionVariable().getWarehouseName(); } + @Override + public String getResourceGroupName() { + return jobSpec.getResourceGroup() == null ? ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME : + jobSpec.getResourceGroup().getName(); + } + private void execShortCircuit() throws Exception { shortCircuitExecutor.exec(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java index e183977b55e03..8f030c84691ab 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java @@ -172,7 +172,9 @@ public Map getQueryStatistics() { .db(context.getDatabase()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) .profile(info.getCoord().getQueryProfile()) - .warehouseName(info.coord.getWarehouseName()).build(); + .warehouseName(info.coord.getWarehouseName()) + .resourceGroupName(info.coord.getResourceGroupName()) + .build(); querySet.put(queryIdStr, item); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java b/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java index 53546c13b5050..e901ad11cd53e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java @@ -62,13 +62,14 @@ public class QueryStatisticsInfo { private long spillBytes; private long execTime; private String wareHouseName; + private String resourceGroupName; public QueryStatisticsInfo() { } public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, String connId, String db, String user, long cpuCostNs, long scanBytes, long scanRows, long memUsageBytes, long spillBytes, - long execTime, String wareHouseName) { + long execTime, String wareHouseName, String resourceGroupName) { this.queryStartTime = queryStartTime; this.feIp = feIp; this.queryId = queryId; @@ -82,6 +83,7 @@ public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, Str this.spillBytes = spillBytes; this.execTime = execTime; this.wareHouseName = wareHouseName; + this.resourceGroupName = resourceGroupName; } public long getQueryStartTime() { @@ -136,6 +138,9 @@ public String getWareHouseName() { return wareHouseName; } + public String getResourceGroupName() { + return resourceGroupName; + } public QueryStatisticsInfo withQueryStartTime(long queryStartTime) { this.queryStartTime = queryStartTime; return this; @@ -201,6 +206,11 @@ public QueryStatisticsInfo withWareHouseName(String warehouseName) { return this; } + public QueryStatisticsInfo withResourceGroupName(String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + return this; + } + public TQueryStatisticsInfo toThrift() { return new TQueryStatisticsInfo() .setQueryStartTime(queryStartTime) @@ -215,7 +225,8 @@ public TQueryStatisticsInfo toThrift() { .setMemUsageBytes(memUsageBytes) .setSpillBytes(spillBytes) .setExecTime(execTime) - .setWareHouseName(wareHouseName); + .setWareHouseName(wareHouseName) + .setResourceGroupName(resourceGroupName); } public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) { @@ -232,7 +243,8 @@ public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) { .withSpillBytes(tinfo.getSpillBytes()) .withCpuCostNs(tinfo.getCpuCostNs()) .withExecTime(tinfo.getExecTime()) - .withWareHouseName(tinfo.getWareHouseName()); + .withWareHouseName(tinfo.getWareHouseName()) + .withResourceGroupName(tinfo.getResourceGroupName()); } public List formatToList() { @@ -250,6 +262,7 @@ public List formatToList() { values.add(QueryStatisticsFormatter.getSecondsFromNano(this.getCpuCostNs())); values.add(QueryStatisticsFormatter.getSecondsFromMilli(this.getExecTime())); values.add(this.getWareHouseName()); + values.add(this.getResourceGroupName()); return values; } @@ -267,13 +280,14 @@ public boolean equals(Object o) { Objects.equals(db, that.db) && Objects.equals(user, that.user) && cpuCostNs == that.cpuCostNs && scanBytes == that.scanBytes && scanRows == that.scanRows && memUsageBytes == that.memUsageBytes && spillBytes == that.spillBytes && execTime == that.execTime && - Objects.equals(wareHouseName, that.wareHouseName); + Objects.equals(wareHouseName, that.wareHouseName) && + Objects.equals(resourceGroupName, that.resourceGroupName); } @Override public int hashCode() { return Objects.hash(queryStartTime, feIp, queryId, connId, db, user, cpuCostNs, scanBytes, scanRows, memUsageBytes, - spillBytes, execTime, wareHouseName); + spillBytes, execTime, wareHouseName, resourceGroupName); } @Override @@ -291,6 +305,7 @@ public String toString() { ", spillBytes=" + spillBytes + ", execTime=" + execTime + ", wareHouseName=" + wareHouseName + + ", resourceGroupName=" + resourceGroupName + '}'; } @@ -325,7 +340,8 @@ public static List makeListFromMetricsAndMgrs() throws Anal .withSpillBytes(statistics.getSpillBytes()) .withCpuCostNs(statistics.getCpuCostNs()) .withExecTime(item.getQueryExecTime()) - .withWareHouseName(item.getWarehouseName()); + .withWareHouseName(item.getWarehouseName()) + .withResourceGroupName(item.getResourceGroupName()); sortedRowData.add(info); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java index 5eb793fbf1c63..f94ecac16a09a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java @@ -37,6 +37,7 @@ public final class QueryStatisticsItem { private final RuntimeProfile queryProfile; private final TUniqueId executionId; private final String warehouseName; + private final String resourceGroupName; private QueryStatisticsItem(Builder builder) { this.queryId = builder.queryId; @@ -49,6 +50,7 @@ private QueryStatisticsItem(Builder builder) { this.queryProfile = builder.queryProfile; this.executionId = builder.executionId; this.warehouseName = builder.warehouseName; + this.resourceGroupName = builder.resourceGroupName; } public String getDb() { @@ -96,6 +98,10 @@ public String getWarehouseName() { return warehouseName; } + public String getResourceGroupName() { + return resourceGroupName; + } + public static final class Builder { private String queryId; private String db; @@ -107,6 +113,7 @@ public static final class Builder { private RuntimeProfile queryProfile; private TUniqueId executionId; private String warehouseName; + private String resourceGroupName; public Builder() { fragmentInstanceInfos = Lists.newArrayList(); @@ -162,6 +169,11 @@ public Builder warehouseName(String warehouseName) { return this; } + public Builder resourceGroupName(String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + return this; + } + public QueryStatisticsItem build() { initDefaultValue(this); return new QueryStatisticsItem(this); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java index 9e44783d980d7..9de8ca2cf5ac8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java @@ -232,5 +232,7 @@ public static List getCommitInfos(Coordinator coord) { public abstract String getWarehouseName(); + public abstract String getResourceGroupName(); + public abstract boolean isShortCircuit(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/FeExecuteCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/FeExecuteCoordinator.java index 108e7e88cd665..651a544c5d4cd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/FeExecuteCoordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/FeExecuteCoordinator.java @@ -303,6 +303,11 @@ public String getWarehouseName() { return connectContext.getSessionVariable().getWarehouseName(); } + @Override + public String getResourceGroupName() { + return ""; + } + public boolean isShortCircuit() { return false; } diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/ResourceGroupStmtTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/ResourceGroupStmtTest.java index 4a23bc85600d1..89dac2642ae5b 100644 --- a/fe/fe-core/src/test/java/com/starrocks/analysis/ResourceGroupStmtTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/analysis/ResourceGroupStmtTest.java @@ -1436,7 +1436,7 @@ public void testGetDedicateCpuCores() throws Exception { assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" + "default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" + "rg1|17|0|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" + - "rg2|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" + + "rg2|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" + "rt_rg1|15|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rt_rg1_user)"); starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1"); @@ -1554,7 +1554,7 @@ public void testValidateCpuParametersForCreate() throws Exception { List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" + "default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" + - "rg1|null|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)"); + "rg1|0|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)"); starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1"); } } @@ -1720,8 +1720,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception { List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" + "default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" + - "rg1|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" + - "rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)"); + "rg1|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" + + "rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)"); } { @@ -1745,8 +1745,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception { List> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all"); assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" + "default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" + - "rg1|null|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" + - "rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)"); + "rg1|0|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" + + "rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)"); } { diff --git a/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentGlobalQueryStatisticsProcDirTest.java b/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentGlobalQueryStatisticsProcDirTest.java index 05793f65cc9ba..7faf031fb3b78 100644 --- a/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentGlobalQueryStatisticsProcDirTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentGlobalQueryStatisticsProcDirTest.java @@ -47,7 +47,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest { .withSpillBytes(0) .withCpuCostNs(97323000) .withExecTime(3533000) - .withWareHouseName("default_warehouse"); + .withWareHouseName("default_warehouse") + .withResourceGroupName("wg1"); public static final QueryStatisticsInfo QUERY_TWO_LOCAL = new QueryStatisticsInfo() @@ -63,7 +64,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest { .withSpillBytes(0) .withCpuCostNs(96576000) .withExecTime(2086000) - .withWareHouseName("default_warehouse"); + .withWareHouseName("default_warehouse") + .withResourceGroupName("wg2"); public static final QueryStatisticsInfo QUERY_ONE_REMOTE = new QueryStatisticsInfo() .withQueryStartTime(1721866428) @@ -78,7 +80,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest { .withSpillBytes(0) .withCpuCostNs(97456000) .withExecTime(3687000) - .withWareHouseName("default_warehouse"); + .withWareHouseName("default_warehouse") + .withResourceGroupName("wg3"); public static final QueryStatisticsInfo QUERY_TWO_REMOTE = new QueryStatisticsInfo() @@ -94,7 +97,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest { .withSpillBytes(0) .withCpuCostNs(96686000) .withExecTime(2196000) - .withWareHouseName("default_warehouse"); + .withWareHouseName("default_warehouse") + .withResourceGroupName("wg"); public static List LOCAL_TEST_QUERIES = new ArrayList<>(List.of(QUERY_ONE_LOCAL, QUERY_TWO_LOCAL)); diff --git a/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDirTest.java b/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDirTest.java index bc1950ea9cd30..b7525a7686a45 100644 --- a/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDirTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/common/proc/CurrentQueryStatisticsProcDirTest.java @@ -44,12 +44,14 @@ public void testFetchResult() throws AnalysisException { .queryStartTime(1) .queryId("queryId1") .warehouseName("wh1") + .resourceGroupName("wg1") .build() ); statistic.put("queryId2", new QueryStatisticsItem.Builder() .queryStartTime(2) .queryId("queryId2") .warehouseName("wh1") + .resourceGroupName("wg2") .build() ); new MockUp() { @@ -76,6 +78,8 @@ public Map getQueryStatistics( Assert.assertEquals("queryId1", list1.get(2)); // Warehouse Assert.assertEquals("wh1", list1.get(12)); + // ResourceGroupName + Assert.assertEquals("wg1", list1.get(13)); List list2 = rows.get(1); Assert.assertEquals(list2.size(), CurrentQueryStatisticsProcDir.TITLE_NAMES.size()); @@ -83,6 +87,8 @@ public Map getQueryStatistics( Assert.assertEquals("queryId2", list2.get(2)); // Warehouse Assert.assertEquals("wh1", list2.get(12)); + // ResourceGroupName + Assert.assertEquals("wg2", list2.get(13)); } @Test diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/QueryStatisticsInfoTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/QueryStatisticsInfoTest.java index d89218e9b864a..1fa57b8b1297b 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/QueryStatisticsInfoTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/QueryStatisticsInfoTest.java @@ -38,7 +38,8 @@ public void testEquality() { firstQuery.getMemUsageBytes(), firstQuery.getSpillBytes(), firstQuery.getExecTime(), - firstQuery.getWareHouseName() + firstQuery.getWareHouseName(), + firstQuery.getResourceGroupName() ); Assert.assertEquals(firstQuery, otherQuery); Assert.assertEquals(firstQuery.hashCode(), otherQuery.hashCode()); diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java index af6575489a5f6..552c1e0f9eb42 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java @@ -21,12 +21,15 @@ import com.starrocks.catalog.ResourceGroupClassifier; import com.starrocks.catalog.ResourceGroupMgr; import com.starrocks.common.Config; +import com.starrocks.common.util.DebugUtil; import com.starrocks.load.loadv2.BulkLoadJob; import com.starrocks.planner.PlanFragment; import com.starrocks.planner.ScanNode; import com.starrocks.planner.StreamLoadPlanner; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.DefaultCoordinator; +import com.starrocks.qe.QeProcessorImpl; +import com.starrocks.qe.QueryStatisticsItem; import com.starrocks.qe.SessionVariable; import com.starrocks.qe.scheduler.dag.JobSpec; import com.starrocks.server.WarehouseManager; @@ -118,6 +121,12 @@ public void testFromQuerySpec() throws Exception { connectContext, fragments, scanNodes, descTable.toThrift()); JobSpec jobSpec = coordinator.getJobSpec(); + QeProcessorImpl.INSTANCE.registerQuery(queryId, new QeProcessorImpl.QueryInfo(connectContext, sql, coordinator)); + Map queryStatistics = QeProcessorImpl.INSTANCE.getQueryStatistics(); + assertThat(queryStatistics).hasSize(1); + assertThat(queryStatistics.get(DebugUtil.printId(queryId)).getResourceGroupName()) + .isEqualTo(QUERY_RESOURCE_GROUP.getName()); + // Check created jobSpec. Assert.assertEquals(queryId, jobSpec.getQueryId()); Assert.assertEquals(lastQueryId.toString(), jobSpec.getQueryGlobals().getLast_query_id()); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d60a51f7f37dc..48a228a8449c6 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1534,6 +1534,7 @@ struct TQueryStatisticsInfo { 11: optional i64 spillBytes 12: optional i64 execTime 13: optional string wareHouseName + 15: optional string resourceGroupName } struct TGetQueryStatisticsResponse {