Skip to content

Commit

Permalink
[wg](chore) rename workload group memory property names
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Nov 15, 2024
1 parent 02e3de3 commit 3da6bba
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 82 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MAX_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"MEMORY_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
{"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
Expand Down
34 changes: 17 additions & 17 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_scan_thread_num(tg_info.scan_thread_num),
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_memory_low_watermark(tg_info.memory_low_watermark),
_memory_high_watermark(tg_info.memory_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
_need_create_query_thread_pool(need_create_query_thread_pool) {
Expand All @@ -91,27 +91,27 @@ std::string WorkloadGroup::debug_string() const {
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, "
"memory_low_watermark={}, memory_high_watermark={}, is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(),
_memory_low_watermark, _memory_high_watermark, _is_shutdown, _query_ctxs.size(),
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}

std::string WorkloadGroup::memory_debug_string() const {
return fmt::format(
"TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, weighted_memory_limit = {}, total_mem_used = {}, "
"wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
"spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
"wg_refresh_interval_memory_growth = {}, memory_low_watermark = {}, "
"memory_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
_id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
_spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
_memory_low_watermark, _memory_high_watermark, _version, _is_shutdown,
_query_ctxs.size());
}

Expand All @@ -137,8 +137,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_scan_thread_num = tg_info.scan_thread_num;
_max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
_min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
_spill_low_watermark = tg_info.spill_low_watermark;
_spill_high_watermark = tg_info.spill_high_watermark;
_memory_low_watermark = tg_info.memory_low_watermark;
_memory_high_watermark = tg_info.memory_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
} else {
Expand Down Expand Up @@ -397,15 +397,15 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}

// 12 spill low watermark
int spill_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
spill_low_watermark = tworkload_group_info.spill_threshold_low_watermark;
int memory_low_watermark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_low_watermark) {
memory_low_watermark = tworkload_group_info.memory_low_watermark;
}

// 13 spil high watermark
int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
spill_high_watermark = tworkload_group_info.spill_threshold_high_watermark;
int memory_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.memory_high_watermark) {
memory_high_watermark = tworkload_group_info.memory_high_watermark;
}

// 14 scan io
Expand All @@ -431,8 +431,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.scan_thread_num = scan_thread_num,
.max_remote_scan_thread_num = max_remote_scan_thread_num,
.min_remote_scan_thread_num = min_remote_scan_thread_num,
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.memory_low_watermark = memory_low_watermark,
.memory_high_watermark = memory_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
}
Expand Down
18 changes: 9 additions & 9 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
void do_sweep();

int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
return _memory_low_watermark.load(std::memory_order_relaxed);
}
int spill_threashold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
return _memory_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_ratio(double ratio);
Expand All @@ -107,7 +107,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
_memory_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
Expand All @@ -122,10 +122,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
_memory_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
_memory_high_watermark.load(std::memory_order_relaxed) / 100));
}

std::string debug_string() const;
Expand Down Expand Up @@ -233,8 +233,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
std::atomic<int> _min_remote_scan_thread_num;
std::atomic<int> _spill_low_watermark;
std::atomic<int> _spill_high_watermark;
std::atomic<int> _memory_low_watermark;
std::atomic<int> _memory_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};

