Skip to content

Commit

Permalink
[Enhancement] Show resource group in global and local current_queries (
Browse files Browse the repository at this point in the history
…StarRocks#50763)

Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu authored and HangyuanLiu committed Sep 11, 2024
1 parent a91fa1c commit 129d280
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio
dropResourceGroupUnlocked(wg.getName());
}

if (wg.getCpuWeight() == null) {
wg.setCpuWeight(0);
}

if (ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME.equals(wg.getName())) {
wg.setId(ResourceGroup.DEFAULT_WG_ID);
} else if (ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME.equals(wg.getName())) {
Expand Down Expand Up @@ -359,10 +363,10 @@ public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException

if (exclusiveCpuCores != null && exclusiveCpuCores > 0) {
if (sumExclusiveCpuCores + exclusiveCpuCores - wg.getNormalizedExclusiveCpuCores() >=
BackendResourceStat.getInstance().getAvgNumHardwareCoresOfBe()) {
BackendResourceStat.getInstance().getMinNumHardwareCoresOfBe()) {
throw new DdlException(String.format(EXCEED_TOTAL_EXCLUSIVE_CPU_CORES_ERR_MSG,
ResourceGroup.EXCLUSIVE_CPU_CORES,
BackendResourceStat.getInstance().getAvgNumHardwareCoresOfBe() - 1));
BackendResourceStat.getInstance().getMinNumHardwareCoresOfBe() - 1));
}
if (wg.getResourceGroupType() == TWorkGroupType.WG_SHORT_QUERY) {
throw new SemanticException(SHORT_QUERY_SET_EXCLUSIVE_CPU_CORES_ERR_MSG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
.add("ExecTime")
.add("Warehouse")
.add("CustomQueryId")
.add("ResourceGroup")
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.FsBroker;
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
Expand Down Expand Up @@ -1287,6 +1288,12 @@ public String getWarehouseName() {
return connectContext.getSessionVariable().getWarehouseName();
}

@Override
public String getResourceGroupName() {
return jobSpec.getResourceGroup() == null ? ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME :
jobSpec.getResourceGroup().getName();
}

private void execShortCircuit() throws Exception {
shortCircuitExecutor.exec();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ public Map<String, QueryStatisticsItem> getQueryStatistics() {
.db(context.getDatabase())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
.profile(info.getCoord().getQueryProfile())
.warehouseName(info.coord.getWarehouseName()).build();
.warehouseName(info.coord.getWarehouseName())
.resourceGroupName(info.coord.getResourceGroupName())
.build();

querySet.put(queryIdStr, item);
}
Expand Down
29 changes: 23 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public class QueryStatisticsInfo {
private long execTime;
private String wareHouseName;
private String customQueryId;
private String resourceGroupName;

public QueryStatisticsInfo() {
}

public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, String connId, String db, String user,
long cpuCostNs, long scanBytes, long scanRows, long memUsageBytes, long spillBytes,
long execTime, String wareHouseName, String customQueryId) {
long execTime, String wareHouseName, String customQueryId, String resourceGroupName) {
this.queryStartTime = queryStartTime;
this.feIp = feIp;
this.queryId = queryId;
Expand All @@ -84,6 +85,7 @@ public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, Str
this.execTime = execTime;
this.wareHouseName = wareHouseName;
this.customQueryId = customQueryId;
this.resourceGroupName = resourceGroupName;
}

public long getQueryStartTime() {
Expand Down Expand Up @@ -138,6 +140,10 @@ public String getWareHouseName() {
return wareHouseName;
}

public String getResourceGroupName() {
return resourceGroupName;
}

public String getCustomQueryId() {
return customQueryId;
}
Expand Down Expand Up @@ -207,6 +213,11 @@ public QueryStatisticsInfo withWareHouseName(String warehouseName) {
return this;
}

public QueryStatisticsInfo withResourceGroupName(String resourceGroupName) {
this.resourceGroupName = resourceGroupName;
return this;
}

public QueryStatisticsInfo withCustomQueryId(String customQueryId) {
this.customQueryId = customQueryId;
return this;
Expand All @@ -227,7 +238,8 @@ public TQueryStatisticsInfo toThrift() {
.setSpillBytes(spillBytes)
.setExecTime(execTime)
.setWareHouseName(wareHouseName)
.setCustomQueryId(customQueryId);
.setCustomQueryId(customQueryId)
.setResourceGroupName(resourceGroupName);
}

public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) {
Expand All @@ -245,7 +257,8 @@ public static QueryStatisticsInfo fromThrift(TQueryStatisticsInfo tinfo) {
.withCpuCostNs(tinfo.getCpuCostNs())
.withExecTime(tinfo.getExecTime())
.withWareHouseName(tinfo.getWareHouseName())
.withCustomQueryId(tinfo.getCustomQueryId());
.withCustomQueryId(tinfo.getCustomQueryId())
.withResourceGroupName(tinfo.getResourceGroupName());
}

public List<String> formatToList() {
Expand All @@ -264,6 +277,7 @@ public List<String> formatToList() {
values.add(QueryStatisticsFormatter.getSecondsFromMilli(this.getExecTime()));
values.add(this.getWareHouseName());
values.add(this.getCustomQueryId());
values.add(this.getResourceGroupName());
return values;
}

Expand All @@ -281,13 +295,14 @@ public boolean equals(Object o) {
Objects.equals(db, that.db) && Objects.equals(user, that.user) && cpuCostNs == that.cpuCostNs &&
scanBytes == that.scanBytes && scanRows == that.scanRows && memUsageBytes == that.memUsageBytes &&
spillBytes == that.spillBytes && execTime == that.execTime &&
Objects.equals(wareHouseName, that.wareHouseName) && Objects.equals(customQueryId, that.customQueryId);
Objects.equals(wareHouseName, that.wareHouseName) && Objects.equals(customQueryId, that.customQueryId) &&
Objects.equals(resourceGroupName, that.resourceGroupName);
}

@Override
public int hashCode() {
return Objects.hash(queryStartTime, feIp, queryId, connId, db, user, cpuCostNs, scanBytes, scanRows, memUsageBytes,
spillBytes, execTime, wareHouseName, customQueryId);
spillBytes, execTime, wareHouseName, customQueryId, resourceGroupName);
}

@Override
Expand All @@ -306,6 +321,7 @@ public String toString() {
", execTime=" + execTime +
", wareHouseName=" + wareHouseName +
", customQueryId=" + customQueryId +
", resourceGroupName=" + resourceGroupName +
'}';
}

Expand Down Expand Up @@ -341,7 +357,8 @@ public static List<QueryStatisticsInfo> makeListFromMetricsAndMgrs() throws Anal
.withCpuCostNs(statistics.getCpuCostNs())
.withExecTime(item.getQueryExecTime())
.withWareHouseName(item.getWarehouseName())
.withCustomQueryId(item.getCustomQueryId());
.withCustomQueryId(item.getCustomQueryId())
.withResourceGroupName(item.getResourceGroupName());
sortedRowData.add(info);
}

Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryStatisticsItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class QueryStatisticsItem {
private final RuntimeProfile queryProfile;
private final TUniqueId executionId;
private final String warehouseName;
private final String resourceGroupName;

private QueryStatisticsItem(Builder builder) {
this.customQueryId = builder.customQueryId;
Expand All @@ -51,6 +52,7 @@ private QueryStatisticsItem(Builder builder) {
this.queryProfile = builder.queryProfile;
this.executionId = builder.executionId;
this.warehouseName = builder.warehouseName;
this.resourceGroupName = builder.resourceGroupName;
}

public String getDb() {
Expand Down Expand Up @@ -102,6 +104,10 @@ public String getWarehouseName() {
return warehouseName;
}

public String getResourceGroupName() {
return resourceGroupName;
}

public static final class Builder {
private String customQueryId;
private String queryId;
Expand All @@ -114,6 +120,7 @@ public static final class Builder {
private RuntimeProfile queryProfile;
private TUniqueId executionId;
private String warehouseName;
private String resourceGroupName;

public Builder() {
fragmentInstanceInfos = Lists.newArrayList();
Expand Down Expand Up @@ -174,6 +181,11 @@ public Builder warehouseName(String warehouseName) {
return this;
}

public Builder resourceGroupName(String resourceGroupName) {
this.resourceGroupName = resourceGroupName;
return this;
}

public QueryStatisticsItem build() {
initDefaultValue(this);
return new QueryStatisticsItem(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,7 @@ public static List<TabletCommitInfo> getCommitInfos(Coordinator coord) {

public abstract String getWarehouseName();

public abstract String getResourceGroupName();

public abstract boolean isShortCircuit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ public String getWarehouseName() {
return connectContext.getSessionVariable().getWarehouseName();
}

@Override
public String getResourceGroupName() {
return "";
}

public boolean isShortCircuit() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ public void testGetDedicateCpuCores() throws Exception {
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|17|0|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" +
"rg2|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)\n" +
"rt_rg1|15|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rt_rg1_user)");

starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1");
Expand Down Expand Up @@ -1554,7 +1554,7 @@ public void testValidateCpuParametersForCreate() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)");
"rg1|0|17|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)");
starRocksAssert.executeResourceGroupDdlSql("DROP RESOURCE GROUP rg1");
}
}
Expand Down Expand Up @@ -1720,8 +1720,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
"rg1|0|16|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
}

{
Expand All @@ -1745,8 +1745,8 @@ public void testValidateSumExclusiveCpuCores() throws Exception {
List<List<String>> rows = starRocksAssert.executeResourceGroupShowSql("show resource groups all");
assertThat(rowsToString(rows)).isEqualTo("default_mv_wg|1|0|80.0%|0|0|0|null|80%|(weight=0.0)\n" +
"default_wg|32|0|100.0%|0|0|0|null|100%|(weight=0.0)\n" +
"rg1|null|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|null|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
"rg1|0|14|20.0%|0|0|0|null|100%|(weight=1.0, user=rg1_user)\n" +
"rg2|0|15|20.0%|0|0|0|null|100%|(weight=1.0, user=rg2_user)");
}

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(97323000)
.withExecTime(3533000)
.withWareHouseName("default_warehouse")
.withCustomQueryId("");
.withCustomQueryId("")
.withResourceGroupName("wg1");


public static final QueryStatisticsInfo QUERY_TWO_LOCAL = new QueryStatisticsInfo()
Expand All @@ -65,7 +66,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(96576000)
.withExecTime(2086000)
.withWareHouseName("default_warehouse")
.withCustomQueryId("");
.withCustomQueryId("")
.withResourceGroupName("wg2");

public static final QueryStatisticsInfo QUERY_ONE_REMOTE = new QueryStatisticsInfo()
.withQueryStartTime(1721866428)
Expand All @@ -81,7 +83,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(97456000)
.withExecTime(3687000)
.withWareHouseName("default_warehouse")
.withCustomQueryId("");
.withCustomQueryId("")
.withResourceGroupName("wg3");


public static final QueryStatisticsInfo QUERY_TWO_REMOTE = new QueryStatisticsInfo()
Expand All @@ -98,7 +101,8 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(96686000)
.withExecTime(2196000)
.withWareHouseName("default_warehouse")
.withCustomQueryId("");
.withCustomQueryId("")
.withResourceGroupName("wg");

public static List<QueryStatisticsInfo> LOCAL_TEST_QUERIES =
new ArrayList<>(List.of(QUERY_ONE_LOCAL, QUERY_TWO_LOCAL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ public void testFetchResult() throws AnalysisException {
.customQueryId("abc1")
.queryId("queryId1")
.warehouseName("wh1")
.resourceGroupName("wg1")
.build()
);
statistic.put("queryId2", new QueryStatisticsItem.Builder()
.queryStartTime(2)
.customQueryId("abc2")
.queryId("queryId2")
.warehouseName("wh1")
.resourceGroupName("wg2")
.build()
);
new MockUp<QeProcessorImpl>() {
Expand Down Expand Up @@ -80,6 +82,8 @@ public Map<String, CurrentQueryInfoProvider.QueryStatistics> getQueryStatistics(
Assert.assertEquals("wh1", list1.get(12));
// CustomQueryId
Assert.assertEquals("abc1", list1.get(13));
// ResourceGroupName
Assert.assertEquals("wg1", list1.get(14));

List<String> list2 = rows.get(1);
Assert.assertEquals(list2.size(), CurrentQueryStatisticsProcDir.TITLE_NAMES.size());
Expand All @@ -89,6 +93,8 @@ public Map<String, CurrentQueryInfoProvider.QueryStatistics> getQueryStatistics(
Assert.assertEquals("wh1", list2.get(12));
// CustomQueryId
Assert.assertEquals("abc2", list2.get(13));
// ResourceGroupName
Assert.assertEquals("wg2", list2.get(14));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public void testEquality() {
firstQuery.getSpillBytes(),
firstQuery.getExecTime(),
firstQuery.getWareHouseName(),
firstQuery.getCustomQueryId()
firstQuery.getCustomQueryId(),
firstQuery.getResourceGroupName()
);
Assert.assertEquals(firstQuery, otherQuery);
Assert.assertEquals(firstQuery.hashCode(), otherQuery.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import com.starrocks.catalog.ResourceGroupClassifier;
import com.starrocks.catalog.ResourceGroupMgr;
import com.starrocks.common.Config;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.load.loadv2.BulkLoadJob;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.ScanNode;
import com.starrocks.planner.StreamLoadPlanner;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.QueryStatisticsItem;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.server.WarehouseManager;
Expand Down Expand Up @@ -118,6 +121,12 @@ public void testFromQuerySpec() throws Exception {
connectContext, fragments, scanNodes, descTable.toThrift());
JobSpec jobSpec = coordinator.getJobSpec();

QeProcessorImpl.INSTANCE.registerQuery(queryId, new QeProcessorImpl.QueryInfo(connectContext, sql, coordinator));
Map<String, QueryStatisticsItem> queryStatistics = QeProcessorImpl.INSTANCE.getQueryStatistics();
assertThat(queryStatistics).hasSize(1);
assertThat(queryStatistics.get(DebugUtil.printId(queryId)).getResourceGroupName())
.isEqualTo(QUERY_RESOURCE_GROUP.getName());

// Check created jobSpec.
Assert.assertEquals(queryId, jobSpec.getQueryId());
Assert.assertEquals(lastQueryId.toString(), jobSpec.getQueryGlobals().getLast_query_id());
Expand Down
Loading

0 comments on commit 129d280

Please sign in to comment.