diff --git a/docs/en/operations/settings/merge-tree-settings.md b/docs/en/operations/settings/merge-tree-settings.md
index a7bba76a05a..648403cbeed 100644
--- a/docs/en/operations/settings/merge-tree-settings.md
+++ b/docs/en/operations/settings/merge-tree-settings.md
@@ -41,11 +41,11 @@ Possible values:
 
 - Any positive integer.
 
-Default value: 300.
+Default value: 3000.
 
 To achieve maximum performance of `SELECT` queries, it is necessary to minimize the number of parts processed, see [Merge Tree](../../development/architecture.md#merge-tree).
 
-You can set a larger value to 600 (1200), this will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also in case of a merge issue (for example, due to insufficient disk space) you will notice it later than it could be with the original 300.
+Prior to 23.6 this setting was set to 300. You can set a different value: a higher value will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also, in case of a merge issue (for example, due to insufficient disk space) you will notice it later than you would with the original 300.
 
 ## parts_to_delay_insert {#parts-to-delay-insert}
 
@@ -97,8 +97,16 @@ max_k = parts_to_throw_insert - parts_to_delay_insert
 k = 1 + parts_count_in_partition - parts_to_delay_insert
 delay_milliseconds = pow(max_delay_to_insert * 1000, k / max_k)
 ```
+For example, if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds.
 
-For example if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds.
+Starting from version 23.1 the formula has been changed to:
+```code
+allowed_parts_over_threshold = parts_to_throw_insert - parts_to_delay_insert
+parts_over_threshold = parts_count_in_partition - parts_to_delay_insert + 1
+delay_milliseconds = max(min_delay_to_insert_ms, (max_delay_to_insert * 1000) * parts_over_threshold / allowed_parts_over_threshold)
+```
+
+For example, if a partition has 224 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, min_delay_to_insert_ms = 10, `INSERT` is delayed for `max( 10, 1 * 1000 * (224 - 150 + 1) / (300 - 150) ) = 500` milliseconds.
 
 ## max_parts_in_total {#max-parts-in-total}
 
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 2307b4c21f4..e0f9902379c 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -271,7 +271,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(UInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.", 0) \
     M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \
     M(UInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. 
Should be close to L2 cache size.", 0) \
-    \
+    M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
+    M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
     M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
     M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
     M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp
index ccc7fc92305..298225c4f00 100644
--- a/src/Interpreters/AsynchronousMetrics.cpp
+++ b/src/Interpreters/AsynchronousMetrics.cpp
@@ -1429,7 +1429,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
         const auto & settings = getContext()->getSettingsRef();
 
-        calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition());
+        calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountAndSizeForPartition().first);
         total_number_of_bytes += table_merge_tree->totalBytes(settings).value();
         total_number_of_rows += table_merge_tree->totalRows(settings).value();
         total_number_of_parts += table_merge_tree->getPartsCount();
diff --git a/src/Loggers/OwnPatternFormatter.cpp b/src/Loggers/OwnPatternFormatter.cpp
index 02a2c2e510b..0c2256aaa1b 100644
--- a/src/Loggers/OwnPatternFormatter.cpp
+++ b/src/Loggers/OwnPatternFormatter.cpp
@@ -4,8 +4,6 @@
 #include
 #include
 #include
-#include
-#include
 #include
 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 61bd95d814f..02ebcfc7e72 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3438,42 +3438,49 @@ size_t MergeTreeData::getPartsCount() const
 }
 
 
-size_t MergeTreeData::getMaxPartsCountForPartitionWithState(DataPartState state) const
+std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const
 {
     auto lock = lockParts();
 
-    size_t res = 0;
-    size_t cur_count = 0;
+    size_t cur_parts_count = 0;
+    size_t cur_parts_size = 0;
+    size_t max_parts_count = 0;
+    size_t argmax_parts_size = 0;
+
     const String * cur_partition_id = nullptr;
 
     for (const auto & part : getDataPartsStateRange(state))
     {
-        if (cur_partition_id && part->info.partition_id == *cur_partition_id)
-        {
-            ++cur_count;
-        }
-        else
+        if (!cur_partition_id || part->info.partition_id != *cur_partition_id)
         {
             cur_partition_id = &part->info.partition_id;
-            cur_count = 1;
+            cur_parts_count = 0;
+            cur_parts_size = 0;
         }
 
-        res = std::max(res, cur_count);
+        ++cur_parts_count;
+        cur_parts_size += part->getBytesOnDisk();
+
+        if (cur_parts_count > max_parts_count)
+        {
+            max_parts_count = cur_parts_count;
+            argmax_parts_size = cur_parts_size;
+        }
     }
 
-    return res;
+    return {max_parts_count, argmax_parts_size};
 }
 
 
-size_t MergeTreeData::getMaxPartsCountForPartition() const
+std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() const
 {
-    return getMaxPartsCountForPartitionWithState(DataPartState::Active);
+    return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Active);
 }
 
 
-size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
+size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
 {
-    return getMaxPartsCountForPartitionWithState(DataPartState::Outdated);
+    return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
 }
 
 
@@ -3492,65 +3499,109 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
 }
 
 
-void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
+void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const
 {
     const auto settings = getSettings();
+    const auto & query_settings = query_context->getSettingsRef();
     const size_t parts_count_in_total = getPartsCount();
-    if (parts_count_in_total >= settings->max_parts_in_total)
+
+    /// Check if we have too many parts in total
+    if (allow_throw && parts_count_in_total >= settings->max_parts_in_total)
     {
         ProfileEvents::increment(ProfileEvents::RejectedInserts);
-        throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-stream setting.", ErrorCodes::TOO_MANY_PARTS);
+        throw Exception(
+            ErrorCodes::TOO_MANY_PARTS,
+            "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
+            "with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
+            toString(parts_count_in_total));
     }
 
-    size_t parts_count_in_partition = getMaxPartsCountForPartition();
-    ssize_t k_inactive = -1;
-    if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+    size_t outdated_parts_over_threshold = 0;
     {
-        size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
-        if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
+        size_t outdated_parts_count_in_partition = 0;
+        if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
+            outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
+
+        if (allow_throw && settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
         {
             ProfileEvents::increment(ProfileEvents::RejectedInserts);
             throw Exception(
                 ErrorCodes::TOO_MANY_PARTS,
                 "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
-                inactive_parts_count_in_partition);
+                outdated_parts_count_in_partition);
         }
-        k_inactive = ssize_t(inactive_parts_count_in_partition) - ssize_t(settings->inactive_parts_to_delay_insert);
+        if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
+            outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
     }
 
-    if (parts_count_in_partition >= settings->parts_to_throw_insert)
+    auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
+    size_t average_part_size = parts_count_in_partition ? 
size_of_partition / parts_count_in_partition : 0; + const auto active_parts_to_delay_insert + = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; + const auto active_parts_to_throw_insert + = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + size_t active_parts_over_threshold = 0; { - ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception( - ErrorCodes::TOO_MANY_PARTS, - "Too many parts ({}). Merges are processing significantly slower than inserts", - parts_count_in_partition); + bool parts_are_large_enough_in_average + = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts; + + if (allow_throw && parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average) + { + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception( + ErrorCodes::TOO_MANY_PARTS, + "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", + parts_count_in_partition, + ReadableSize(average_part_size)); + } + if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert + && !parts_are_large_enough_in_average) + /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold + active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1; } - if (k_inactive < 0 && parts_count_in_partition < settings->parts_to_delay_insert) + /// no need for delay + if (!active_parts_over_threshold && !outdated_parts_over_threshold) return; - const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(settings->parts_to_delay_insert); - size_t max_k; - size_t k; - if (k_active > k_inactive) + UInt64 delay_milliseconds = 0; { - max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; - k = k_active + 1; - } - else - { - max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; - k = k_inactive + 1; + size_t parts_over_threshold = 0; + size_t allowed_parts_over_threshold = 1; + const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold); + if (use_active_parts_threshold) + { + parts_over_threshold = active_parts_over_threshold; + allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert; + } + else + { + parts_over_threshold = outdated_parts_over_threshold; + allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay + if (settings->inactive_parts_to_throw_insert > 0) + allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; + } + + const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? 
settings->max_delay_to_insert * 1000 : 1000);
+        if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold)
+        {
+            delay_milliseconds = max_delay_milliseconds;
+        }
+        else
+        {
+            double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
+            const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms;
+            delay_milliseconds = std::max(min_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
+        }
     }
-    const UInt64 delay_milliseconds = static_cast<UInt64>(::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k));
 
     ProfileEvents::increment(ProfileEvents::DelayedInserts);
     ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
 
     CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);
 
-    LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts", delay_milliseconds, parts_count_in_partition);
+    LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts and their average size is {}",
+        delay_milliseconds, parts_count_in_partition, ReadableSize(average_part_size));
 
     if (until)
         until->tryWait(delay_milliseconds);
@@ -3558,6 +3609,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
         std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
 }
 
+
 MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
     const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
 {
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 69152c3ff1a..aea8edfbea5 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -506,9 +506,13 @@ class MergeTreeData : public IStorage, public WithMutableContext
     size_t getTotalActiveSizeInRows() const;
 
     size_t getPartsCount() const;
-    size_t getMaxPartsCountForPartitionWithState(DataPartState state) const;
-    size_t getMaxPartsCountForPartition() const;
-    size_t getMaxInactivePartsCountForPartition() const;
+
+    /// Returns a pair with: max number of parts in partition across partitions; sum size of parts inside that partition.
+    /// (if there are multiple partitions with max number of parts, the sum size of parts is returned for arbitrary of them)
+    std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
+    std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
+
+    size_t getMaxOutdatedPartsCountForPartition() const;
 
     /// Get min value of part->info.getDataVersion() for all active parts.
     /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
@@ -528,7 +532,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
 
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
-    void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
+    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;
 
     /// Renames temporary part to a permanent part and adds it to the parts set.
     /// It is assumed that the part does not intersect with existing parts. 
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 684a7ceb9ec..b2112c9f120 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -66,11 +66,13 @@ struct Settings;
     M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
     \
     /** Inserts settings. */ \
-    M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
+    M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
     M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
-    M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
+    M(UInt64, parts_to_throw_insert, 3000, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
     M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
+    M(UInt64, max_avg_part_size_for_too_many_parts, 1ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed nor rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
     M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
+    M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
     M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
     \
     /** Replication settings. */ \
diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp
index 9ee287ea268..5d424f29856 100644
--- a/src/Storages/MergeTree/MergeTreeSink.cpp
+++ b/src/Storages/MergeTree/MergeTreeSink.cpp
@@ -49,9 +49,9 @@ MergeTreeSink::MergeTreeSink(
 
 void MergeTreeSink::onStart()
 {
-    /// Only check "too many parts" before write,
+    /// It's only allowed to throw "too many parts" before write,
     /// because interrupting long-running INSERT query in the middle is not convenient for users. 
- storage.delayInsertOrThrowIfNeeded(); + storage.delayInsertOrThrowIfNeeded(nullptr, context, true); } void MergeTreeSink::onFinish() @@ -61,6 +61,9 @@ void MergeTreeSink::onFinish() void MergeTreeSink::consume(Chunk chunk) { + if (num_blocks_processed > 0) + storage.delayInsertOrThrowIfNeeded(nullptr, context, false); + auto block = getHeader().cloneWithColumns(chunk.detachColumns()); if (!storage_snapshot->object_columns.get()->empty()) convertDynamicColumnsToTuples(block, storage_snapshot); @@ -149,6 +152,8 @@ void MergeTreeSink::consume(Chunk chunk) finishDelayedChunk(); delayed_chunk = std::make_unique(); delayed_chunk->partitions = std::move(partitions); + + ++num_blocks_processed; } void MergeTreeSink::finishDelayedChunk() diff --git a/src/Storages/MergeTree/MergeTreeSink.h b/src/Storages/MergeTree/MergeTreeSink.h index f0cbcc2e6b6..c4723d38a43 100644 --- a/src/Storages/MergeTree/MergeTreeSink.h +++ b/src/Storages/MergeTree/MergeTreeSink.h @@ -45,7 +45,8 @@ class MergeTreeSink final : public SinkToStorage size_t max_parts_per_block; ContextPtr context; StorageSnapshotPtr storage_snapshot; - uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 num_blocks_processed = 0; /// We can delay processing for previous chunk and start writing a new one. struct DelayedChunk; diff --git a/tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.reference b/tests/queries_ported/0_stateless/02280_add_query_level_settings.reference similarity index 100% rename from tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.reference rename to tests/queries_ported/0_stateless/02280_add_query_level_settings.reference diff --git a/tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.sql b/tests/queries_ported/0_stateless/02280_add_query_level_settings.sql similarity index 100% rename from tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.sql rename to tests/queries_ported/0_stateless/02280_add_query_level_settings.sql diff --git a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.reference b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.reference similarity index 100% rename from tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.reference rename to tests/queries_ported/0_stateless/02458_relax_too_many_parts.reference diff --git a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql similarity index 68% rename from tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql rename to tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql index 5ff4851f155..559ae149746 100644 --- a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql +++ b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql @@ -1,5 +1,5 @@ DROP STREAM IF EXISTS test; -CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY tuple() SETTINGS parts_to_throw_insert = 3; +CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY x SETTINGS parts_to_throw_insert = 3, max_parts_to_merge_at_once = 1; -- The "too many parts" threshold works: SET max_block_size = 1, min_insert_block_size_rows = 1, min_insert_block_size_bytes = 1; @@ -14,7 
+14,7 @@ ALTER STREAM test MODIFY SETTING max_avg_part_size_for_too_many_parts = '1M'; -- It works in the same way if parts are small: SYSTEM START MERGES test; -OPTIMIZE STREAM test FINAL; +OPTIMIZE TABLE test FINAL SETTINGS optimize_throw_if_noop=1; SYSTEM STOP MERGES test; INSERT INTO test VALUES (5, 'a'); @@ -23,14 +23,14 @@ INSERT INTO test VALUES (7, 'a'); -- { serverError TOO_MANY_PARTS } -- But it allows having more parts if their average size is large: SYSTEM START MERGES test; -OPTIMIZE STREAM test FINAL; +OPTIMIZE TABLE test FINAL SETTINGS optimize_throw_if_noop=1; SYSTEM STOP MERGES test; SET max_block_size = 65000, min_insert_block_size_rows = 65000, min_insert_block_size_bytes = '1M'; -INSERT INTO test SELECT number, randomString(1000) FROM numbers(0, 10000); -INSERT INTO test SELECT number, randomString(1000) FROM numbers(10000, 10000); -INSERT INTO test SELECT number, randomString(1000) FROM numbers(20000, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(0, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(10000, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(20000, 10000); -SELECT count(), round(avg(bytes), -6) FROM system.parts WHERE database = currentDatabase() AND stream = 'test' AND active; +SELECT count(), round(avg(bytes), -6) FROM system.parts WHERE database = current_database() AND table = 'test' AND active; DROP STREAM test; diff --git a/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.reference b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.reference new file mode 100644 index 00000000000..c104ff58aff --- /dev/null +++ b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.reference @@ -0,0 +1,6 @@ +0 +300 +500 +750 +1000 +TOO_MANY_PARTS diff --git a/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.sh b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.sh new file mode 100755 index 00000000000..b1efb033d3b --- /dev/null +++ b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "DROP STREAM IF EXISTS test_02521_insert_delay" +# Create MergeTree with settings which allow to insert maximum 5 parts, on 6th it'll throw TOO_MANY_PARTS +$CLICKHOUSE_CLIENT -q "CREATE STREAM test_02521_insert_delay (key uint32, value string) Engine=MergeTree() ORDER BY key SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=5, max_delay_to_insert=1, min_delay_to_insert_ms=300" +$CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay" + +# Every delay is increased by max_delay_to_insert*1000/(parts_to_throw_insert - parts_to_delay_insert + 1), here it's 250ms +# 0-indexed INSERT - no delay, 1-indexed INSERT - 300ms instead of 250ms due to min_delay_to_insert_ms +for i in {0..4} +do + query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM" + $CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, to_string(number) FROM numbers(${i}, 1)" + $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS" + $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and query_id = {query_id:string} order by delay desc limit 1" +done + +$CLICKHOUSE_CLIENT -q "INSERT INTO test_02521_insert_delay VALUES(0, 'This query throws error')" 2>&1 | grep -o 'TOO_MANY_PARTS' | head -n 1 + +$CLICKHOUSE_CLIENT -q "DROP STREAM test_02521_insert_delay"