Expand Down Expand Up @@ -282,8 +282,8 @@ struct WorkloadGroupInfo {
const int scan_thread_num = 0;
const int max_remote_scan_thread_num = 0;
const int min_remote_scan_thread_num = 0;
const int spill_low_watermark = 0;
const int spill_high_watermark = 0;
const int memory_low_watermark = 0;
const int memory_high_watermark = 0;
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
// log cgroup cpu info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ public class SchemaTable extends Table {
.column("SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MAX_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("MEMORY_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.column("READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {

public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";

public static final String SPILL_THRESHOLD_LOW_WATERMARK = "spill_threshold_low_watermark";
public static final String MEMORY_LOW_WATERMARK = "memory_low_watermark";

public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";
public static final String MEMORY_HIGH_WATERMARK = "memory_high_watermark";

public static final String TAG = "tag";

Expand All @@ -96,7 +96,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
.add(MEMORY_LOW_WATERMARK).add(MEMORY_HIGH_WATERMARK)
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();

public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
Expand Down Expand Up @@ -150,19 +150,19 @@ private WorkloadGroup(long id, String name, Map<String, String> properties, long
this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
}
if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
String lowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
if (properties.containsKey(MEMORY_LOW_WATERMARK)) {
String lowWatermarkStr = properties.get(MEMORY_LOW_WATERMARK);
if (lowWatermarkStr.endsWith("%")) {
lowWatermarkStr = lowWatermarkStr.substring(0, lowWatermarkStr.length() - 1);
}
this.properties.put(SPILL_THRESHOLD_LOW_WATERMARK, lowWatermarkStr);
this.properties.put(MEMORY_LOW_WATERMARK, lowWatermarkStr);
}
if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
String highWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
if (properties.containsKey(MEMORY_HIGH_WATERMARK)) {
String highWatermarkStr = properties.get(MEMORY_HIGH_WATERMARK);
if (highWatermarkStr.endsWith("%")) {
highWatermarkStr = highWatermarkStr.substring(0, highWatermarkStr.length() - 1);
}
this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr);
this.properties.put(MEMORY_HIGH_WATERMARK, highWatermarkStr);
}
if (properties.containsKey(TAG)) {
this.properties.put(TAG, properties.get(TAG).toLowerCase());
Expand Down Expand Up @@ -356,8 +356,8 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
}

int lowWaterMark = SPILL_LOW_WATERMARK_DEFAULT_VALUE;
if (properties.containsKey(SPILL_THRESHOLD_LOW_WATERMARK)) {
String lowVal = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
if (properties.containsKey(MEMORY_LOW_WATERMARK)) {
String lowVal = properties.get(MEMORY_LOW_WATERMARK);
if (lowVal.endsWith("%")) {
lowVal = lowVal.substring(0, lowVal.length() - 1);
}
Expand All @@ -369,15 +369,15 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
lowWaterMark = intValue;
} catch (NumberFormatException e) {
throw new DdlException(
SPILL_THRESHOLD_LOW_WATERMARK
MEMORY_LOW_WATERMARK
+ " must be a positive integer(1 ~ 100) or -1. but input value is "
+ lowVal);
}
}

int highWaterMark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (properties.containsKey(SPILL_THRESHOLD_HIGH_WATERMARK)) {
String highVal = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
if (properties.containsKey(MEMORY_HIGH_WATERMARK)) {
String highVal = properties.get(MEMORY_HIGH_WATERMARK);
if (highVal.endsWith("%")) {
highVal = highVal.substring(0, highVal.length() - 1);
}
Expand All @@ -389,14 +389,14 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
highWaterMark = intValue;
} catch (NumberFormatException e) {
throw new DdlException(
SPILL_THRESHOLD_HIGH_WATERMARK + " must be a positive integer(1 ~ 100). but input value is "
MEMORY_HIGH_WATERMARK + " must be a positive integer(1 ~ 100). but input value is "
+ highVal);
}
}

if (lowWaterMark > highWaterMark) {
throw new DdlException(SPILL_THRESHOLD_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than "
+ SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")");
throw new DdlException(MEMORY_HIGH_WATERMARK + "(" + highWaterMark + ") should bigger than "
+ MEMORY_LOW_WATERMARK + "(" + lowWaterMark + ")");
}

if (properties.containsKey(READ_BYTES_PER_SECOND)) {
Expand Down Expand Up @@ -521,7 +521,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add("-1");
} else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
row.add("-1");
} else if (SPILL_THRESHOLD_LOW_WATERMARK.equals(key)) {
} else if (MEMORY_LOW_WATERMARK.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
row.add(SPILL_LOW_WATERMARK_DEFAULT_VALUE + "%");
Expand All @@ -530,7 +530,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
} else {
row.add(val + "%");
}
} else if (SPILL_THRESHOLD_HIGH_WATERMARK.equals(key)) {
} else if (MEMORY_HIGH_WATERMARK.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
row.add(SPILL_HIGH_WATERMARK_DEFAULT_VALUE + "%");
Expand Down Expand Up @@ -643,14 +643,14 @@ public TopicInfo toTopicInfo() {
tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
}

String spillLowWatermarkStr = properties.get(SPILL_THRESHOLD_LOW_WATERMARK);
if (spillLowWatermarkStr != null) {
tWorkloadGroupInfo.setSpillThresholdLowWatermark(Integer.parseInt(spillLowWatermarkStr));
String memoryLowWatermarkStr = properties.get(MEMORY_LOW_WATERMARK);
if (memoryLowWatermarkStr != null) {
tWorkloadGroupInfo.setMemoryLowWatermark(Integer.parseInt(memoryLowWatermarkStr));
}

String spillHighWatermarkStr = properties.get(SPILL_THRESHOLD_HIGH_WATERMARK);
if (spillHighWatermarkStr != null) {
tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr));
String memoryHighWatermarkStr = properties.get(MEMORY_HIGH_WATERMARK);
if (memoryHighWatermarkStr != null) {
tWorkloadGroupInfo.setMemoryHighWatermark(Integer.parseInt(memoryHighWatermarkStr));
}

String readBytesPerSecStr = properties.get(READ_BYTES_PER_SECOND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
.add(WorkloadGroup.MEMORY_LOW_WATERMARK).add(WorkloadGroup.MEMORY_HIGH_WATERMARK)
.add(WorkloadGroup.TAG)
.add(WorkloadGroup.READ_BYTES_PER_SECOND).add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ struct TWorkloadGroupInfo {
9: optional i32 scan_thread_num
10: optional i32 max_remote_scan_thread_num
11: optional i32 min_remote_scan_thread_num
12: optional i32 spill_threshold_low_watermark
13: optional i32 spill_threshold_high_watermark
12: optional i32 memory_low_watermark
13: optional i32 memory_high_watermark
14: optional i64 read_bytes_per_second
15: optional i64 remote_read_bytes_per_second
16: optional string tag
Expand Down
Loading

0 comments on commit 3da6bba

Please sign in to comment.