Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 16, 2023
1 parent 5349958 commit 0e803d1
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 20 deletions.
1 change: 0 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,6 @@ DEFINE_Bool(wait_internal_group_commit_finish, "false");

// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_mInt32(group_commit_interval_ms, "10000");

DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
Expand Down
1 change: 0 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,6 @@ DECLARE_Bool(wait_internal_group_commit_finish);

// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_interval_ms);

// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
*eos = false;
std::unique_lock l(mutex);
if (!need_commit) {
auto left_milliseconds = config::group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
auto left_milliseconds =
_group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
need_commit = true;
}
}
while (_status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
CHECK(*_single_block_queue_bytes == 0);
auto left_milliseconds = config::group_commit_interval_ms;
auto left_milliseconds = _group_commit_interval_ms;
if (!need_commit) {
left_milliseconds = config::group_commit_interval_ms -
left_milliseconds = _group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
Expand Down Expand Up @@ -251,7 +251,7 @@ Status GroupCommitTable::_create_group_commit_load(
{
load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
result.wait_internal_group_commit_finish);
result.wait_internal_group_commit_finish, result.group_commit_interval_ms);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ class LoadBlockQueue {
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
int64_t schema_version,
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
bool wait_internal_group_commit_finish)
bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms)
: load_instance_id(load_instance_id),
label(label),
txn_id(txn_id),
schema_version(schema_version),
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_start_time(std::chrono::steady_clock::now()),
_all_block_queues_bytes(all_block_queues_bytes) {
_all_block_queues_bytes(all_block_queues_bytes),
_group_commit_interval_ms(group_commit_interval_ms) {
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
};

Expand Down Expand Up @@ -79,6 +80,8 @@ class LoadBlockQueue {
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
// memory consumption of one load block queue, used for correctness check.
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
// group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
};

class GroupCommitTable {
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3276,6 +3276,10 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION).append("\" = \"");
sb.append(olapTable.enableSingleReplicaCompaction()).append("\"");

// group commit interval ms
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS).append("\" = \"");
sb.append(olapTable.getGroupCommitIntervalMs()).append("\"");

// enable duplicate without keys by default
if (olapTable.isDuplicateWithoutKey()) {
sb.append(",\n\"")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ public String getSequenceMapCol() {
}

public void setGroupCommitIntervalMs(int groupCommitIntervalMs) {
properties.put(PropertyAnalyzer.PROPERTIES_GROUP_INTERVAL_COMMIT_MS, Integer.toString(groupCommitIntervalMs));
properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS, Integer.toString(groupCommitIntervalMs));
}

public int getGroupCommitIntervalMs() {
return Integer.parseInt(properties.getOrDefault(
PropertyAnalyzer.PROPERTIES_GROUP_INTERVAL_COMMIT_MS,
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_INTERVAL_COMMIT_MS_DEFAULT_VALUE)));
PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS,
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE)));
}

public void buildReplicaAllocation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ public class PropertyAnalyzer {
private static final double MAX_FPP = 0.05;
private static final double MIN_FPP = 0.0001;

public static final String PROPERTIES_GROUP_INTERVAL_COMMIT_MS = "group_interval_commit_ms";
public static final int PROPERTIES_GROUP_INTERVAL_COMMIT_MS_DEFAULT_VALUE = 1000;
public static final String PROPERTIES_GROUP_COMMIT_INTERVAL_MS = "group_commit_interval_ms";
public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 1000;

// compaction policy
public static final String SIZE_BASED_COMPACTION_POLICY = "size_based";
Expand Down Expand Up @@ -1164,16 +1164,16 @@ public static boolean analyzeUniqueKeyMergeOnWrite(Map<String, String> propertie
* @throws AnalysisException
*/
public static int analyzeGroupCommitIntervalMs(Map<String, String> properties) throws AnalysisException {
int groupCommitIntervalMs = PROPERTIES_GROUP_INTERVAL_COMMIT_MS_DEFAULT_VALUE;
if (properties != null && properties.containsKey(PROPERTIES_GROUP_INTERVAL_COMMIT_MS)) {
String groupIntervalCommitMsStr = properties.get(PROPERTIES_GROUP_INTERVAL_COMMIT_MS);
int groupCommitIntervalMs = PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE;
if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) {
String groupIntervalCommitMsStr = properties.get(PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
try {
groupCommitIntervalMs = Integer.parseInt(groupIntervalCommitMsStr);
} catch (Exception e) {
throw new AnalysisException("schema version format error");
}

properties.remove(PROPERTIES_GROUP_INTERVAL_COMMIT_MS);
properties.remove(PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
}

return groupCommitIntervalMs;
Expand Down

0 comments on commit 0e803d1

Please sign in to comment.