Skip to content

Commit

Permalink
KVStore: Record decoded memory of each Region (#9780)
Browse files Browse the repository at this point in the history
ref #9722

Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: JaySon-Huang <[email protected]>
  • Loading branch information
CalvinNeo and JaySon-Huang authored Jan 25, 2025
1 parent a8f72d4 commit d27d7c0
Show file tree
Hide file tree
Showing 17 changed files with 525 additions and 418 deletions.
66 changes: 66 additions & 0 deletions dbms/src/Debug/MockKVStore/MockUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,70 @@ inline RegionPtr makeRegion(RegionMeta && meta)
return std::make_shared<Region>(std::move(meta), nullptr);
}

// Generates a lock value which fills all fields, only for test use.
inline TiKVValue encodeFullLockCfValue(
UInt8 lock_type,
const String & primary,
Timestamp ts,
UInt64 ttl,
const String * short_value,
Timestamp min_commit_ts,
Timestamp for_update_ts,
uint64_t txn_size,
const std::vector<std::string> & async_commit,
const std::vector<uint64_t> & rollback,
UInt64 generation = 0)
{
auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts);
WriteBufferFromOwnString res;
res.write(lock_value.getStr().data(), lock_value.getStr().size());
{
res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX);
RecordKVFormat::encodeUInt64(min_commit_ts, res);
}
{
res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX);
RecordKVFormat::encodeUInt64(for_update_ts, res);
}
{
res.write(RecordKVFormat::TXN_SIZE_PREFIX);
RecordKVFormat::encodeUInt64(txn_size, res);
}
{
res.write(RecordKVFormat::ROLLBACK_TS_PREFIX);
TiKV::writeVarUInt(rollback.size(), res);
for (auto ts : rollback)
{
RecordKVFormat::encodeUInt64(ts, res);
}
}
{
res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX);
TiKV::writeVarUInt(async_commit.size(), res);
for (const auto & s : async_commit)
{
writeVarInt(s.size(), res);
res.write(s.data(), s.size());
}
}
{
res.write(RecordKVFormat::LAST_CHANGE_PREFIX);
RecordKVFormat::encodeUInt64(12345678, res);
TiKV::writeVarUInt(87654321, res);
}
{
res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK);
TiKV::writeVarUInt(876543, res);
}
{
res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX);
}
if (generation > 0)
{
res.write(RecordKVFormat::GENERATION_PREFIX);
RecordKVFormat::encodeUInt64(generation, res);
}
return TiKVValue(res.releaseStr());
}

} // namespace DB::RegionBench
13 changes: 8 additions & 5 deletions dbms/src/Storages/KVStore/Decode/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ namespace FailPoints
extern const char force_set_num_regions_for_table[];
} // namespace FailPoints

void RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_)
{
cache_bytes = cache_bytes_;
}

RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id)
{
auto ks_table_id = KeyspaceTableID{keyspace_id, table_id};
Expand Down Expand Up @@ -156,7 +161,7 @@ void RegionTable::updateRegion(const Region & region)
{
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.cache_bytes = region.dataSize();
internal_region.updateRegionCacheBytes(region.dataSize());
}

namespace
Expand Down Expand Up @@ -308,9 +313,7 @@ RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & regi

func_update_region([&](InternalRegion & internal_region) -> bool {
internal_region.pause_flush = false;
internal_region.cache_bytes = region->dataSize();

internal_region.last_flush_time = Clock::now();
internal_region.updateRegionCacheBytes(region->dataSize());
return true;
});

Expand Down Expand Up @@ -380,7 +383,7 @@ void RegionTable::shrinkRegionRange(const Region & region)
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.range_in_table = region.getRange()->rawKeys();
internal_region.cache_bytes = region.dataSize();
internal_region.updateRegionCacheBytes(region.dataSize());
}

void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys)
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/KVStore/Decode/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ class RegionTable : private boost::noncopyable
, range_in_table(range_in_table_)
{}

void updateRegionCacheBytes(size_t);

RegionID region_id;
std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr> range_in_table;
bool pause_flush = false;

private:
Int64 cache_bytes = 0;
Timepoint last_flush_time = Clock::now();
};

using InternalRegions = std::unordered_map<RegionID, InternalRegion>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
if unlikely (is_v2)
{
// There may be orphan default key in a snapshot.
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame).payload;
}
else
{
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny).payload;
}
}
catch (Exception & e)
Expand Down
Loading

0 comments on commit d27d7c0

Please sign in to comment.