Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Record table-wise memory usage and warn when it goes beyond the threshold #9835

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6fa43cf
initial
CalvinNeo Jan 27, 2025
a762486
clang-format
CalvinNeo Feb 5, 2025
38707c9
fix tests
CalvinNeo Feb 5, 2025
8def5f8
enhance test
CalvinNeo Feb 5, 2025
53e5cd5
fix table memory
CalvinNeo Feb 5, 2025
9583b5d
Merge branch 'master' into record-tablewise
CalvinNeo Feb 5, 2025
12745a4
fix test
CalvinNeo Feb 6, 2025
706d23e
Merge branch 'record-tablewise' of github.com:CalvinNeo/tics into rec…
CalvinNeo Feb 6, 2025
caa245b
rename
CalvinNeo Feb 7, 2025
2974c44
get table size
CalvinNeo Feb 7, 2025
523baab
introduce safe ts mgr
CalvinNeo Feb 7, 2025
7f67ccb
warn table
CalvinNeo Feb 7, 2025
11e285b
fmt
CalvinNeo Feb 7, 2025
49fe956
warn
CalvinNeo Feb 7, 2025
75898dd
remvoe helper
CalvinNeo Feb 7, 2025
280e031
Merge remote-tracking branch 'upstream/master' into record-tablewise
CalvinNeo Feb 8, 2025
9fb32de
rename
CalvinNeo Feb 8, 2025
dea150b
try record snapshot
CalvinNeo Feb 8, 2025
047bd37
add some tests
CalvinNeo Feb 8, 2025
47f833e
prehandle comsumes memory
CalvinNeo Feb 8, 2025
fcab661
fix tests
CalvinNeo Feb 8, 2025
e0de1c2
fmt
CalvinNeo Feb 8, 2025
2b2522f
fmt
CalvinNeo Feb 8, 2025
1383412
clippy
CalvinNeo Feb 8, 2025
28144fa
Merge remote-tracking branch 'upstream/master' into record-tablewise
CalvinNeo Feb 11, 2025
7693fb6
pick some
CalvinNeo Feb 12, 2025
8ec442a
picked
CalvinNeo Feb 12, 2025
888ce9d
Update dbms/src/Storages/KVStore/Decode/RegionTable.cpp
CalvinNeo Feb 12, 2025
cb02a8e
Update dbms/src/Storages/KVStore/Decode/SafeTsMgr.h
CalvinNeo Feb 12, 2025
540ab2c
format
CalvinNeo Feb 12, 2025
8e44a9f
Merge branch 'record-tablewise' of github.com:CalvinNeo/tics into rec…
CalvinNeo Feb 12, 2025
458274d
Update dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp
CalvinNeo Feb 12, 2025
2f1afc9
Update dbms/src/Storages/KVStore/Region.cpp
CalvinNeo Feb 12, 2025
5680b5d
fixprob
CalvinNeo Feb 12, 2025
e1c62c2
rename to ptr
CalvinNeo Feb 12, 2025
0291ac6
Merge branch 'record-tablewise' of github.com:CalvinNeo/tics into rec…
CalvinNeo Feb 12, 2025
8a1e7a4
f
CalvinNeo Feb 12, 2025
6ac0804
Merge remote-tracking branch 'upstream/master' into record-tablewise
CalvinNeo Feb 13, 2025
1707ae5
Fix CI lint; rename SafeTsMgr -> SafeTsManager
JaySon-Huang Feb 14, 2025
5c70111
Fix CI
JaySon-Huang Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void MockRaftStoreProxy::debugAddRegions(
auto region = tests::makeRegion(region_ids[i], ranges[i].first, ranges[i].second, kvs.getProxyHelper());
lock.regions.emplace(region_ids[i], region);
lock.index.add(region);
tmt.getRegionTable().updateRegion(*region);
tmt.getRegionTable().addRegion(*region);
}
}
}
Expand Down Expand Up @@ -601,7 +601,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle)
std::optional<std::function<void()>> cancel_after_prehandle)
{
auto old_kv_region = kvs.getRegion(region_id);
RUNTIME_CHECK(old_kv_region != nullptr);
Expand All @@ -628,7 +628,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle)
std::optional<std::function<void()>> cancel_after_prehandle)
{
auto region = getRegion(region_id);
RUNTIME_CHECK(region != nullptr);
Expand All @@ -640,10 +640,10 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
term = region->getLatestCommitTerm();
}

auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term);
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
new_kv_region->setApplied(index, term);
// Would set `appleid_index` in genRegionPtr.
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
auto new_kv_region = kvs.genRegionPtr(std::move(region_meta), peer_id, index, term, tmt, true);

std::vector<SSTView> ssts;
for (auto & cf : cfs)
Expand All @@ -663,6 +663,7 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)};
if (cancel_after_prehandle)
{
cancel_after_prehandle.value()();
kvs.releasePreHandledSnapshot(rg, tmt);
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
}
Expand Down
55 changes: 53 additions & 2 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ struct MockRaftStoreProxy : MutexLockWrap
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle);
std::optional<std::function<void()>> cancel_after_prehandle);
std::tuple<RegionPtr, PrehandleResult> snapshot(
KVStore & kvs,
TMTContext & tmt,
Expand All @@ -251,7 +251,7 @@ struct MockRaftStoreProxy : MutexLockWrap
uint64_t index,
uint64_t term,
std::optional<uint64_t> deadline_index,
bool cancel_after_prehandle = false);
std::optional<std::function<void()>> cancel_after_prehandle = std::nullopt);

