Skip to content

Commit

Permalink
[Enhancement] Show resource group in global and local current_queries…
Browse files Browse the repository at this point in the history
… and fix check bind_cpu (backport #50763 #50809) (#50808)

Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu committed Sep 7, 2024
1 parent 286198b commit b295da5
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 30 deletions.
16 changes: 13 additions & 3 deletions be/src/exec/workgroup/pipeline_executor_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ namespace starrocks::workgroup {
// PipelineExecutorSetConfig
// ------------------------------------------------------------------------------------

// Caps the list of usable CPU ids at `num_total_cores` entries.
//
// @param total_cpuids    candidate CPU ids; consumed by this call.
// @param num_total_cores maximum number of ids to keep.
// @return `total_cpuids` itself (moved out) when it holds at most
//         `num_total_cores` ids, which includes the empty list; otherwise a
//         new list made of its first `num_total_cores` ids, in the same order.
static CpuUtil::CpuIds limit_total_cpuids(CpuUtil::CpuIds&& total_cpuids, uint32_t num_total_cores) {
    // An empty list has size 0, so it is covered by this single comparison.
    const bool exceeds_core_count = total_cpuids.size() > num_total_cores;
    if (!exceeds_core_count) {
        // Explicit std::move so the result is move-constructed from the argument.
        return std::move(total_cpuids);
    }

    // Keep only the leading `num_total_cores` ids.
    CpuUtil::CpuIds limited_cpuids;
    std::copy_n(total_cpuids.begin(), num_total_cores, std::back_inserter(limited_cpuids));
    return limited_cpuids;
}

PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, uint32_t num_total_driver_threads,
uint32_t num_total_scan_threads,
uint32_t num_total_connector_scan_threads,
Expand All @@ -35,9 +45,9 @@ PipelineExecutorSetConfig::PipelineExecutorSetConfig(uint32_t num_total_cores, u
num_total_driver_threads(num_total_driver_threads),
num_total_scan_threads(num_total_scan_threads),
num_total_connector_scan_threads(num_total_connector_scan_threads),
total_cpuids(std::move(total_cpuids)),
total_cpuids(limit_total_cpuids(std::move(total_cpuids), num_total_cores)),
enable_bind_cpus(enable_bind_cpus),
enable_cpu_borrowing(enable_cpu_borrowing) {}
enable_cpu_borrowing(enable_cpu_borrowing && enable_bind_cpus) {}

std::string PipelineExecutorSetConfig::to_string() const {
return fmt::format(
Expand Down Expand Up @@ -172,7 +182,7 @@ void PipelineExecutorSet::notify_config_changed() const {
}

uint32_t PipelineExecutorSet::calculate_num_threads(uint32_t num_total_threads) const {
if (!_borrowed_cpu_ids.empty() || _cpuids.empty()) {
if (!_borrowed_cpu_ids.empty()) {
return num_total_threads;
}
return std::max<uint32_t>(1, num_total_threads * _cpuids.size() / _conf.num_total_cores);
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/workgroup/pipeline_executor_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ struct PipelineExecutorSetConfig {
const uint32_t num_total_scan_threads;
uint32_t num_total_connector_scan_threads;

CpuUtil::CpuIds total_cpuids;
const CpuUtil::CpuIds total_cpuids;

bool enable_bind_cpus;
const bool enable_bind_cpus;
bool enable_cpu_borrowing;
};

Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/workgroup/pipeline_executor_set_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ void ExecutorsManager::change_num_connector_scan_threads(uint32_t num_connector_
}

void ExecutorsManager::change_enable_resource_group_cpu_borrowing(bool val) {
if (_conf.enable_cpu_borrowing == val) {
const bool new_val = val && _conf.enable_bind_cpus;
if (_conf.enable_cpu_borrowing == new_val) {
return;
}
_conf.enable_cpu_borrowing = val;
_conf.enable_cpu_borrowing = new_val;

update_shared_executors();
}
Expand Down
7 changes: 2 additions & 5 deletions be/src/http/action/pipeline_blocking_drivers_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,8 @@ void PipelineBlockingDriversAction::_handle_stat(HttpRequest* req) {
};

QueryMap query_map_in_wg;
_exec_env->workgroup_manager()->for_each_workgroup([&](const workgroup::WorkGroup& wg) {
if (wg.exclusive_executors() != nullptr) {
wg.exclusive_executors()->driver_executor()->iterate_immutable_blocking_driver(
iterate_func_generator(query_map_in_wg));
}
_exec_env->workgroup_manager()->for_each_executors([&](const workgroup::PipelineExecutorSet& executor) {
executor.driver_executor()->iterate_immutable_blocking_driver(iterate_func_generator(query_map_in_wg));
});
rapidjson::Document queries_in_wg_obj = query_map_to_doc_func(query_map_in_wg);

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
// Disable bind cpus when cgroup has cpu quota but no cpuset.
const bool enable_bind_cpus = config::enable_resource_group_bind_cpus &&
(!CpuInfo::is_cgroup_with_cpu_quota() || CpuInfo::is_cgroup_with_cpuset());
config::enable_resource_group_bind_cpus = enable_bind_cpus;
workgroup::PipelineExecutorSetConfig executors_manager_opts(
CpuInfo::num_cores(), _max_executor_threads, num_io_threads, connector_num_io_threads,
CpuInfo::get_core_ids(), enable_bind_cpus, config::enable_resource_group_cpu_borrowing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio
dropResourceGroupUnlocked(wg.getName());
}

if (wg.getCpuWeight() == null) {
wg.setCpuWeight(0);
}

if (ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME.equals(wg.getName())) {
wg.setId(ResourceGroup.DEFAULT_WG_ID);
} else if (ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME.equals(wg.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
.add("CPUTime")
.add("ExecTime")
.add("Warehouse")
.add("ResourceGroup")
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.FsBroker;
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
Expand Down Expand Up @@ -1173,6 +1174,12 @@ public String getWarehouseName() {
return connectContext.getSessionVariable().getWarehouseName();
}

/**
 * Returns the name of the resource group attached to this coordinator's job spec.
 *
 * @return the job spec's resource group name, or
 *         {@code ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME} when the job spec has no resource group
 */
@Override
public String getResourceGroupName() {
    if (jobSpec.getResourceGroup() != null) {
        return jobSpec.getResourceGroup().getName();
    }
    // No resource group on the job spec: report the default group's name.
    return ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME;
}

/**
 * Runs the short-circuit path by delegating to {@code shortCircuitExecutor.exec()}.
 *
 * @throws Exception whatever the delegated call throws; nothing is caught here
 */
private void execShortCircuit() throws Exception {
shortCircuitExecutor.exec();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ public Map<String, QueryStatisticsItem> getQueryStatistics() {
.db(context.getDatabase())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
.profile(info.getCoord().getQueryProfile())
.warehouseName(info.coord.getWarehouseName()).build();
.warehouseName(info.coord.getWarehouseName())
.resourceGroupName(info.coord.getResourceGroupName())
.build();

querySet.put(queryIdStr, item);
}
Expand Down
28 changes: 22 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ public class QueryStatisticsInfo {
private long spillBytes;
private long execTime;
private String wareHouseName;
private String resourceGroupName;

/**
 * Creates an instance without assigning any field. Values are expected to be
 * supplied afterwards through the fluent {@code with*} methods of this class.
 */
public QueryStatisticsInfo() {
}

public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, String connId, String db, String user,
long cpuCostNs, long scanBytes, long scanRows, long memUsageBytes, long spillBytes,
long execTime, String wareHouseName) {
long execTime, String wareHouseName, String resourceGroupName) {
this.queryStartTime = queryStartTime;
this.feIp = feIp;
this.queryId = queryId;
Expand All @@ -82,6 +83,7 @@ public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, Str
this.spillBytes = spillBytes;
this.execTime = execTime;
this.wareHouseName = wareHouseName;
this.resourceGroupName = resourceGroupName;
}

public long getQueryStartTime() {
Expand Down Expand Up @@ -136,6 +138,9 @@ public String getWareHouseName() {
return wareHouseName;
}

/**
 * @return the resource group name stored in this statistics record
 */
public String getResourceGroupName() {
return resourceGroupName;
}
public QueryStatisticsInfo withQueryStartTime(long queryStartTime) {
this.queryStartTime = queryStartTime;
return this;
Expand Down Expand Up @@ -201,6 +206,11 @@ public QueryStatisticsInfo withWareHouseName(String warehouseName) {
return this;
}

/**
 * Stores the resource group name on this record.
 *
 * @param resourceGroupName the name to store
 * @return this instance, so calls can be chained
 */
public QueryStatisticsInfo withResourceGroupName(String resourceGroupName) {
this.resourceGroupName = resourceGroupName;
return this;
}

public TQueryStatisticsInfo toThrift() {
return new TQueryStatisticsInfo()
.setQueryStartTime(queryStartTime)
Expand All @@ -215,7 +225,8 @@ public TQueryStatisticsInfo toThrift() {
.setMemUsageBytes(memUsageBytes)
.setSpillBytes(spillBytes)
.setExecTime(execTime)
.setWareHouseName(wareHouseName);
.setWareHouseName(wareHouseName)
.setResourceGroupName(resourceGroupName);
}

public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) {
Expand All @@ -232,7 +243,8 @@ public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) {
.withSpillBytes(tinfo.getSpillBytes())
.withCpuCostNs(tinfo.getCpuCostNs())
.withExecTime(tinfo.getExecTime())
.withWareHouseName(tinfo.getWareHouseName());
.withWareHouseName(tinfo.getWareHouseName())
.withResourceGroupName(tinfo.getResourceGroupName());
}

public List<String> formatToList() {
Expand All @@ -250,6 +262,7 @@ public List<String> formatToList() {
values.add(QueryStatisticsFormatter.getSecondsFromNano(this.getCpuCostNs()));
values.add(QueryStatisticsFormatter.getSecondsFromMilli(this.getExecTime()));
values.add(this.getWareHouseName());
values.add(this.getResourceGroupName());
return values;
}

Expand All @@ -267,13 +280,14 @@ public boolean equals(Object o) {
Objects.equals(db, that.db) && Objects.equals(user, that.user) && cpuCostNs == that.cpuCostNs &&
scanBytes == that.scanBytes && scanRows == that.scanRows && memUsageBytes == that.memUsageBytes &&
spillBytes == that.spillBytes && execTime == that.execTime &&
Objects.equals(wareHouseName, that.wareHouseName);
Objects.equals(wareHouseName, that.wareHouseName) &&
Objects.equals(resourceGroupName, that.resourceGroupName);
}

@Override
public int hashCode() {
return Objects.hash(queryStartTime, feIp, queryId, connId, db, user, cpuCostNs, scanBytes, scanRows, memUsageBytes,
spillBytes, execTime, wareHouseName);
spillBytes, execTime, wareHouseName, resourceGroupName);
}

@Override
Expand All @@ -291,6 +305,7 @@ public String toString() {
", spillBytes=" + spillBytes +
", execTime=" + execTime +
", wareHouseName=" + wareHouseName +
", resourceGroupName=" + resourceGroupName +
'}';
}

Expand Down Expand Up @@ -325,7 +340,8 @@ public static List<QueryStatisticsInfo> makeListFromMetricsAndMgrs() throws Anal
.withSpillBytes(statistics.getSpillBytes())
.withCpuCostNs(statistics.getCpuCostNs())
.withExecTime(item.getQueryExecTime())
.withWareHouseName(item.getWarehouseName());
.withWareHouseName(item.getWarehouseName())
.withResourceGroupName(item.getResourceGroupName());
sortedRowData.add(info);
}

Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class QueryStatisticsItem {
private final RuntimeProfile queryProfile;
private final TUniqueId executionId;
private final String warehouseName;
private final String resourceGroupName;

private QueryStatisticsItem(Builder builder) {
this.queryId = builder.queryId;
Expand All @@ -49,6 +50,7 @@ private QueryStatisticsItem(Builder builder) {
this.queryProfile = builder.queryProfile;
this.executionId = builder.executionId;
this.warehouseName = builder.warehouseName;
this.resourceGroupName = builder.resourceGroupName;
}

public String getDb() {
Expand Down Expand Up @@ -96,6 +98,10 @@ public String getWarehouseName() {
return warehouseName;
}

/**
 * @return the resource group name this item received from its builder
 */
public String getResourceGroupName() {
return resourceGroupName;
}

public static final class Builder {
private String queryId;
private String db;
Expand All @@ -107,6 +113,7 @@ public static final class Builder {
private RuntimeProfile queryProfile;
private TUniqueId executionId;
private String warehouseName;
private String resourceGroupName;

public Builder() {
fragmentInstanceInfos = Lists.newArrayList();
Expand Down Expand Up @@ -162,6 +169,11 @@ public Builder warehouseName(String warehouseName) {
return this;
}

/**
 * Sets the resource group name that the built item will carry.
 *
 * @param resourceGroupName the name to record
 * @return this builder, so calls can be chained
 */
public Builder resourceGroupName(String resourceGroupName) {
this.resourceGroupName = resourceGroupName;
return this;
}

public QueryStatisticsItem build() {
initDefaultValue(this);
return new QueryStatisticsItem(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,7 @@ public static List<TabletCommitInfo> getCommitInfos(Coordinator coord) {

public abstract String getWarehouseName();

public abstract String getResourceGroupName();

public abstract boolean isShortCircuit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ public String getWarehouseName() {
return connectContext.getSessionVariable().getWarehouseName();
}

/**
 * This coordinator does not report a resource group.
 *
 * @return always the empty string
 */
@Override
public String getResourceGroupName() {
return "";
}

/**
 * @return always {@code false} for this coordinator
 */
public boolean isShortCircuit() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ public void testGetDedicateCpuCores() throws Exception {
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|17|0|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" +
"rg2|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" +
"rt_rg1|15|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rt_rg1_user)");

starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1");
Expand Down Expand Up @@ -1554,7 +1554,7 @@ public void testValidateCpuParametersForCreate() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)");
"rg1|0|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)");
starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1");
}
}
Expand Down Expand Up @@ -1720,8 +1720,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
"rg1|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
}

{
Expand All @@ -1745,8 +1745,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
"rg1|0|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
}

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withSpillBytes(0)
.withCpuCostNs(97323000)
.withExecTime(3533000)
.withWareHouseName("default_warehouse");
.withWareHouseName("default_warehouse")
.withResourceGroupName("wg1");


public static final QueryStatisticsInfo QUERY_TWO_LOCAL = new QueryStatisticsInfo()
Expand All @@ -63,7 +64,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withSpillBytes(0)
.withCpuCostNs(96576000)
.withExecTime(2086000)
.withWareHouseName("default_warehouse");
.withWareHouseName("default_warehouse")
.withResourceGroupName("wg2");

public static final QueryStatisticsInfo QUERY_ONE_REMOTE = new QueryStatisticsInfo()
.withQueryStartTime(1721866428)
Expand All @@ -78,7 +80,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withSpillBytes(0)
.withCpuCostNs(97456000)
.withExecTime(3687000)
.withWareHouseName("default_warehouse");
.withWareHouseName("default_warehouse")
.withResourceGroupName("wg3");


public static final QueryStatisticsInfo QUERY_TWO_REMOTE = new QueryStatisticsInfo()
Expand All @@ -94,7 +97,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withSpillBytes(0)
.withCpuCostNs(96686000)
.withExecTime(2196000)
.withWareHouseName("default_warehouse");
.withWareHouseName("default_warehouse")
.withResourceGroupName("wg");

public static List<QueryStatisticsInfo> LOCAL_TEST_QUERIES =
new ArrayList<>(List.of(QUERY_ONE_LOCAL, QUERY_TWO_LOCAL));
Expand Down
Loading

0 comments on commit b295da5

Please sign in to comment.