diff --git a/dbms/src/Common/BackgroundTask.cpp b/dbms/src/Common/BackgroundTask.cpp index 81786c05f5b..18874f08053 100644 --- a/dbms/src/Common/BackgroundTask.cpp +++ b/dbms/src/Common/BackgroundTask.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include @@ -20,36 +21,6 @@ namespace DB { namespace { -bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) -{ - resident_set = 0.0; - - // 'file' stat seems to give the most reliable results - std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); - // if "/proc/self/stat" is not supported - if (!stat_stream.is_open()) - return false; - - // dummy vars for leading entries in stat that we don't care about - std::string pid, comm, state, ppid, pgrp, session, tty_nr; - std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; - std::string utime, stime, cutime, cstime, priority, nice; - std::string itrealvalue, starttime; - - // the field we want - Int64 rss; - - stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt - >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads - >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest - - stat_stream.close(); - - Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages - resident_set = rss * page_size_kb; - return true; -} - bool isProcStatSupported() { std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); @@ -90,16 +61,9 @@ void CollectProcInfoBackgroundTask::memCheckJob() { try { - double resident_set; - Int64 cur_proc_num_threads = 1; - UInt64 cur_virt_size = 0; while (!end_syn) { - process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); - resident_set *= 1024; // unit: byte - real_rss = static_cast(resident_set); - proc_num_threads = cur_proc_num_threads; - proc_virt_size = cur_virt_size; + std::tie(real_rss, proc_num_threads, proc_virt_size) = process_mem_usage(); baseline_of_query_mem_tracker = root_of_query_mem_trackers->get(); usleep(100000); // sleep 100ms } diff --git a/dbms/src/Common/MemoryAllocTrace.cpp b/dbms/src/Common/MemoryAllocTrace.cpp index 35d5ebc67f4..517a9863b7d 100644 --- a/dbms/src/Common/MemoryAllocTrace.cpp +++ b/dbms/src/Common/MemoryAllocTrace.cpp @@ -15,6 +15,8 @@ #include #include // Included for `USE_JEMALLOC` +#include + #if USE_JEMALLOC #include #endif @@ -35,4 +37,45 @@ std::tuple getAllocDeallocPtr() return std::make_tuple(nullptr, nullptr); #endif } + +bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) +{ + resident_set = 0.0; + + // 'file' stat seems to give the most reliable results + std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); + // if "/proc/self/stat" is not supported + if (!stat_stream.is_open()) + return false; + + // dummy vars for leading entries in stat that we don't care about + std::string pid, comm, state, ppid, pgrp, session, tty_nr; + std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; + std::string utime, stime, cutime, cstime, priority, nice; + std::string itrealvalue, starttime; + + // the field we want + Int64 rss; + + stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt + >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads + >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest + + stat_stream.close(); + + Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages + resident_set = rss * page_size_kb; + return true; +} + +std::tuple process_mem_usage() +{ + double resident_set; + Int64 cur_proc_num_threads = 1; + UInt64 cur_virt_size = 0; + process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); + resident_set *= 1024; // unit: byte + return std::make_tuple(resident_set, cur_proc_num_threads, cur_virt_size); +} + } // namespace DB diff --git a/dbms/src/Common/MemoryAllocTrace.h b/dbms/src/Common/MemoryAllocTrace.h index fc6303ac3b5..8f3685bc421 100644 --- a/dbms/src/Common/MemoryAllocTrace.h +++ b/dbms/src/Common/MemoryAllocTrace.h @@ -14,9 +14,13 @@ #pragma once +#include + #include namespace DB { std::tuple getAllocDeallocPtr(); -} // namespace DB \ No newline at end of file +bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size); +std::tuple process_mem_usage(); +} // namespace DB diff --git a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp index 7c5b4ebb145..12c69e1a977 100644 --- a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp @@ -183,7 +183,7 @@ void MockRaftStoreProxy::debugAddRegions( auto region = tests::makeRegion(region_ids[i], ranges[i].first, ranges[i].second, kvs.getProxyHelper()); lock.regions.emplace(region_ids[i], region); lock.index.add(region); - tmt.getRegionTable().updateRegion(*region); + tmt.getRegionTable().addRegion(*region); } } } @@ -601,7 +601,7 @@ std::tuple MockRaftStoreProxy::snapshot( uint64_t index, uint64_t term, std::optional deadline_index, - bool cancel_after_prehandle) + std::optional> cancel_after_prehandle) { auto old_kv_region = kvs.getRegion(region_id); RUNTIME_CHECK(old_kv_region != nullptr); @@ -628,7 +628,7 @@ std::tuple MockRaftStoreProxy::snapshot( uint64_t index, uint64_t term, std::optional deadline_index, - bool cancel_after_prehandle) + std::optional> cancel_after_prehandle) { auto region = getRegion(region_id); RUNTIME_CHECK(region != nullptr); @@ -640,10 +640,10 @@ std::tuple MockRaftStoreProxy::snapshot( term = region->getLatestCommitTerm(); } - auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term); // The new entry is committed on Proxy's side. region->updateCommitIndex(index); - new_kv_region->setApplied(index, term); + // Would set `applied_index` in genRegionPtr. + auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term, tmt.getRegionTable()); std::vector ssts; for (auto & cf : cfs) @@ -663,6 +663,7 @@ std::tuple MockRaftStoreProxy::snapshot( auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)}; if (cancel_after_prehandle) { + cancel_after_prehandle.value()(); kvs.releasePreHandledSnapshot(rg, tmt); return std::make_tuple(kvs.getRegion(region_id), prehandle_result); } diff --git a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h index e346b5155f6..bcabd825a75 100644 --- a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h @@ -242,7 +242,7 @@ struct MockRaftStoreProxy : MutexLockWrap uint64_t index, uint64_t term, std::optional deadline_index, - bool cancel_after_prehandle); + std::optional> cancel_after_prehandle); std::tuple snapshot( KVStore & kvs, TMTContext & tmt, @@ -251,7 +251,7 @@ struct MockRaftStoreProxy : MutexLockWrap uint64_t index, uint64_t term, std::optional deadline_index, - bool cancel_after_prehandle = false); + std::optional> cancel_after_prehandle = std::nullopt); void doApply( KVStore & kvs, @@ -266,6 +266,57 @@ struct MockRaftStoreProxy : MutexLockWrap void clear(); + struct WriteCmdsViewHolder + { + WriteCmdsViewHolder( + std::vector kk, + std::vector vv, + std::vector tt, + std::vector ff) + { + kholder = std::move(kk); + vholder = std::move(vv); + for (size_t i = 0; i < kholder.size(); i++) + { + kbuff.push_back(strIntoView(&kholder[i])); + } + for (size_t i = 0; i < vholder.size(); i++) + { + vbuff.push_back(strIntoView(&vholder[i])); + } + cmd_type = std::move(tt); + cmd_cf = std::move(ff); + } + + const BaseBuffView * getKeys() const { return kbuff.data(); } + + const BaseBuffView * getVals() const { return vbuff.data(); } + + const WriteCmdType * getTypes() const { return cmd_type.data(); } + + const ColumnFamilyType * getCfs() const { return cmd_cf.data(); } + + std::vector kholder; + std::vector vholder; + std::vector kbuff; + std::vector vbuff; + std::vector cmd_type; + std::vector cmd_cf; + }; + + static std::tuple> createWriteCmdsView( + std::vector keys, + std::vector vals, + std::vector cmd_types, + std::vector cmd_cf) + { + std::shared_ptr holder + = std::make_shared(keys, vals, cmd_types, cmd_cf); + return std::make_tuple( + WriteCmdsView{holder->getKeys(), holder->getVals(), holder->getTypes(), holder->getCfs(), keys.size()}, + holder); + } + std::pair generateTiKVKeyValue(uint64_t tso, int64_t t) const; MockRaftStoreProxy() diff --git a/dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp index 2c1aec5d265..ebb844711c7 100644 --- a/dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp @@ -154,7 +154,11 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args) : RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts, value); TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts); - region->insert(ColumnFamilyType::Write, std::move(commit_key), std::move(commit_value)); + region->insertFromSnap( + context.getTMTContext(), + ColumnFamilyType::Write, + std::move(commit_key), + std::move(commit_value)); } MockTiKV::instance().getRaftIndex(region_id); } diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 940b5348a86..2448d2d4605 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -784,7 +784,7 @@ void handleApplySnapshot( std::optional deadline_index, TMTContext & tmt) { - auto new_region = kvstore.genRegionPtr(std::move(region), peer_id, index, term); + auto new_region = kvstore.genRegionPtr(std::move(region), peer_id, index, term, tmt.getRegionTable()); auto prehandle_result = kvstore.preHandleSnapshotToFiles(new_region, snaps, index, term, deadline_index, tmt); kvstore.applyPreHandledSnapshot( RegionPtrWithSnapshotFiles{new_region, std::move(prehandle_result.ingest_ids)}, diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 52d8ccb0e3d..0833b89465b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -107,6 +108,7 @@ #include #endif +extern std::atomic tranquil_time_rss; namespace CurrentMetrics { @@ -955,7 +957,25 @@ int Server::main(const std::vector & /*args*/) /// Create TMTContext auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), storage_config.api_version, log); global_context->createTMTContext(raft_config, std::move(cluster_config)); - proxy_machine.initKVStore(global_context->getTMTContext(), store_ident); + + // Must be executed before restore data. + // Get the memory usage of tranquil time. + auto [resident_set, cur_proc_num_threads, cur_virt_size] = process_mem_usage(); + UNUSED(cur_proc_num_threads); + tranquil_time_rss = static_cast(resident_set); + + auto kvs_watermark = settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity); + if (kvs_watermark == 0) + kvs_watermark = server_info.memory_info.capacity * 0.8; + LOG_INFO( + log, + "Global memory status: kvstore_high_watermark={} tranquil_time_rss={} cur_virt_size={} capacity={}", + kvs_watermark, + tranquil_time_rss, + cur_virt_size, + server_info.memory_info.capacity); + + proxy_machine.initKVStore(global_context->getTMTContext(), store_ident, kvs_watermark); global_context->getTMTContext().reloadConfig(config()); // setup the kv cluster for disagg compute node fetching config diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp index 92422543c9c..282920bbe79 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp @@ -194,7 +194,7 @@ Block SSTFilesToBlockInputStream::read() BaseBuffView key = write_cf_reader->keyView(); BaseBuffView value = write_cf_reader->valueView(); auto tikv_key = TiKVKey(key.data, key.len); - region->insert(ColumnFamilyType::Write, std::move(tikv_key), TiKVValue(value.data, value.len)); + region->insertFromSnap(tmt, ColumnFamilyType::Write, std::move(tikv_key), TiKVValue(value.data, value.len)); ++process_keys.write_cf; process_keys.write_cf_bytes += (key.len + value.len); if (process_keys.write_cf % opts.expected_size == 0) @@ -277,7 +277,12 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST( BaseBuffView key = reader->keyView(); BaseBuffView value = reader->valueView(); // TODO: use doInsert to avoid locking - region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len), DupCheck::AllowSame); + region->insertFromSnap( + tmt, + cf, + TiKVKey(key.data, key.len), + TiKVValue(value.data, value.len), + DupCheck::AllowSame); (*p_process_keys) += 1; (*p_process_keys_bytes) += (key.len + value.len); reader->next(); @@ -337,7 +342,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST( BaseBuffView key = reader->keyView(); BaseBuffView value = reader->valueView(); // TODO: use doInsert to avoid locking - region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len)); + region->insertFromSnap(tmt, cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len)); (*p_process_keys) += 1; (*p_process_keys_bytes) += (key.len + value.len); if (*p_process_keys == process_keys_offset_end) diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp index 280e46cff12..03f4ef535e0 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp @@ -60,17 +60,13 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, return it->second; } -RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Region & region) -{ - const auto range = region.getRange(); - return insertRegion(table, *range, region.id()); -} - RegionTable::InternalRegion & RegionTable::insertRegion( Table & table, const RegionRangeKeys & region_range_keys, - const RegionID region_id) + const Region & region) { + auto region_id = region.id(); + region.setRegionTableCtx(table.ctx); auto keyspace_id = region_range_keys.getKeyspaceID(); auto & table_regions = table.internal_regions; // Insert table mapping. @@ -84,7 +80,7 @@ RegionTable::InternalRegion & RegionTable::insertRegion( region_id); // Insert region mapping. - regions[region_id] = KeyspaceTableID{keyspace_id, table.table_id}; + region_infos[region_id] = KeyspaceTableID{keyspace_id, table.table_id}; return it->second; } @@ -103,7 +99,7 @@ RegionTable::InternalRegion & RegionTable::getOrInsertRegion(const Region & regi if (auto it = table_regions.find(region.id()); it != table_regions.end()) return it->second; - return insertRegion(table, region); + return insertRegion(table, *region.getRange(), region); } RegionTable::RegionTable(Context & context_) @@ -113,9 +109,9 @@ RegionTable::RegionTable(Context & context_) void RegionTable::clear() { - regions.clear(); + region_infos.clear(); tables.clear(); - safe_ts_map.clear(); + safe_ts_mgr.clear(); } void RegionTable::restore() @@ -123,7 +119,7 @@ void RegionTable::restore() LOG_INFO(log, "RegionTable restore start"); const auto & tmt = context->getTMTContext(); - tmt.getKVStore()->traverseRegions([this](const RegionID, const RegionPtr & region) { updateRegion(*region); }); + tmt.getKVStore()->traverseRegions([this](const RegionID, const RegionPtr & region) { addRegion(*region); }); LOG_INFO(log, "RegionTable restore end, n_tables={}", tables.size()); } @@ -140,11 +136,8 @@ void RegionTable::removeTable(KeyspaceID keyspace_id, TableID table_id) // Remove from region list. for (const auto & region_info : table.internal_regions) { - regions.erase(region_info.first); - { - std::unique_lock write_lock(rw_lock); - safe_ts_map.erase(region_info.first); - } + region_infos.erase(region_info.first); + safe_ts_mgr.remove(region_info.first); } // Remove from table map. @@ -154,12 +147,46 @@ void RegionTable::removeTable(KeyspaceID keyspace_id, TableID table_id) LOG_INFO(log, "remove table from RegionTable success, keyspace={} table_id={}", keyspace_id, table_id); } -void RegionTable::updateRegion(const Region & region) +void RegionTable::addRegion(const Region & region) { std::lock_guard lock(mutex); getOrInsertRegion(region); } +void RegionTable::addPrehandlingRegion(const Region & region) +{ + std::lock_guard lock(mutex); + auto keyspace_id = region.getKeyspaceID(); + auto table_id = region.getMappedTableID(); + auto & table = getOrCreateTable(keyspace_id, table_id); + region.setRegionTableCtx(table.ctx); +} + +size_t RegionTable::getTableRegionSize(KeyspaceID keyspace_id, TableID table_id) const +{ + std::scoped_lock lock(mutex); + + auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); + if (it == tables.end()) + return 0; + const auto & table = it->second; + if (table.ctx) + return table.ctx->table_size; + return 0; +} + +void RegionTable::debugClearTableRegionSize(KeyspaceID keyspace_id, TableID table_id) +{ + std::scoped_lock lock(mutex); + + auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); + if (it == tables.end()) + return; + const auto & table = it->second; + if (table.ctx) + table.ctx->table_size = 0; +} + namespace { /// Remove obsolete data for table after data of `handle_range` is removed from this TiFlash node. @@ -209,11 +236,11 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const std::pair handle_range; { - /// We need to protect `regions` and `table` under mutex lock + /// We need to protect `region_infos` and `table` under mutex lock std::lock_guard lock(mutex); - auto it = regions.find(region_id); - if (it == regions.end()) + auto it = region_infos.find(region_id); + if (it == region_infos.end()) { LOG_WARNING(log, "region does not exist, region_id={}", region_id); return; @@ -224,11 +251,8 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const auto internal_region_it = table.internal_regions.find(region_id); handle_range = internal_region_it->second.range_in_table; - regions.erase(it); - { - std::unique_lock write_lock(rw_lock); - safe_ts_map.erase(region_id); - } + region_infos.erase(it); + safe_ts_mgr.remove(region_id); table.internal_regions.erase(internal_region_it); if (table.internal_regions.empty()) { @@ -251,13 +275,14 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const } } +// RaftCommands will directly call `writeCommittedByRegion`. RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & region) { const RegionID region_id = region->id(); const auto func_update_region = [&](std::function && callback) -> bool { std::lock_guard lock(mutex); - if (auto it = regions.find(region_id); it != regions.end()) + if (auto it = region_infos.find(region_id); it != region_infos.end()) { auto & internal_region = doGetInternalRegion(it->second, region_id); return callback(internal_region); @@ -396,15 +421,36 @@ void RegionTable::shrinkRegionRange(const Region & region) internal_region.range_in_table = region.getRange()->rawKeys(); } -void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys) +void RegionTable::replaceRegion(const RegionPtr & old_region, const RegionPtr & new_region) { - std::lock_guard lock(mutex); + const auto region_range_keys = new_region->getRange(); + // Extend region range to make sure data won't be removed. + extendRegionRange(*new_region, *region_range_keys); + if (old_region) + { + std::scoped_lock lock(mutex); + // `old_region` will no longer contribute to the memory of the table. + auto keyspace_id = region_range_keys->getKeyspaceID(); + auto table_id = region_range_keys->getMappedTableID(); + auto & table = getOrCreateTable(keyspace_id, table_id); + old_region->resetRegionTableCtx(); + if unlikely (!new_region->getRegionTableCtx()) + { + // For most of the cases, the region is prehandled, so the ctx is set at that moment. + new_region->setRegionTableCtx(table.ctx); + } + } +} +void RegionTable::extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys) +{ + std::lock_guard lock(mutex); + const RegionID region_id = region.id(); auto keyspace_id = region_range_keys.getKeyspaceID(); auto table_id = region_range_keys.getMappedTableID(); auto new_handle_range = region_range_keys.rawKeys(); - if (auto it = regions.find(region_id); it != regions.end()) + if (auto it = region_infos.find(region_id); it != region_infos.end()) { auto ks_tbl_id = KeyspaceTableID{keyspace_id, table_id}; RUNTIME_CHECK_MSG( @@ -443,7 +489,7 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK else { auto & table = getOrCreateTable(keyspace_id, table_id); - insertRegion(table, region_range_keys, region_id); + insertRegion(table, region_range_keys, region); LOG_INFO(log, "insert internal region, keyspace={} table_id={} region_id={}", keyspace_id, table_id, region_id); } } @@ -483,70 +529,4 @@ RegionPtrWithCheckpointInfo::RegionPtrWithCheckpointInfo(const Base & base_, Che , checkpoint_info(std::move(checkpoint_info_)) {} -bool RegionTable::isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 * self_safe_ts) -{ - { - std::shared_lock lock(rw_lock); - auto it = safe_ts_map.find(region_id); - if (it == safe_ts_map.end()) - { - return false; - } - *leader_safe_ts = it->second->leader_safe_ts.load(std::memory_order_relaxed); - *self_safe_ts = it->second->self_safe_ts.load(std::memory_order_relaxed); - } - LOG_TRACE( - log, - "region_id={} table_id={} leader_safe_ts={} self_safe_ts={}", - region_id, - regions[region_id], - *leader_safe_ts, - *self_safe_ts); - return (*leader_safe_ts > *self_safe_ts) - && ((*leader_safe_ts >> TsoPhysicalShiftBits) - (*self_safe_ts >> TsoPhysicalShiftBits) > SafeTsDiffThreshold); -} - -UInt64 RegionTable::getSelfSafeTS(UInt64 region_id) const -{ - std::shared_lock lock(rw_lock); - auto it = safe_ts_map.find(region_id); - if (it == safe_ts_map.end()) - { - return 0; - } - return it->second->self_safe_ts.load(std::memory_order_relaxed); -} - -void RegionTable::updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts) -{ - { - std::shared_lock lock(rw_lock); - auto it = safe_ts_map.find(region_id); - if (it == safe_ts_map.end() && (leader_safe_ts == InvalidSafeTS || self_safe_ts == InvalidSafeTS)) - { - LOG_TRACE( - log, - "safe_ts_map empty but safe ts invalid, region_id={} leader_safe_ts={} self_safe_ts={}", - region_id, - leader_safe_ts, - self_safe_ts); - return; - } - if (it != safe_ts_map.end()) - { - if (leader_safe_ts != InvalidSafeTS) - { - it->second->leader_safe_ts.store(leader_safe_ts, std::memory_order_relaxed); - } - if (self_safe_ts != InvalidSafeTS) - { - it->second->self_safe_ts.store(self_safe_ts, std::memory_order_relaxed); - } - return; - } - } - std::unique_lock lock(rw_lock); - safe_ts_map.emplace(region_id, std::make_unique(leader_safe_ts, self_safe_ts)); -} - } // namespace DB diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.h b/dbms/src/Storages/KVStore/Decode/RegionTable.h index e17235be8e3..22f554752ce 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.h +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.h @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include #include @@ -54,18 +56,6 @@ using CheckpointInfoPtr = std::shared_ptr; struct CheckpointIngestInfo; using CheckpointIngestInfoPtr = std::shared_ptr; -using SafeTS = UInt64; -enum : SafeTS -{ - InvalidSafeTS = std::numeric_limits::max(), -}; - -using TsoShiftBits = UInt64; -enum : TsoShiftBits -{ - TsoPhysicalShiftBits = 18, -}; - class RegionTable : private boost::noncopyable { public: @@ -89,23 +79,35 @@ class RegionTable : private boost::noncopyable { explicit Table(const TableID table_id_) : table_id(table_id_) + , ctx(createRegionTableCtx()) {} TableID table_id; InternalRegions internal_regions; + RegionTableCtxPtr ctx; }; explicit RegionTable(Context & context_); + + // Iterate over all regions in KVStore, and add them to RegionTable. void restore(); - void updateRegion(const Region & region); + // When a region is added to region table, happens when split and restore. + void addRegion(const Region & region); + + // Most of the regions are scheduled to TiFlash by a raft snapshot. + void addPrehandlingRegion(const Region & region); + + // When a reigon is removed out of TiFlash. + void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &); + + // Used by apply snapshot. + void replaceRegion(const RegionPtr & old_region, const RegionPtr & new_region); /// This functional only shrink the table range of this region_id void shrinkRegionRange(const Region & region); /// extend range for possible InternalRegion or add one. - void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys); - - void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &); + void extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys); // Protects writeBlockByRegionAndFlush and ensures it's executed by only one thread at the same time. // Only one thread can do this at the same time. @@ -149,29 +151,12 @@ class RegionTable : private boost::noncopyable void clear(); -public: - // safe ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262), - // leader_safe_ts is the safe_ts in leader, leader will send to learner to advance safe_ts of learner, and TiFlash will record the safe_ts into safe_ts_map in check_leader RPC. - // self_safe_ts is the safe_ts in TiFlash learner. When TiFlash proxy receive from leader, TiFlash will update safe_ts_map when TiFlash has applied the raft log to applied_index. - struct SafeTsEntry - { - explicit SafeTsEntry(UInt64 leader_safe_ts, UInt64 self_safe_ts) - : leader_safe_ts(leader_safe_ts) - , self_safe_ts(self_safe_ts) - {} - std::atomic leader_safe_ts; - std::atomic self_safe_ts; - }; - using SafeTsEntryPtr = std::unique_ptr; - using SafeTsMap = std::unordered_map; - - void updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts); + size_t getTableRegionSize(KeyspaceID keyspace_id, TableID table_id) const; + void debugClearTableRegionSize(KeyspaceID keyspace_id, TableID table_id); - // unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader. - static const UInt64 SafeTsDiffThreshold = 2 * 60 * 1000; - bool isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 * self_safe_ts); + SafeTsManager & safeTsMgr() { return safe_ts_mgr; } + const SafeTsManager & safeTsMgr() const { return safe_ts_mgr; } - UInt64 getSelfSafeTS(UInt64 region_id) const; private: friend class MockTiDB; @@ -180,8 +165,7 @@ class RegionTable : private boost::noncopyable Table & getOrCreateTable(KeyspaceID keyspace_id, TableID table_id); void removeTable(KeyspaceID keyspace_id, TableID table_id); InternalRegion & getOrInsertRegion(const Region & region); - InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id); - InternalRegion & insertRegion(Table & table, const Region & region); + InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const Region & region); InternalRegion & doGetInternalRegion(KeyspaceTableID ks_table_id, RegionID region_id); void addTableToIndex(KeyspaceID keyspace_id, TableID table_id); void removeTableFromIndex(KeyspaceID keyspace_id, TableID table_id); @@ -191,17 +175,16 @@ class RegionTable : private boost::noncopyable TableMap tables; using RegionInfoMap = std::unordered_map; - RegionInfoMap regions; + RegionInfoMap region_infos; using KeyspaceIndex = std::unordered_map, boost::hash>; KeyspaceIndex keyspace_index; - SafeTsMap safe_ts_map; - Context * const context; + SafeTsManager safe_ts_mgr; + mutable std::mutex mutex; - mutable std::shared_mutex rw_lock; LoggerPtr log; }; diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable_fwd.h b/dbms/src/Storages/KVStore/Decode/RegionTable_fwd.h new file mode 100644 index 00000000000..cf520025665 --- /dev/null +++ b/dbms/src/Storages/KVStore/Decode/RegionTable_fwd.h @@ -0,0 +1,34 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +struct RegionTableCtx +{ + // KVStore size of all regions of this table, including the prehandling one. + // So, this size may be larger than the table's real size. + std::atomic_int64_t table_size; + std::atomic_bool warned; +}; +using RegionTableCtxPtr = std::shared_ptr; +inline RegionTableCtxPtr createRegionTableCtx() +{ + return std::make_shared(0, false); +} +} // namespace DB diff --git a/dbms/src/Storages/KVStore/Decode/SafeTsManager.cpp b/dbms/src/Storages/KVStore/Decode/SafeTsManager.cpp new file mode 100644 index 00000000000..76490d06eaa --- /dev/null +++ b/dbms/src/Storages/KVStore/Decode/SafeTsManager.cpp @@ -0,0 +1,83 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +bool SafeTsManager::isSafeTSLag(RegionID region_id, SafeTS * leader_safe_ts, SafeTS * self_safe_ts) +{ + { + std::shared_lock lock(rw_lock); + auto it = safe_ts_map.find(region_id); + if (it == safe_ts_map.end()) + { + return false; + } + *leader_safe_ts = it->second->leader_safe_ts.load(std::memory_order_relaxed); + *self_safe_ts = it->second->self_safe_ts.load(std::memory_order_relaxed); + } + LOG_TRACE( + DB::Logger::get(), + "region_id={} leader_safe_ts={} self_safe_ts={}", + region_id, + *leader_safe_ts, + *self_safe_ts); + return (*leader_safe_ts > *self_safe_ts) + && ((*leader_safe_ts >> TsoPhysicalShiftBits) - (*self_safe_ts >> TsoPhysicalShiftBits) > SafeTsDiffThreshold); +} + +UInt64 SafeTsManager::getSelfSafeTS(RegionID region_id) const +{ + std::shared_lock lock(rw_lock); + auto it = safe_ts_map.find(region_id); + if (it == safe_ts_map.end()) + { + return 0; + } + return it->second->self_safe_ts.load(std::memory_order_relaxed); +} + +void SafeTsManager::updateSafeTS(RegionID region_id, SafeTS leader_safe_ts, SafeTS self_safe_ts) +{ + { + std::shared_lock lock(rw_lock); + auto it = safe_ts_map.find(region_id); + if (it == safe_ts_map.end() && (leader_safe_ts == InvalidSafeTS || self_safe_ts == InvalidSafeTS)) + { + LOG_TRACE( + DB::Logger::get(), + "safe_ts_map empty but safe ts invalid, region_id={} leader_safe_ts={} self_safe_ts={}", + region_id, + leader_safe_ts, + self_safe_ts); + return; + } + if (it != safe_ts_map.end()) + { + if (leader_safe_ts != InvalidSafeTS) + { + it->second->leader_safe_ts.store(leader_safe_ts, std::memory_order_relaxed); + } + if (self_safe_ts != InvalidSafeTS) + { + it->second->self_safe_ts.store(self_safe_ts, std::memory_order_relaxed); + } + return; + } + } + std::unique_lock lock(rw_lock); + safe_ts_map.emplace(region_id, std::make_unique(leader_safe_ts, self_safe_ts)); +} +} // namespace DB diff --git a/dbms/src/Storages/KVStore/Decode/SafeTsManager.h b/dbms/src/Storages/KVStore/Decode/SafeTsManager.h new file mode 100644 index 00000000000..2fb77aa6a22 --- /dev/null +++ b/dbms/src/Storages/KVStore/Decode/SafeTsManager.h @@ -0,0 +1,81 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +using SafeTS = UInt64; +enum : SafeTS +{ + InvalidSafeTS = std::numeric_limits::max(), +}; + +using TsoShiftBits = UInt64; +enum : TsoShiftBits +{ + TsoPhysicalShiftBits = 18, +}; + +struct SafeTsManager +{ + // safe ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262), + // leader_safe_ts is the safe_ts in leader, leader will send to learner to advance safe_ts of learner, and TiFlash will record the safe_ts into safe_ts_map in check_leader RPC. + // self_safe_ts is the safe_ts in TiFlash learner. When TiFlash proxy receive from leader, TiFlash will update safe_ts_map when TiFlash has applied the raft log to applied_index. + struct SafeTsEntry + { + explicit SafeTsEntry(SafeTS leader_safe_ts, SafeTS self_safe_ts) + : leader_safe_ts(leader_safe_ts) + , self_safe_ts(self_safe_ts) + {} + std::atomic leader_safe_ts; + std::atomic self_safe_ts; + }; + using SafeTsEntryPtr = std::unique_ptr; + using SafeTsMap = std::unordered_map; + + void updateSafeTS(RegionID region_id, SafeTS leader_safe_ts, SafeTS self_safe_ts); + + // unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader. + static const SafeTS SafeTsDiffThreshold = 2 * 60 * 1000; + bool isSafeTSLag(RegionID region_id, SafeTS * leader_safe_ts, SafeTS * self_safe_ts); + + UInt64 getSelfSafeTS(RegionID region_id) const; + + void remove(RegionID region_id) + { + std::unique_lock write_lock(rw_lock); + safe_ts_map.erase(region_id); + } + void clear() + { + std::unique_lock write_lock(rw_lock); + safe_ts_map.clear(); + } + +private: + SafeTsMap safe_ts_map; + mutable std::shared_mutex rw_lock; +}; +} // namespace DB diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 0844e71230c..9f7ee6d1ac3 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -678,11 +678,10 @@ RawCppPtr PreHandleSnapshot( CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len); auto & tmt = *server->tmt; auto & kvstore = tmt.getKVStore(); - auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term); + auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term, tmt.getRegionTable()); #ifndef NDEBUG { - auto & kvstore = server->tmt->getKVStore(); auto state = kvstore->getProxyHelper()->getRegionLocalState(new_region->id()); assert(state.state() == raft_serverpb::PeerState::Applying); } @@ -997,7 +996,7 @@ void HandleSafeTSUpdate( { RUNTIME_CHECK(server->tmt != nullptr); RegionTable & region_table = server->tmt->getRegionTable(); - region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts); + region_table.safeTsMgr().updateSafeTS(region_id, leader_safe_ts, self_safe_ts); } catch (...) { diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp index 23a07ca12dc..cb0a47942eb 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp @@ -116,7 +116,7 @@ HttpRequestRes HandleHttpRequestSyncStatus( { UInt64 leader_safe_ts; UInt64 self_safe_ts; - if (!region_table.isSafeTSLag(region.first, &leader_safe_ts, &self_safe_ts)) + if (!region_table.safeTsMgr().isSafeTSLag(region.first, &leader_safe_ts, &self_safe_ts)) { region_list.push_back(region.first); } diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index ac669db70a5..974f44f48fb 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -265,6 +265,8 @@ void KVStore::removeRegion( { auto manage_lock = genRegionMgrWriteLock(task_lock); auto it = manage_lock.regions.find(region_id); + // Disregister from region table size. + it->second->resetRegionTableCtx(); manage_lock.index.remove( it->second->makeRaftCommandDelegate(task_lock).getRange().comparableKeys(), region_id); // remove index @@ -443,11 +445,21 @@ size_t KVStore::getOngoingPrehandleSubtaskCount() const return std::max(0, prehandling_trace.ongoing_prehandle_subtask_count.load()); } -// Generate a temporary region pointer by the given meta -RegionPtr KVStore::genRegionPtr(metapb::Region && region, UInt64 peer_id, UInt64 index, UInt64 term) +// The only way that to create a region associated to a piece of data, even if it is not going to be inserted in to KVStore. +RegionPtr KVStore::genRegionPtr( + metapb::Region && region, + UInt64 peer_id, + UInt64 index, + UInt64 term, + std::optional> region_table) { auto meta = RegionMeta::genFromMetaRegion(std::move(region), peer_id, index, term); - return std::make_shared(std::move(meta), proxy_helper); + auto new_region = std::make_shared(std::move(meta), proxy_helper); + if (region_table) + { + region_table.value().get().addPrehandlingRegion(*new_region); + } + return new_region; } RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index fb2a5b489f4..2fa57708634 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -157,8 +157,15 @@ class KVStore final : private boost::noncopyable RegionPtr getRegion(RegionID region_id) const; RegionMap getRegionsByRangeOverlap(const RegionRange & range) const; void traverseRegions(std::function && callback) const; - RegionPtr genRegionPtr(metapb::Region && region, UInt64 peer_id, UInt64 index, UInt64 term); + RegionPtr genRegionPtr( + metapb::Region && region, + UInt64 peer_id, + UInt64 index, + UInt64 term, + std::optional> region_table); void handleDestroy(UInt64 region_id, TMTContext & tmt); + void setKVStoreMemoryLimit(size_t s) { maximum_kvstore_memory = s; } + size_t getKVStoreMemoryLimit() const { return maximum_kvstore_memory; } public: // Raft Read and Write EngineStoreApplyRes handleAdminRaftCmd( @@ -417,6 +424,11 @@ class KVStore final : private boost::noncopyable ProxyConfigSummary proxy_config_summary; JointThreadInfoJeallocMapPtr joint_memory_allocation_map; + size_t maximum_kvstore_memory = 0; + +#ifdef DBMS_PUBLIC_GTEST + std::atomic debug_memory_limit_warning_count = 0; +#endif }; /// Encapsulation of lock guard of task mutex in KVStore diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp index d01c3aab43e..abf5100100f 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp @@ -274,10 +274,8 @@ void KVStore::onSnapshot( // 2. Dump data to RegionTable. { - const auto range = new_region_wrap->getRange(); auto & region_table = tmt.getRegionTable(); - // extend region to make sure data won't be removed. - region_table.extendRegionRange(region_id, *range); + region_table.replaceRegion(old_region, new_region_wrap.base); } // Register the new Region. diff --git a/dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp b/dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp index 2379bce6c3b..ede8a961308 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp @@ -107,13 +107,13 @@ RegionPtr KVStore::handleIngestSSTByDTFile( if (index <= region->appliedIndex()) return nullptr; - // Create a tmp region to store uncommitted data + // Create a tmp region to store uncommitted data, the uncommitted data will be merged from `tmp_region` to the existing region. So here ignore the region_table. RegionPtr tmp_region; { auto meta_region = region->cloneMetaRegion(); auto meta_snap = region->dumpRegionMetaSnapshot(); auto peer_id = meta_snap.peer.id(); - tmp_region = genRegionPtr(std::move(meta_region), peer_id, index, term); + tmp_region = genRegionPtr(std::move(meta_region), peer_id, index, term, std::nullopt); } // Decode the KV pairs in ingesting SST into DTFiles diff --git a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp index 001eea2e1d4..0103b460e5e 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp @@ -218,14 +218,17 @@ bool KVStore::canFlushRegionDataImpl( LOG_INFO( log, "{} flush region due to tryFlushRegionData, index {} term {} truncated_index {} truncated_term {}" - " gap {}/{}", + " gap {}/{} table_in_mem_size={} table_id={} keyspace={}", curr_region.toString(false), index, term, truncated_index, truncated_term, current_applied_gap, - gap_threshold); + gap_threshold, + curr_region_ptr->getRegionTableSize(), + curr_region_ptr->getMappedTableID(), + curr_region_ptr->getKeyspaceID()); GET_METRIC(tiflash_raft_region_flush_bytes, type_flushed).Observe(size_bytes); return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term); } diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index db6e0956560..767d5edca21 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -281,6 +281,7 @@ PrehandleResult KVStore::preHandleSnapshotToFiles( auto ongoing = ongoing_prehandle_task_count.fetch_sub(1) - 1; new_region->afterPrehandleSnapshot(ongoing); }); + new_region->resetWarnMemoryLimitByTable(); PrehandleResult result = preHandleSSTsToDTFiles( // new_region, snaps, @@ -289,6 +290,7 @@ PrehandleResult KVStore::preHandleSnapshotToFiles( DM::FileConvertJobType::ApplySnapshot, tmt); result.stats.start_time = start_time; + new_region->resetWarnMemoryLimitByTable(); return result; } catch (DB::Exception & e) @@ -457,6 +459,7 @@ static void runInParallel( const size_t split_id = part_limit.split_id; auto part_new_region = std::make_shared(new_region->getMeta().clone(), proxy_helper); + part_new_region->setRegionTableCtx(new_region->getRegionTableCtx()); auto part_sst_stream = std::make_shared( part_new_region, prehandle_ctx.snapshot_index, diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index a1883bca63b..63b32366d73 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -199,7 +199,7 @@ RegionID RegionRaftCommandDelegate::execCommitMerge( : source_region_meta_delegate.regionState().getRegion().end_key(); region_table.extendRegionRange( - id(), + *this, RegionRangeKeys(TiKVKey::copyFrom(new_start_key), TiKVKey::copyFrom(new_end_key))); } @@ -473,6 +473,11 @@ std::pair Region::handleWriteRaftCmd( handle_write_cmd_func(); } + if (default_put_key_count + lock_put_key_count > 0) + { + maybeWarnMemoryLimitByTable(tmt, "put"); + } + // If transfer-leader happened during ingest-sst, there might be illegal data. if (0 != cmds.len) { @@ -509,6 +514,10 @@ std::pair Region::handleWriteRaftCmd( fmt::join(entry_infos.begin(), entry_infos.end(), ":")); e.rethrow(); } + if (!data_list_to_remove.empty()) + { + resetWarnMemoryLimitByTable(); + } } meta.setApplied(index, term); diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp index 9e1031c7e63..4ce88ec3555 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp @@ -266,7 +266,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd( // update region_table first is safe, because the core rule is established: the range in RegionTable // is always >= range in KVStore. for (const auto & new_region : split_regions) - region_table.updateRegion(*new_region); + region_table.addRegion(*new_region); region_table.shrinkRegionRange(curr_region); } diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h index d84fba36cc7..4639ffe83b9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h @@ -52,6 +52,10 @@ struct RegionDataMemDiff payload -= other.payload; decoded -= other.decoded; } + + Type total() const { return payload + decoded; } + + size_t totalSize() const { return payload + decoded; } }; enum class DupCheck diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index cf6d201d5ee..9f57c0d7c95 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -38,6 +38,12 @@ void RegionData::recordMemChange(const RegionDataMemDiff & delta) { root_of_kvstore_mem_trackers->free(-delta.payload); } + // A region splits and merges, but its derived region all belong to the same table. + // So we can summarize here rather than `updateMemoryUsage`. + if (region_table_ctx) + { + region_table_ctx->table_size += delta.total(); + } } void RegionData::updateMemoryUsage(const RegionDataMemDiff & delta) @@ -79,7 +85,7 @@ RegionDataMemDiff RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVVa return delta; } -void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) +RegionDataMemDiff RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) { RegionDataMemDiff delta; switch (cf) @@ -110,6 +116,7 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) } recordMemChange(delta); updateMemoryUsage(delta); + return delta; } RegionData::WriteCFIter RegionData::removeDataByWriteIt(const WriteCFIter & write_it) @@ -281,6 +288,7 @@ size_t RegionData::totalSize() const void RegionData::assignRegionData(RegionData && rhs) { + auto size = rhs.resetRegionTableCtx(); recordMemChange(RegionDataMemDiff{-cf_data_size.load(), -decoded_data_size.load()}); resetMemoryUsage(); @@ -290,6 +298,7 @@ void RegionData::assignRegionData(RegionData && rhs) orphan_keys_info = std::move(rhs.orphan_keys_info); updateMemoryUsage(RegionDataMemDiff{rhs.cf_data_size.load(), rhs.decoded_data_size.load()}); + setRegionTableCtx(size); rhs.resetMemoryUsage(); } @@ -402,4 +411,57 @@ size_t RegionData::tryCompactionFilter(Timestamp safe_point) return del_write; } +void RegionData::setRegionTableCtx(RegionTableCtxPtr ctx) const +{ + region_table_ctx = ctx; + if (region_table_ctx) + { + region_table_ctx->table_size.fetch_add(dataSize()); + } +} + +RegionTableCtxPtr RegionData::getRegionTableCtx() const +{ + return region_table_ctx; +} + +RegionTableCtxPtr RegionData::resetRegionTableCtx() const +{ + if (region_table_ctx) + { + region_table_ctx->table_size.fetch_sub(dataSize()); + } + auto prev = region_table_ctx; + // The region no longer binds to a table. + region_table_ctx = nullptr; + return prev; +} + +size_t RegionData::getRegionTableSize() const +{ + if (region_table_ctx) + { + return region_table_ctx->table_size; + } + return 0; +} + +bool RegionData::getRegionTableWarned() const +{ + if (region_table_ctx) + { + return region_table_ctx->warned; + } + return false; +} + +bool RegionData::setRegionTableWarned(bool desired) const +{ + if (region_table_ctx) + { + return region_table_ctx->warned.exchange(desired); + } + return false; +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h index c623334faa4..2ae3810520d 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include @@ -39,7 +40,7 @@ class RegionData using LockInfoPtr = std::unique_ptr; RegionDataMemDiff insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); - void remove(ColumnFamilyType cf, const TiKVKey & key); + RegionDataMemDiff remove(ColumnFamilyType cf, const TiKVKey & key); WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it); std::optional readDataByWriteIt( @@ -84,6 +85,13 @@ class RegionData String summary() const; size_t tryCompactionFilter(Timestamp safe_point); + void setRegionTableCtx(RegionTableCtxPtr) const; + RegionTableCtxPtr getRegionTableCtx() const; + RegionTableCtxPtr resetRegionTableCtx() const; + size_t getRegionTableSize() const; + bool getRegionTableWarned() const; + bool setRegionTableWarned(bool) const; + struct OrphanKeysInfo { // Protected by region task lock. @@ -120,7 +128,7 @@ class RegionData private: // The memory difference to the KVStore. - static void recordMemChange(const RegionDataMemDiff &); + void recordMemChange(const RegionDataMemDiff &); // The memory difference to this Region. void updateMemoryUsage(const RegionDataMemDiff &); void resetMemoryUsage(); @@ -138,6 +146,7 @@ class RegionData std::atomic cf_data_size = 0; // Size of decoded structures for convenient access, considered as amplification in memory. std::atomic decoded_data_size = 0; + mutable RegionTableCtxPtr region_table_ctx; }; } // namespace DB diff --git a/dbms/src/Storages/KVStore/ProxyStateMachine.h b/dbms/src/Storages/KVStore/ProxyStateMachine.h index 232b6de57b4..bd424033d3b 100644 --- a/dbms/src/Storages/KVStore/ProxyStateMachine.h +++ b/dbms/src/Storages/KVStore/ProxyStateMachine.h @@ -276,19 +276,24 @@ struct ProxyStateMachine } } - void initKVStore(TMTContext & tmt_context, std::optional & store_ident) + void initKVStore( + TMTContext & tmt_context, + std::optional & store_ident, + size_t memory_limit) { + auto kvstore = tmt_context.getKVStore(); if (store_ident) { // Many service would depends on `store_id` when disagg is enabled. // setup the store_id restored from store_ident ASAP // FIXME: (bootstrap) we should bootstrap the tiflash node more early! - auto kvstore = tmt_context.getKVStore(); metapb::Store store_meta; store_meta.set_id(store_ident->store_id()); store_meta.set_node_state(metapb::NodeState::Preparing); kvstore->setStore(store_meta); } + kvstore->setKVStoreMemoryLimit(memory_limit); + LOG_INFO(log, "Set KVStore memory limit {}", memory_limit); } /// Restore TMTContext, including KVStore and RegionTable. diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index ee2df9cb4f0..3a6f5f2c82f 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -125,7 +125,7 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq const RegionID region_id = region_to_query.region_id; // don't stale read in test scenarios. bool can_stale_read = mvcc_query_info.start_ts != std::numeric_limits::max() - && start_ts <= region_table.getSelfSafeTS(region_id); + && start_ts <= region_table.safeTsMgr().getSelfSafeTS(region_id); if (can_stale_read) { batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); diff --git a/dbms/src/Storages/KVStore/Region.cpp b/dbms/src/Storages/KVStore/Region.cpp index bfbcbab45ee..41dddfd2ed5 100644 --- a/dbms/src/Storages/KVStore/Region.cpp +++ b/dbms/src/Storages/KVStore/Region.cpp @@ -28,6 +28,9 @@ #include #include +extern std::atomic real_rss; +std::atomic tranquil_time_rss; + namespace DB { RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIter & write_it) @@ -56,15 +59,22 @@ LockInfoPtr Region::getLockInfo(const RegionLockReadQuery & query) const return data.getLockInfo(query); } -void Region::insert(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode) +void Region::insertDebug(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode) +{ + std::unique_lock lock(mutex); + doInsert(NameToCF(cf), std::move(key), std::move(value), mode); +} + +void Region::insertFromSnap(TMTContext & tmt, const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode) { - insert(NameToCF(cf), std::move(key), std::move(value), mode); + insertFromSnap(tmt, NameToCF(cf), std::move(key), std::move(value), mode); } -void Region::insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode) +void Region::insertFromSnap(TMTContext & tmt, ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode) { std::unique_lock lock(mutex); doInsert(type, std::move(key), std::move(value), mode); + maybeWarnMemoryLimitByTable(tmt, "snapshot"); } RegionDataMemDiff Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode) @@ -184,6 +194,11 @@ size_t Region::dataSize() const return data.dataSize(); } +size_t Region::totalSize() const +{ + return data.totalSize() + sizeof(RegionMeta); +} + size_t Region::writeCFCount() const { std::shared_lock lock(mutex); @@ -387,4 +402,57 @@ Timestamp Region::getLastObservedReadTso() const return last_observed_read_tso.load(); } +void Region::setRegionTableCtx(RegionTableCtxPtr ctx) const +{ + data.setRegionTableCtx(ctx); +} + +void Region::maybeWarnMemoryLimitByTable(TMTContext & tmt, const char * from) +{ + // If there are data flow in, we will check if the memory is exhaused. + auto limit = tmt.getKVStore()->getKVStoreMemoryLimit(); + size_t current = real_rss.load() > 0 ? real_rss.load() : 0; + if unlikely (limit == 0 || current == 0) + return; + /// Region management such as split/merge doesn't change the memory consumed by a table in KVStore. + /// The only cases memory is reduced in a table is removing regions, applying snaps and commiting txns. + /// The only cases memory is increased in a table is inserting kv pairs and applying snaps. + /// So, we only print once for a table, until one memory reduce event will happen. + if unlikely (current >= limit * 0.9) + { + auto table_size = getRegionTableSize(); + auto grown_memory = current > tranquil_time_rss ? current - tranquil_time_rss : 0; + // 15% of the total non-tranquil-time memory, but not exceed 10GB. + auto table_memory_limit = std::min(grown_memory * 0.15, 10 * 1024ULL * 1024 * 1024); + if (grown_memory && table_size > table_memory_limit) + { + if (!setRegionTableWarned(true)) + { + // If it is the first time. +#ifdef DBMS_PUBLIC_GTEST + tmt.getKVStore()->debug_memory_limit_warning_count++; +#endif + LOG_INFO( + log, + "Memory limit exceeded, current={} limit={} table_limit={} table_in_mem_size={} table_id={} " + "keyspace={} " + "region_id={} from={}", + current, + limit, + table_memory_limit, + table_size, + mapped_table_id, + keyspace_id, + id(), + from); + } + } + } +} + +void Region::resetWarnMemoryLimitByTable() const +{ + setRegionTableWarned(false); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index da5ed6166d9..0fa0dbd478f 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -133,8 +134,19 @@ class Region : public std::enable_shared_from_this Region() = delete; ~Region(); - void insert(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); - void insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); + void insertDebug(const std::string & cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); + void insertFromSnap( + TMTContext & tmt, + ColumnFamilyType type, + TiKVKey && key, + TiKVValue && value, + DupCheck mode = DupCheck::Deny); + void insertFromSnap( + TMTContext & tmt, + const std::string & cf, + TiKVKey && key, + TiKVValue && value, + DupCheck mode = DupCheck::Deny); void remove(const std::string & cf, const TiKVKey & key); // Directly drop all data in this Region object. @@ -165,6 +177,8 @@ class Region : public std::enable_shared_from_this // Payload size in RegionData, show how much data flows in/out of the Region. size_t dataSize() const; + // How much memory the Region consumes. + size_t totalSize() const; size_t writeCFCount() const; std::string dataInfo() const; @@ -228,6 +242,17 @@ class Region : public std::enable_shared_from_this RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; } const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; } + // Bind a region to a RegionTable. It could not be bound to another table any more. + // All memory changes to this region would reflect to the binded table. + void setRegionTableCtx(RegionTableCtxPtr ctx) const; + RegionTableCtxPtr getRegionTableCtx() const { return data.getRegionTableCtx(); } + RegionTableCtxPtr resetRegionTableCtx() const { return data.resetRegionTableCtx(); } + size_t getRegionTableSize() const { return data.getRegionTableSize(); } + bool getRegionTableWarned() const { return data.getRegionTableWarned(); } + bool setRegionTableWarned(bool desired) const { return data.setRegionTableWarned(desired); } + void resetWarnMemoryLimitByTable() const; + void maybeWarnMemoryLimitByTable(TMTContext & tmt, const char * from); + public: // Raft Read and Write CommittedScanner createCommittedScanner(bool use_lock, bool need_value); CommittedRemover createCommittedRemover(bool use_lock = true); @@ -278,6 +303,7 @@ class Region : public std::enable_shared_from_this friend class tests::RegionKVStoreTest; friend struct RegionBench::DebugRegion; +private: // Private methods no need to lock mutex, normally // Returns the size of data change(inc or dec) diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index a6a1f3277b5..ca59f91c815 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -322,22 +322,26 @@ static void testRaftSplit(KVStore & kvs, TMTContext & tmt, std::unique_ptrinsert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); // row with handle_id == 8 - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -385,21 +389,25 @@ static void testRaftSplit(KVStore & kvs, TMTContext & tmt, std::unique_ptrinsert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -450,12 +458,14 @@ void RegionKVStoreOldTest::testRaftMerge(Context & ctx, KVStore & kvs, TMTContex { // Region 1 with handle_id == 6 auto region = kvs.getRegion(target_region_id); - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 6), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 6, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 6, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 6, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -464,12 +474,14 @@ void RegionKVStoreOldTest::testRaftMerge(Context & ctx, KVStore & kvs, TMTContex { // Region 7 with handle_id == 2 auto region = kvs.getRegion(source_region_id); - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 2), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); - region->insert("default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 2, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -604,6 +616,7 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) { auto & ctx = TiFlashTestEnv::getGlobalContext(); TableID table_id = 100; + auto & tmt = ctx.getTMTContext(); KVStore & kvs = getKVS(); UInt64 region_id = 1; proxy_instance->bootstrapWithRegion( @@ -634,12 +647,14 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } { // Test read committed and lock with CommittedScanner. - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20, nullptr, 5)); - region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); - region->insert( + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -688,7 +703,8 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) region->clearAllData(); } { - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Lock, "PK", 3, 20, nullptr, 5)); @@ -700,7 +716,8 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) region->clearAllData(); } { - region->insert( + region->insertFromSnap( + tmt, "lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Pessimistic, "PK", 3, 20, nullptr, 5)); @@ -713,7 +730,8 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } { // Test duplicate and tryCompactionFilter - region->insert( + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -723,7 +741,8 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) try { // insert duplicate records - region->insert( + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); @@ -742,7 +761,8 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } { // Test read and delete committed Del record. - region->insert( + region->insertFromSnap( + tmt, "write", RecordKVFormat::genKey(table_id, 4, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::DelFlag, 5)); @@ -759,7 +779,7 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } { ASSERT_EQ(0, region->dataSize()); - region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insertFromSnap(tmt, "default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); ASSERT_LT(0, region->dataSize()); region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); ASSERT_EQ(0, region->dataSize()); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 3cbeac91cb7..08b5c9ed30b 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -853,7 +853,7 @@ try 11, 11, std::nullopt, - false); + std::nullopt); eventuallyPredicate([&]() { return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); }); @@ -975,7 +975,7 @@ try 10, 10, std::nullopt, - false); + std::nullopt); std::mutex exe_mut; std::unique_lock exe_lock(exe_mut); @@ -1059,7 +1059,7 @@ try 0, 0, std::nullopt, - false); + std::nullopt); } ASSERT_EQ(fap_context->tasks_trace->queryState(region_id), FAPAsyncTasks::TaskState::NotScheduled); diff --git a/dbms/src/Storages/KVStore/tests/gtest_learner_read.cpp b/dbms/src/Storages/KVStore/tests/gtest_learner_read.cpp index eeaecedfd6a..d2077ff541f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_learner_read.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_learner_read.cpp @@ -125,8 +125,8 @@ try // region_202 has no safe ts info, can not stale read // region_203 read index cache exist in the `mvcc_query_info` RegionTable region_table(global_ctx); - region_table.updateSafeTS(region_id_200, /*leader_safe_ts*/ 10005, /*self_safe_ts*/ 10005); - region_table.updateSafeTS(region_id_201, /*leader_safe_ts*/ 9995, /*self_safe_ts*/ 9995); + region_table.safeTsMgr().updateSafeTS(region_id_200, /*leader_safe_ts*/ 10005, /*self_safe_ts*/ 10005); + region_table.safeTsMgr().updateSafeTS(region_id_201, /*leader_safe_ts*/ 9995, /*self_safe_ts*/ 9995); RegionsReadIndexResult read_index_result; auto requests = buildBatchReadIndex(worker, region_table, snapshot, read_index_result); diff --git a/dbms/src/Storages/KVStore/tests/gtest_memory.cpp b/dbms/src/Storages/KVStore/tests/gtest_memory.cpp new file mode 100644 index 00000000000..59df4885f18 --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/gtest_memory.cpp @@ -0,0 +1,621 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include // Included for `USE_JEMALLOC` + +extern std::shared_ptr root_of_kvstore_mem_trackers; +extern std::atomic real_rss; + + +namespace DB::tests +{ +using namespace DB::RecordKVFormat; + + +TEST_F(RegionKVStoreTest, MemoryTracker1) +try +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + auto & tmt = ctx.getTMTContext(); + initStorages(); + KVStore & kvs = getKVS(); + ctx.getTMTContext().debugSetKVStore(kvstore); + auto table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + MockRaftStoreProxy::FailCond cond; + const auto decoded_lock_size = sizeof(DecodedLockCFValue) + sizeof(DecodedLockCFValue::Inner); + + auto & region_table = ctx.getTMTContext().getRegionTable(); + auto getStartEnd = [&](RegionID region_id) { + return std::make_pair( + RecordKVFormat::genKey(table_id, region_id), + RecordKVFormat::genKey(table_id, region_id + 99)); + }; + auto pickKey = [&](RegionID region_id, UInt64 number) { + return RecordKVFormat::genKey(table_id, region_id + number, 111); + }; + auto pickWriteDefault = [&](RegionID, UInt64) { + return proxy_instance->generateTiKVKeyValue(111, 999); + }; + auto pickLock = [&](RegionID region_id, UInt64 number) { + return RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", region_id + number, 999) + .toString(); + }; + real_rss.store(1); + kvs.setKVStoreMemoryLimit(1); + { + // default + RegionID region_id = 4100; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + auto kvr1 = kvs.getRegion(region_id); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_val_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(kvr1->dataSize(), kvr1->getData().totalSize()); + ASSERT_EQ(kvr1->getData().totalSize(), region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + // lock + root_of_kvstore_mem_trackers->reset(); + RegionID region_id = 4200; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + auto kvr1 = kvs.getRegion(4100); + auto kvr2 = kvs.getRegion(region_id); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(kvr2->dataSize(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(kvr2->dataSize() + decoded_lock_size, kvr2->getData().totalSize()); + ASSERT_EQ( + kvr2->getData().totalSize() + kvr1->getData().totalSize(), + region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + // lock with largetxn + root_of_kvstore_mem_trackers->reset(); + RegionID region_id = 4300; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + auto kvr1 = kvs.getRegion(4100); + auto kvr2 = kvs.getRegion(4200); + auto kvr3 = kvs.getRegion(region_id); + ASSERT_NE(kvr3, nullptr); + std::string shor_value = "value"; + auto lock_for_update_ts = 7777, txn_size = 1; + const std::vector & async_commit = {"s1", "s2"}; + const std::vector & rollback = {3, 4}; + auto lock_value2 = DB::RegionBench::encodeFullLockCfValue( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 1111); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(kvr3->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(kvr3->dataSize() + decoded_lock_size, kvr3->getData().totalSize()); + ASSERT_EQ( + kvr3->getData().totalSize() + kvr2->getData().totalSize() + kvr1->getData().totalSize(), + region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + // insert & remove + root_of_kvstore_mem_trackers->reset(); + RegionID region_id = 5000; + auto originTableSize = region_table.getTableRegionSize(NullspaceID, table_id); + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + auto delta = str_key.dataSize() + str_val_default.size(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), delta); + ASSERT_EQ(region_table.getTableRegionSize(NullspaceID, table_id), originTableSize + delta); + region->remove("default", TiKVKey::copyFrom(str_key)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(region_table.getTableRegionSize(NullspaceID, table_id), originTableSize); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + { + // insert + RegionID region_id = 6000; + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + // commit + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + + RegionID region_id = 8000; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + region->insertFromSnap(tmt, "write", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_write)); + ASSERT_EQ( + str_key.dataSize() * 2 + str_val_default.size() + str_val_write.size(), + region_table.getTableRegionSize(NullspaceID, table_id)); + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(0, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + // commit by handleWriteRaftCmd + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + + RegionID region_id = 8100; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + { + auto [view, holder] = MockRaftStoreProxy::createWriteCmdsView( + {str_key, str_key}, + {str_val_default, str_val_write}, + {WriteCmdType::Put, WriteCmdType::Put}, + {ColumnFamilyType::Default, ColumnFamilyType::Write}); + region->handleWriteRaftCmd(view, 66, 6, ctx.getTMTContext()); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(0, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 1); + } + { + auto [view, holder] = MockRaftStoreProxy::createWriteCmdsView( + {str_key}, + {str_lock_value}, + {WriteCmdType::Put}, + {ColumnFamilyType::Lock}); + region->handleWriteRaftCmd(view, 67, 6, ctx.getTMTContext()); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 2); + } + } + { + // split & merge + root_of_kvstore_mem_trackers->reset(); + RegionID region_id = 12000; + RegionID region_id2 = 12002; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 22); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 22); + auto str_lock_value = pickLock(region_id, 22); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + auto str_key2 = pickKey(region_id, 80); + auto [str_val_write2, str_val_default2] = pickWriteDefault(region_id, 80); + auto str_lock_value2 = pickLock(region_id, 80); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); + auto original_size = region_table.getTableRegionSize(NullspaceID, table_id); + auto expected = str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + auto new_region = splitRegion( + region, + RegionMeta( + createPeer(region_id + 1, true), + createRegionInfo( + region_id2, + RecordKVFormat::genKey(table_id, 12050), + RecordKVFormat::genKey(table_id, 12099)), + initialApplyState())); + ASSERT_EQ(original_size, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); + region->mergeDataFrom(*new_region); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); + ASSERT_EQ(original_size, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 2); + } + { + // split & merge with lock + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + RegionID region_id = 13100; + RegionID region_id2 = 13102; + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 22); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 22); + auto str_lock_value = pickLock(region_id, 22); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + region->insertFromSnap(tmt, "lock", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_lock_value)); + auto expected = str_key.dataSize() + str_lock_value.size(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + auto str_key2 = pickKey(region_id, 80); + std::string short_value(97, 'a'); + auto str_lock_value2 + = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 13180, 111, &short_value) + .toString(); + region->insertFromSnap(tmt, "lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2)); + auto original_size = region_table.getTableRegionSize(NullspaceID, table_id); + expected += str_key2.dataSize() + str_lock_value2.size(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + auto new_region = splitRegion( + region, + RegionMeta( + createPeer(region_id + 1, true), + createRegionInfo( + region_id2, + RecordKVFormat::genKey(table_id, 13150), + RecordKVFormat::genKey(table_id, 13199)), + initialApplyState())); + ASSERT_EQ(original_size, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + decoded_lock_size); + ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_lock_value2.size()); + ASSERT_EQ(new_region->getData().totalSize(), new_region->dataSize() + decoded_lock_size); + region->mergeDataFrom(*new_region); + ASSERT_EQ(original_size, region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); + + // replace a lock + region->insertFromSnap(tmt, "lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2)); + auto str_lock_value2_2 + = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 13022, 111).toString(); + region->insertFromSnap(tmt, "lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2_2)); + expected -= short_value.size(); + expected -= 2; // Short value prefix and length + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 2); + } + { + // insert & snapshot + UInt64 region_id = 14100; + region_table.debugClearTableRegionSize(NullspaceID, table_id); + root_of_kvstore_mem_trackers->reset(); + auto [start, end] = getStartEnd(region_id); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + ASSERT_NE(region, nullptr); + + auto str_key2 = pickKey(region_id, 20); + auto [str_val_write2, str_val_default2] = pickWriteDefault(region_id, 20); + auto str_lock_value2 = pickLock(region_id, 20); + + auto str_key3 = pickKey(region_id, 80); + auto [str_val_write3, str_val_default3] = pickWriteDefault(region_id, 80); + auto str_lock_value3 = pickLock(region_id, 80); + + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key3), TiKVValue::copyFrom(str_val_default3)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key3.dataSize() + str_val_default3.size()); + ASSERT_EQ(region->dataSize(), str_key3.dataSize() + str_val_default3.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{region_id, table_id, ColumnFamilyType::Default}; + default_cf.insert(14180, str_val_default2); + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf}, 0, 0, std::nullopt); + ASSERT_EQ(region->dataSize(), str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->dataSize(), region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 3); + } + { + // prehandle snapshot and drop + UInt64 region_id = 14200; + region_table.debugClearTableRegionSize(NullspaceID, table_id); + root_of_kvstore_mem_trackers->reset(); + auto [start, end] = getStartEnd(region_id); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + ASSERT_NE(region, nullptr); + + auto str_key2 = pickKey(region_id, 20); + auto [str_val_write2, str_val_default2] = pickWriteDefault(region_id, 20); + auto str_lock_value2 = pickLock(region_id, 20); + + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{region_id, table_id, ColumnFamilyType::Default}; + default_cf.insert(14280, str_val_default2); + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf}, 0, 0, std::nullopt, [&]() { + ASSERT_EQ( + region_table.getTableRegionSize(NullspaceID, table_id), + str_key2.dataSize() + str_val_default2.size()); + }); + ASSERT_EQ(region->dataSize(), 0); + ASSERT_EQ(region->getData().totalSize(), 0); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region_table.getTableRegionSize(NullspaceID, table_id), 0); + ASSERT_EQ(kvs.debug_memory_limit_warning_count, 4); + } + { + // assign + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + RegionID region_id = 15100; + RegionID region_id2 = 15200; + auto [start1, end1] = getStartEnd(1100); + auto [start2, end2] = getStartEnd(1200); + proxy_instance + ->debugAddRegions(kvs, ctx.getTMTContext(), {region_id, region_id2}, {{start1, end1}, {start2, end2}}); + RegionPtr region = kvs.getRegion(region_id); + RegionPtr region2 = kvs.getRegion(region_id2); + + auto str_key = pickKey(region_id, 70); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 70); + auto str_lock_value = pickLock(region_id, 70); + + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + + auto str_key2 = pickKey(region_id, 80); + auto [str_val_write2, str_val_default2] = pickWriteDefault(region_id, 80); + auto str_lock_value2 = pickLock(region_id, 80); + region2->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); + region2->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + + region->assignRegion(std::move(*region2)); + ASSERT_EQ( + root_of_kvstore_mem_trackers->get(), + str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region_table.getTableRegionSize(NullspaceID, table_id)); + ASSERT_EQ( + region->dataSize(), + str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + // `region2` is not allowed to access after move, however, we assert here in order to make sure the logic. + ASSERT_EQ(region2->dataSize(), 0); + ASSERT_EQ(region2->getData().totalSize(), region2->dataSize()); + } + { + // remove region + RegionID region_id = 16000; + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + auto kvr1 = kvs.getRegion(region_id); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_val_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + kvs.handleDestroy(region_id, ctx.getTMTContext()); + ASSERT_EQ(0, region_table.getTableRegionSize(NullspaceID, table_id)); + } +} +CATCH + +TEST_F(RegionKVStoreTest, MemoryTracker2) +try +{ + auto & ctx = TiFlashTestEnv::getGlobalContext(); + auto & tmt = ctx.getTMTContext(); + initStorages(); + KVStore & kvs = getKVS(); + ctx.getTMTContext().debugSetKVStore(kvstore); + auto table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + + auto & region_table = ctx.getTMTContext().getRegionTable(); + auto getStartEnd = [&](RegionID region_id) { + return std::make_pair( + RecordKVFormat::genKey(table_id, region_id), + RecordKVFormat::genKey(table_id, region_id + 99)); + }; + auto pickKey = [&](RegionID region_id, UInt64 number) { + return RecordKVFormat::genKey(table_id, region_id + number, 111); + }; + auto pickWriteDefault = [&](RegionID, UInt64) { + return proxy_instance->generateTiKVKeyValue(111, 999); + }; + auto pickLock = [&](RegionID region_id, UInt64 number) { + return RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", region_id + number, 999) + .toString(); + }; + { + // reload + RegionID region_id = 7000; + root_of_kvstore_mem_trackers->reset(); + region_table.debugClearTableRegionSize(NullspaceID, table_id); + auto [start, end] = getStartEnd(region_id); + auto str_key = pickKey(region_id, 1); + auto [str_val_write, str_val_default] = pickWriteDefault(region_id, 1); + auto str_lock_value = pickLock(region_id, 1); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{start, end}}); + RegionPtr region = kvs.getRegion(region_id); + root_of_kvstore_mem_trackers->reset(); + region->insertFromSnap(tmt, "default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + tryPersistRegion(kvs, region_id); + root_of_kvstore_mem_trackers->reset(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + reloadKVSFromDisk(false); + ctx.getTMTContext().debugSetKVStore(kvstore); + region_table.restore(); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + // Only this region is persisted. + ASSERT_EQ(region->dataSize(), region_table.getTableRegionSize(NullspaceID, table_id)); + } +} +CATCH + + +#if USE_JEMALLOC // following tests depends on jemalloc +TEST(FFIJemallocTest, JemallocThread) +try +{ + std::thread t2([&]() { + char * a = new char[888888]; + std::thread t1([&]() { + auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); + ASSERT_TRUE(allocated != nullptr); + ASSERT_EQ(*allocated, 0); + ASSERT_TRUE(deallocated != nullptr); + ASSERT_EQ(*deallocated, 0); + }); + t1.join(); + auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); + ASSERT_TRUE(allocated != nullptr); + ASSERT_GE(*allocated, 888888); + ASSERT_TRUE(deallocated != nullptr); + delete[] a; + }); + t2.join(); + + std::thread t3([&]() { + // Will not cover mmap memory. + auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); + char * a = new char[120]; + void * buf = mmap(nullptr, 6000, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + ASSERT_LT(*allocated, 6000); + munmap(buf, 0); + delete[] a; + }); + t3.join(); +} +CATCH + +TEST_F(RegionKVStoreTest, StorageBgPool) +try +{ + using namespace std::chrono_literals; + auto & ctx = TiFlashTestEnv::getGlobalContext(); + auto & pool = ctx.getBackgroundPool(); + const auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE; + std::atomic_bool b = false; + + JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap(); + + size_t original_size + = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); + + auto t = pool.addTask( + [&]() { + auto * x = new int[1000]; + LOG_INFO(Logger::get(), "allocated"); + while (!b.load()) + { + std::this_thread::sleep_for(1500ms); + } + delete[] x; + LOG_INFO(Logger::get(), "released"); + return false; + }, + false, + 5 * 60 * 1000); + std::this_thread::sleep_for(500ms); + + jm.recordThreadAllocInfo(); + + LOG_INFO(DB::Logger::get(), "bg pool size={}", size); + UInt64 r = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); + ASSERT_GE(r, original_size + sizeof(int) * 1000); + jm.accessStorageMap([size](const JointThreadInfoJeallocMap::AllocMap & m) { + // There are some other bg thread pools + ASSERT_GE(m.size(), size) << m.size(); + }); + jm.accessProxyMap([](const JointThreadInfoJeallocMap::AllocMap & m) { ASSERT_EQ(m.size(), 0); }); + + b.store(true); + + ctx.getBackgroundPool().removeTask(t); +} +CATCH +#endif +} // namespace DB::tests diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 32cf52cfe40..19a2d7c53f9 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -33,7 +33,7 @@ #include -extern std::shared_ptr root_of_kvstore_mem_trackers; + namespace DB::tests { using namespace DB::RecordKVFormat; @@ -72,276 +72,6 @@ try CATCH -TEST_F(RegionKVStoreTest, MemoryTracker) -try -{ - auto & ctx = TiFlashTestEnv::getGlobalContext(); - initStorages(); - KVStore & kvs = getKVS(); - auto table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); - auto start = RecordKVFormat::genKey(table_id, 0); - auto end = RecordKVFormat::genKey(table_id, 100); - auto str_key = RecordKVFormat::genKey(table_id, 1, 111); - auto [str_val_write, str_val_default] = proxy_instance->generateTiKVKeyValue(111, 999); - auto str_lock_value - = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 111, 999).toString(); - MockRaftStoreProxy::FailCond cond; - const auto decoded_lock_size = sizeof(DecodedLockCFValue) + sizeof(DecodedLockCFValue::Inner); - proxy_instance->debugAddRegions( - kvs, - ctx.getTMTContext(), - {1, 2, 3}, - {{RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 10)}, - {RecordKVFormat::genKey(table_id, 11), RecordKVFormat::genKey(table_id, 20)}, - {RecordKVFormat::genKey(table_id, 21), RecordKVFormat::genKey(table_id, 30)}}); - - { - // default - auto region_id = 1; - auto kvr1 = kvs.getRegion(region_id); - auto [index, term] - = proxy_instance - ->rawWrite(region_id, {str_key}, {str_val_default}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); - UNUSED(term); - proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(kvr1->dataSize(), kvr1->getData().totalSize()); - } - { - // lock - root_of_kvstore_mem_trackers->reset(); - auto region_id = 2; - auto kvr1 = kvs.getRegion(region_id); - auto [index, term] - = proxy_instance - ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); - UNUSED(term); - proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); - ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(kvr1->dataSize() + decoded_lock_size, kvr1->getData().totalSize()); - } - { - // lock with largetxn - root_of_kvstore_mem_trackers->reset(); - auto region_id = 3; - auto kvr1 = kvs.getRegion(region_id); - ASSERT_NE(kvr1, nullptr); - std::string shor_value = "value"; - auto lock_for_update_ts = 7777, txn_size = 1; - const std::vector & async_commit = {"s1", "s2"}; - const std::vector & rollback = {3, 4}; - auto lock_value2 = DB::RegionBench::encodeFullLockCfValue( - Region::DelFlag, - "primary key", - 421321, - std::numeric_limits::max(), - &shor_value, - 66666, - lock_for_update_ts, - txn_size, - async_commit, - rollback, - 1111); - auto [index, term] - = proxy_instance - ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); - UNUSED(term); - proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); - ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(kvr1->dataSize() + decoded_lock_size, kvr1->getData().totalSize()); - } - { - // insert & remove - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(700, start, end, proxy_helper.get()); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - region->remove("default", TiKVKey::copyFrom(str_key)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - } - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - { - // insert - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(701, start, end, proxy_helper.get()); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - } - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - { - // reload - proxy_instance->debugAddRegions( - kvs, - ctx.getTMTContext(), - {702}, - {{RecordKVFormat::genKey(table_id, 7020), RecordKVFormat::genKey(table_id, 7030)}}); - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = kvs.getRegion(702); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - tryPersistRegion(kvs, 702); - root_of_kvstore_mem_trackers->reset(); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - reloadKVSFromDisk(false); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - } - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - { - // commit - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(800, start, end, proxy_helper.get()); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - region->insert("write", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_write)); - std::optional data_list_read = ReadRegionCommitCache(region, true); - ASSERT_TRUE(data_list_read); - ASSERT_EQ(1, data_list_read->size()); - RemoveRegionCommitCache(region, *data_list_read); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - } - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); - { - // split & merge - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(900, start, end, proxy_helper.get()); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); - auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); - region->insert("default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); - auto expected = str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size(); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), expected); - auto new_region = splitRegion( - region, - RegionMeta( - createPeer(901, true), - createRegionInfo(902, RecordKVFormat::genKey(table_id, 50), end), - initialApplyState())); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_val_default.size()); - ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_val_default2.size()); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); - region->mergeDataFrom(*new_region); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), expected); - ASSERT_EQ(region->dataSize(), region->getData().totalSize()); - ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); - } - { - // split & merge with lock - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(1000, start, end, proxy_helper.get()); - region->insert("lock", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_lock_value)); - auto expected = str_key.dataSize() + str_lock_value.size(); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); - std::string short_value(97, 'a'); - auto str_lock_value2 - = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 20, 111, &short_value) - .toString(); - region->insert("lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2)); - expected += str_key2.dataSize() + str_lock_value2.size(); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - auto new_region = splitRegion( - region, - RegionMeta( - createPeer(1001, true), - createRegionInfo(1002, RecordKVFormat::genKey(table_id, 50), end), - initialApplyState())); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_lock_value.size()); - ASSERT_EQ(region->getData().totalSize(), region->dataSize() + decoded_lock_size); - ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_lock_value2.size()); - ASSERT_EQ(new_region->getData().totalSize(), new_region->dataSize() + decoded_lock_size); - region->mergeDataFrom(*new_region); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), expected); - ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); - - // replace a lock - region->insert("lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2)); - auto str_lock_value2_2 - = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 20, 111).toString(); - region->insert("lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2_2)); - expected -= short_value.size(); - expected -= 2; // Short value prefix and length - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - ASSERT_EQ(region->dataSize(), expected); - ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); - } - { - // insert & snapshot - UInt64 region_id = 1100; - root_of_kvstore_mem_trackers->reset(); - proxy_instance->debugAddRegions( - kvs, - ctx.getTMTContext(), - {region_id}, - {{{RecordKVFormat::genKey(table_id, 1100), RecordKVFormat::genKey(table_id, 1200)}}}); - - RegionPtr region = kvs.getRegion(region_id); - ASSERT_NE(region, nullptr); - auto str_key2 = RecordKVFormat::genKey(table_id, 1120, 111); - auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); - - auto str_key3 = RecordKVFormat::genKey(table_id, 1180, 111); - auto [str_val_write3, str_val_default3] = proxy_instance->generateTiKVKeyValue(111, 999); - - region->insert("default", TiKVKey::copyFrom(str_key3), TiKVValue::copyFrom(str_val_default3)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key3.dataSize() + str_val_default3.size()); - ASSERT_EQ(region->dataSize(), str_key3.dataSize() + str_val_default3.size()); - ASSERT_EQ(region->getData().totalSize(), region->dataSize()); - - MockSSTReader::getMockSSTData().clear(); - MockSSTGenerator default_cf{region_id, table_id, ColumnFamilyType::Default}; - default_cf.insert(1180, str_val_default2); - default_cf.finish_file(); - default_cf.freeze(); - kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); - proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf}, 0, 0, std::nullopt); - ASSERT_EQ(region->dataSize(), str_key2.dataSize() + str_val_default2.size()); - ASSERT_EQ(region->getData().totalSize(), region->dataSize()); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key2.dataSize() + str_val_default2.size()); - } - { - // assign - root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(1200, start, end, proxy_helper.get()); - region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - - auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); - auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); - RegionPtr region2 = tests::makeRegion(1201, start, end, proxy_helper.get()); - region2->insert("default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); - region2->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - - region->assignRegion(std::move(*region2)); - ASSERT_EQ( - root_of_kvstore_mem_trackers->get(), - str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size()); - ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); - ASSERT_EQ(region->getData().totalSize(), region->dataSize()); - // `region2` is not allowed to access after move, however, we assert here in order to make sure the logic. - ASSERT_EQ(region2->dataSize(), 0); - ASSERT_EQ(region2->getData().totalSize(), region2->dataSize()); - } - ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); -} -CATCH - TEST_F(RegionKVStoreTest, KVStoreFailRecovery) try { @@ -879,7 +609,7 @@ try 0, 0, std::nullopt, - /*cancel_after_prehandle=*/true); + []() {}); } } } @@ -1208,89 +938,6 @@ try } CATCH -#if USE_JEMALLOC // following tests depends on jemalloc -TEST(FFIJemallocTest, JemallocThread) -try -{ - std::thread t2([&]() { - char * a = new char[888888]; - std::thread t1([&]() { - auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); - ASSERT_TRUE(allocated != nullptr); - ASSERT_EQ(*allocated, 0); - ASSERT_TRUE(deallocated != nullptr); - ASSERT_EQ(*deallocated, 0); - }); - t1.join(); - auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); - ASSERT_TRUE(allocated != nullptr); - ASSERT_GE(*allocated, 888888); - ASSERT_TRUE(deallocated != nullptr); - delete[] a; - }); - t2.join(); - - std::thread t3([&]() { - // Will not cover mmap memory. - auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); - char * a = new char[120]; - void * buf = mmap(nullptr, 6000, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - ASSERT_LT(*allocated, 6000); - munmap(buf, 0); - delete[] a; - }); - t3.join(); -} -CATCH - -TEST_F(RegionKVStoreTest, StorageBgPool) -try -{ - using namespace std::chrono_literals; - auto & ctx = TiFlashTestEnv::getGlobalContext(); - auto & pool = ctx.getBackgroundPool(); - const auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE; - std::atomic_bool b = false; - - JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap(); - - size_t original_size - = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); - - auto t = pool.addTask( - [&]() { - auto * x = new int[1000]; - LOG_INFO(Logger::get(), "allocated"); - while (!b.load()) - { - std::this_thread::sleep_for(1500ms); - } - delete[] x; - LOG_INFO(Logger::get(), "released"); - return false; - }, - false, - 5 * 60 * 1000); - std::this_thread::sleep_for(500ms); - - jm.recordThreadAllocInfo(); - - LOG_INFO(DB::Logger::get(), "bg pool size={}", size); - UInt64 r = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); - ASSERT_GE(r, original_size + sizeof(int) * 1000); - jm.accessStorageMap([size](const JointThreadInfoJeallocMap::AllocMap & m) { - // There are some other bg thread pools - ASSERT_GE(m.size(), size) << m.size(); - }); - jm.accessProxyMap([](const JointThreadInfoJeallocMap::AllocMap & m) { ASSERT_EQ(m.size(), 0); }); - - b.store(true); - - ctx.getBackgroundPool().removeTask(t); -} -CATCH -#endif - TEST(ProxyMode, Normal) try { @@ -1371,8 +1018,17 @@ try // Overlap EXPECT_THROW( - proxy_instance - ->snapshot(kvs, ctx.getTMTContext(), 2, {default_cf}, make_meta(), peer_id, 0, 0, std::nullopt, false), + proxy_instance->snapshot( + kvs, + ctx.getTMTContext(), + 2, + {default_cf}, + make_meta(), + peer_id, + 0, + 0, + std::nullopt, + std::nullopt), Exception); LOG_INFO(log, "Set to applying"); @@ -1380,8 +1036,17 @@ try r1->mutState().set_state(raft_serverpb::PeerState::Applying); ASSERT_EQ(proxy_helper->getRegionLocalState(1).state(), raft_serverpb::PeerState::Applying); EXPECT_THROW( - proxy_instance - ->snapshot(kvs, ctx.getTMTContext(), 2, {default_cf}, make_meta(), peer_id, 0, 0, std::nullopt, false), + proxy_instance->snapshot( + kvs, + ctx.getTMTContext(), + 2, + {default_cf}, + make_meta(), + peer_id, + 0, + 0, + std::nullopt, + std::nullopt), Exception); @@ -1390,8 +1055,17 @@ try r1->mutState().set_state(raft_serverpb::PeerState::Applying); r1->mutState().mutable_region()->set_start_key(RecordKVFormat::genKey(table_id, 0)); r1->mutState().mutable_region()->set_end_key(RecordKVFormat::genKey(table_id, 1)); - proxy_instance - ->snapshot(kvs, ctx.getTMTContext(), 2, {default_cf}, make_meta(), peer_id, 0, 0, std::nullopt, false); + proxy_instance->snapshot( + kvs, + ctx.getTMTContext(), + 2, + {default_cf}, + make_meta(), + peer_id, + 0, + 0, + std::nullopt, + std::nullopt); } } CATCH diff --git a/dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp b/dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp index 8902e6305e0..174b19866bc 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp @@ -665,7 +665,7 @@ try }; for (const auto & [k, v] : kvs) { - region->insert(ColumnFamilyType::Write, TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v))); + region->insertDebug("write", TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v))); } auto data_list_read = ReadRegionCommitCache(region, true); diff --git a/dbms/src/Storages/KVStore/tests/gtest_region_persister.cpp b/dbms/src/Storages/KVStore/tests/gtest_region_persister.cpp index beef2c03cb9..ace2d3c5bcf 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_region_persister.cpp @@ -205,9 +205,9 @@ try TableID table_id = 100; auto region = makeTmpRegion(); TiKVKey key = RecordKVFormat::genKey(table_id, 323, 9983); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); region->updateRaftLogEagerIndex(1024); @@ -237,9 +237,9 @@ try TableID table_id = 100; auto region = makeTmpRegion(); TiKVKey key = RecordKVFormat::genKey(table_id, 323, 9983); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); region->updateRaftLogEagerIndex(1024); @@ -284,9 +284,9 @@ try } TiKVKey key = RecordKVFormat::genKey(table_id, 323, 9983); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); const auto path = dir_path + "/region_state.test"; WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); @@ -542,10 +542,9 @@ class RegionPersisterTest TEST_P(RegionPersisterTest, Concurrency) try { - RegionManager region_manager; - auto ctx = TiFlashTestEnv::getGlobalContext(); + RegionManager region_manager; RegionMap regions; const TableID table_id = 100; @@ -567,15 +566,11 @@ try auto region = makeRegion(createRegionMeta(region_100, table_id)); TiKVKey key = RecordKVFormat::genKey(table_id, region_100, diff++); - region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert( - ColumnFamilyType::Lock, - TiKVKey::copyFrom(key), - RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); persister.persist(*region, region_task_lock); - regions.emplace(region->id(), region); }); LOG_INFO(log, "paused before persisting region 100"); @@ -588,12 +583,9 @@ try auto region = makeRegion(createRegionMeta(region_101, table_id)); TiKVKey key = RecordKVFormat::genKey(table_id, region_101, diff++); - region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert( - ColumnFamilyType::Lock, - TiKVKey::copyFrom(key), - RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); persister.persist(*region, region_task_lock); @@ -633,12 +625,9 @@ try auto region = makeRegion(createRegionMeta(i, table_id)); TiKVKey key = RecordKVFormat::genKey(table_id, i, diff++); - region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert(ColumnFamilyType::Write, TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert( - ColumnFamilyType::Lock, - TiKVKey::copyFrom(key), - RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); persister.persist(*region, region_task_lock); @@ -698,10 +687,9 @@ CATCH TEST_P(RegionPersisterTest, LargeRegion) try { - RegionManager region_manager; - auto ctx = TiFlashTestEnv::getGlobalContext(); + RegionManager region_manager; const TableID table_id = 100; const RegionID region_id_base = 20; const String large_value(1024 * 512, 'v'); @@ -726,15 +714,9 @@ try break; } TiKVKey key = RecordKVFormat::genKey(table_id, handle_id, tso++); - region->insert(ColumnFamilyType::Default, TiKVKey::copyFrom(key), TiKVValue(large_value.data())); - region->insert( - ColumnFamilyType::Write, - TiKVKey::copyFrom(key), - RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert( - ColumnFamilyType::Lock, - TiKVKey::copyFrom(key), - RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + region->insertDebug("default", TiKVKey::copyFrom(key), TiKVValue(large_value.data())); + region->insertDebug("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insertDebug("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); handle_id += 1; } return region; diff --git a/dbms/src/Storages/KVStore/tests/gtest_sync_status.cpp b/dbms/src/Storages/KVStore/tests/gtest_sync_status.cpp index 36a9df765bf..62f11abc2f8 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_sync_status.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_sync_status.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -193,7 +194,10 @@ void makeRegionsLag(size_t lag_num) auto & tmt = TiFlashTestEnv::getContext()->getTMTContext(); for (size_t i = 0; i < lag_num; i++) { - tmt.getRegionTable().updateSafeTS(i, (RegionTable::SafeTsDiffThreshold + 1) << TsoPhysicalShiftBits, 0); + tmt.getRegionTable().safeTsMgr().updateSafeTS( + i, + (SafeTsManager::SafeTsDiffThreshold + 1) << TsoPhysicalShiftBits, + 0); } }