void doApply(
KVStore & kvs,
Expand All @@ -266,6 +266,57 @@ struct MockRaftStoreProxy : MutexLockWrap

void clear();

struct WriteCmdsViewHolder
{
WriteCmdsViewHolder(
std::vector<std::string> kk,
std::vector<std::string> vv,
std::vector<WriteCmdType> tt,
std::vector<ColumnFamilyType> ff)
{
kholder = std::move(kk);
vholder = std::move(vv);
for (size_t i = 0; i < kholder.size(); i++)
{
kbuff.push_back(strIntoView(&kholder[i]));
}
for (size_t i = 0; i < vholder.size(); i++)
{
vbuff.push_back(strIntoView(&vholder[i]));
}
cmd_type = std::move(tt);
cmd_cf = std::move(ff);
}

const BaseBuffView * getKeys() const { return kbuff.data(); }

const BaseBuffView * getVals() const { return vbuff.data(); }

const WriteCmdType * getTypes() const { return cmd_type.data(); }

const ColumnFamilyType * getCfs() const { return cmd_cf.data(); }

std::vector<std::string> kholder;
std::vector<std::string> vholder;
std::vector<BaseBuffView> kbuff;
std::vector<BaseBuffView> vbuff;
std::vector<WriteCmdType> cmd_type;
std::vector<ColumnFamilyType> cmd_cf;
};

static std::tuple<WriteCmdsView, std::shared_ptr<WriteCmdsViewHolder>> createWriteCmdsView(
std::vector<std::string> keys,
std::vector<std::string> vals,
std::vector<WriteCmdType> cmd_types,
std::vector<ColumnFamilyType> cmd_cf)
{
std::shared_ptr<WriteCmdsViewHolder> holder
= std::make_shared<WriteCmdsViewHolder>(keys, vals, cmd_types, cmd_cf);
return std::make_tuple(
WriteCmdsView{holder->getKeys(), holder->getVals(), holder->getTypes(), holder->getCfs(), keys.size()},
holder);
}

std::pair<std::string, std::string> generateTiKVKeyValue(uint64_t tso, int64_t t) const;

MockRaftStoreProxy()
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
: RecordKVFormat::encodeWriteCfValue(Region::PutFlag, prewrite_ts, value);
TiKVKey commit_key = RecordKVFormat::appendTs(key, commit_ts);

region->insert(ColumnFamilyType::Write, std::move(commit_key), std::move(commit_value));
region->insertFromSnap(
context.getTMTContext(),
ColumnFamilyType::Write,
std::move(commit_key),
std::move(commit_value));
}
MockTiKV::instance().getRaftIndex(region_id);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ void handleApplySnapshot(
std::optional<uint64_t> deadline_index,
TMTContext & tmt)
{
auto new_region = kvstore.genRegionPtr(std::move(region), peer_id, index, term);
auto new_region = kvstore.genRegionPtr(std::move(region), peer_id, index, term, tmt, true);
auto prehandle_result = kvstore.preHandleSnapshotToFiles(new_region, snaps, index, term, deadline_index, tmt);
kvstore.applyPreHandledSnapshot(
RegionPtrWithSnapshotFiles{new_region, std::move(prehandle_result.ingest_ids)},
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Create TMTContext
auto cluster_config = getClusterConfig(global_context->getSecurityConfig(), storage_config.api_version, log);
global_context->createTMTContext(raft_config, std::move(cluster_config));
proxy_machine.initKVStore(global_context->getTMTContext(), store_ident);
proxy_machine.initKVStore(
global_context->getTMTContext(),
store_ident,
settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity));

global_context->getTMTContext().reloadConfig(config());
// setup the kv cluster for disagg compute node fetching config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Block SSTFilesToBlockInputStream::read()
BaseBuffView key = write_cf_reader->keyView();
BaseBuffView value = write_cf_reader->valueView();
auto tikv_key = TiKVKey(key.data, key.len);
region->insert(ColumnFamilyType::Write, std::move(tikv_key), TiKVValue(value.data, value.len));
region->insertFromSnap(tmt, ColumnFamilyType::Write, std::move(tikv_key), TiKVValue(value.data, value.len));
++process_keys.write_cf;
process_keys.write_cf_bytes += (key.len + value.len);
if (process_keys.write_cf % opts.expected_size == 0)
Expand Down Expand Up @@ -277,7 +277,12 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
BaseBuffView key = reader->keyView();
BaseBuffView value = reader->valueView();
// TODO: use doInsert to avoid locking
region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len), DupCheck::AllowSame);
region->insertFromSnap(
tmt,
cf,
TiKVKey(key.data, key.len),
TiKVValue(value.data, value.len),
DupCheck::AllowSame);
(*p_process_keys) += 1;
(*p_process_keys_bytes) += (key.len + value.len);
reader->next();
Expand Down Expand Up @@ -337,7 +342,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
BaseBuffView key = reader->keyView();
BaseBuffView value = reader->valueView();
// TODO: use doInsert to avoid locking
region->insert(cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len));
region->insertFromSnap(tmt, cf, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len));
(*p_process_keys) += 1;
(*p_process_keys_bytes) += (key.len + value.len);
if (*p_process_keys == process_keys_offset_end)
Expand Down
Loading