Skip to content

Commit

Permalink
Preload follower sys tables data and index roots (ydb-platform#5835)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Jul 1, 2024
1 parent 24736e9 commit ada3760
Show file tree
Hide file tree
Showing 19 changed files with 689 additions and 165 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_boot_bundle.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ namespace NBoot {
void TryFinalize()
{
if (!LeftReads) {
for (auto req : Loader->Run()) {
for (auto req : Loader->Run(false)) {
LeftReads += Logic->LoadPages(this, req);
}
}
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tablet_flat/flat_dbase_sz_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace NTable {
auto *partStore = CheckedCast<const NTable::TPartStore*>(part);

auto info = partStore->PageCollections.at(groupId.Index).Get();
auto type = EPage(info->PageCollection->Page(pageId).Type);
auto type = info->GetPageType(pageId);

switch (type) {
case EPage::FlatIndex:
Expand All @@ -55,11 +55,11 @@ namespace NTable {
}

private:
void AddPageSize(TInfo *info, TPageId page) noexcept
void AddPageSize(TInfo *info, TPageId pageId) noexcept
{
if (Touched[info].insert(page).second) {
if (Touched[info].insert(pageId).second) {
Pages++;
Bytes += info->PageCollection->Page(page).Size;
Bytes += info->GetPageSize(pageId);
}
}

Expand Down
19 changes: 17 additions & 2 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ bool TExecutor::PrepareExternalPart(TPendingPartSwitch &partSwitch, TPendingPart
}

if (auto* stage = bundle.GetStage<TPendingPartSwitch::TLoaderStage>()) {
if (auto fetch = stage->Loader.Run()) {
if (auto fetch = stage->Loader.Run(PreloadTablesData.contains(partSwitch.TableId))) {
Y_ABORT_UNLESS(fetch.size() == 1, "Cannot handle loads from more than one page collection");

for (auto req : fetch) {
Expand Down Expand Up @@ -1935,6 +1935,17 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &

const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo);
if (toLoad.first) {
if (auto logl = Logger->Log(ELnLev::Dbg03)) {
logl
<< NFmt::Do(*this) << " requests PageCollection " << pageCollectionInfo->PageCollection->Label()
<< " " << toLoad.second << " bytes, " << toLoad.first << " pages: [";
for (auto i : xrange(pages.size())) {
if (i != 0) logl << ", ";
logl << pages[i] << " " << ui32(pageCollectionInfo->GetPageType(pages[i]));
}
logl << "]";
}

auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages), pad->GetWaitingTraceId());

loadPages += toLoad.first;
Expand Down Expand Up @@ -3469,7 +3480,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
const auto &newPart = result.Part;

TPageCollectionProtoHelper::Snap(snap, newPart, tableId, logicResult.Changes.NewPartsLevel);
TPageCollectionProtoHelper(true, true).Do(bySwitchAux->AddHotBundles(), newPart);
TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), newPart);
}
}

Expand Down Expand Up @@ -4773,5 +4784,9 @@ void TExecutor::ApplyCompactionChanges(
}
}

// Replaces the stored PreloadTablesData set with the given set of table ids.
// PrepareExternalPart looks up partSwitch.TableId in this set and passes the
// result as the flag to Loader.Run() (presumably "preload data pages" — confirm
// against TLoader::Run, which is not visible here).
void TExecutor::SetPreloadTablesData(THashSet<ui32> tables) {
PreloadTablesData = std::move(tables);
}

}
}
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ class TExecutor
TAutoPtr<NUtil::ILogger> Logger;

ui32 FollowerId = 0;
THashSet<ui32> PreloadTablesData;

// This becomes true when executor enables the use of leases, e.g. starts persisting them
// This may become false again when leases are not actively used for some time
Expand Down Expand Up @@ -687,6 +688,8 @@ class TExecutor

void RegisterExternalTabletCounters(TAutoPtr<TTabletCountersBase> appCounters) override;

virtual void SetPreloadTablesData(THashSet<ui32> tables) override;

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::FLAT_EXECUTOR;
}
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6264,7 +6264,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) {
// after restart we have no pages in private cache
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true);
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330);
}

Y_UNIT_TEST(EnableLocalDBBtreeIndex_True) { // uses b-tree index
Expand Down Expand Up @@ -6302,7 +6302,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) {
// after restart we have no pages in private cache
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true);
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330);
}

Y_UNIT_TEST(EnableLocalDBBtreeIndex_True_EnableLocalDBFlatIndex_False) { // uses b-tree index
Expand Down Expand Up @@ -6341,7 +6341,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) {
// after restart we have no pages in private cache
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true);
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330);
}

Y_UNIT_TEST(EnableLocalDBBtreeIndex_False_EnableLocalDBFlatIndex_False) { // uses flat index
Expand Down Expand Up @@ -6468,7 +6468,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) {
// after restart we have no pages in private cache
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true);
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332);
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330);
}

}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_ops_compact.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ namespace NTabletFlatExecutor {
{ },
std::move(result.Overlay));

auto fetch = loader.Run();
auto fetch = loader.Run(false);

Y_ABORT_UNLESS(!fetch, "Just compacted part needs to load some pages");

Expand Down
70 changes: 0 additions & 70 deletions ydb/core/tablet_flat/flat_part_keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,80 +3,10 @@
#include "flat_part_iface.h"
#include "flat_part_index_iter_iface.h"
#include "flat_part_slice.h"
#include "flat_sausage_fetch.h"
#include "flat_sausagecache.h"

namespace NKikimr {
namespace NTable {

// IPages implementation bound to one part and one private page cache entry.
// Pages are served from a local pinned copy or from the cache; pages found in
// neither place are remembered in NeedPages so the caller can fetch them via
// GetFetches() and hand the results back through Save().
class TKeysEnv : public IPages {
public:
using TCache = NTabletFlatExecutor::TPrivatePageCache::TInfo;

// part  - the only part TryGetPage() accepts
// cache - page cache entry used for lookups and filled by Save()
TKeysEnv(const TPart *part, TIntrusivePtr<TCache> cache)
: Part(part)
, Cache(std::move(cache))
{
}

// Not supported by this environment: always aborts.
TResult Locate(const TMemTable*, ui64, ui32) noexcept override
{
Y_ABORT("IPages::Locate(TMemTable*, ...) shouldn't be used here");
}

// Not supported by this environment: always aborts.
TResult Locate(const TPart*, ui64, ELargeObj) noexcept override
{
Y_ABORT("IPages::Locate(TPart*, ...) shouldn't be used here");
}

// Returns the page data if it is available, otherwise records pageId in
// NeedPages and returns nullptr. Aborts when called for a part other than
// the one given to the constructor or for a non-main group.
const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override
{
Y_ABORT_UNLESS(part == Part, "Unsupported part");
Y_ABORT_UNLESS(groupId.IsMain(), "Unsupported column group");

// Lookup order: local pinned copies first, then the cache
if (auto* extra = ExtraPages.FindPtr(pageId)) {
return extra;
} else if (auto* cached = Cache->Lookup(pageId)) {
// Save page in case it's evicted on the next iteration
ExtraPages[pageId] = *cached;
// Note: the pointer returned here is the cache's one, not the ExtraPages entry
return cached;
} else {
NeedPages.insert(pageId);
return nullptr;
}
}

// Aborts unless "there are pending pages" equals `has`.
// NOTE(review): the abort message below looks truncated — confirm the intended text.
void Check(bool has) const noexcept
{
Y_ABORT_UNLESS(bool(NeedPages) == has, "Loader does not have some ne");
}

// Builds a fetch for all pending pages, sorted by page id, against
// Cache->PageCollection; returns nullptr when nothing is pending.
// The leading 0 is presumably the cookie that Save() checks — confirm against TFetch.
TAutoPtr<NPageCollection::TFetch> GetFetches()
{
if (NeedPages) {
TVector<TPageId> pages(NeedPages.begin(), NeedPages.end());
std::sort(pages.begin(), pages.end());
return new NPageCollection::TFetch{ 0, Cache->PageCollection, std::move(pages) };
} else {
return nullptr;
}
}

// Accepts a loaded page only if cookie is 0 and the page was pending;
// anything else is silently ignored.
void Save(ui32 cookie, NSharedCache::TEvResult::TLoaded&& loaded) noexcept
{
if (cookie == 0 && NeedPages.erase(loaded.PageId)) {
// Copy the data out before `loaded` is moved into the cache
ExtraPages[loaded.PageId] = TPinnedPageRef(loaded.Page).GetData();
Cache->Fill(std::move(loaded));
}
}

private:
const TPart* Part;                          // the only part this env serves
TIntrusivePtr<TCache> Cache;                // cache entry used by Lookup()/Fill()
THashMap<TPageId, TSharedData> ExtraPages;  // locally held copies of pages already seen
THashSet<TPageId> NeedPages;                // pages requested but not available yet
};

class TKeysLoader {
public:
explicit TKeysLoader(const TPart* part, IPages* env)
Expand Down
90 changes: 61 additions & 29 deletions ydb/core/tablet_flat/flat_part_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ TLoader::TLoader(TVector<TIntrusivePtr<TCache>> pageCollections,
if (Packs.size() < 1) {
Y_Fail("Cannot load TPart from " << Packs.size() << " page collections");
}
LoaderEnv = MakeHolder<TLoaderEnv>(Packs[0]);
}

TLoader::~TLoader() { }
Expand Down Expand Up @@ -153,25 +154,45 @@ TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView() noexcept
Y_ABORT_UNLESS(!PartView, "PartView already initialized in CreatePartView stage");
Y_ABORT_UNLESS(Packs && Packs.front());

TVector<TPageId> load;
for (auto page: { SchemeId, GlobsId,
SmallId, LargeId, ByKeyId,
GarbageStatsId, TxIdStatsId }) {
if (page != Max<TPageId>() && !Packs[0]->Lookup(page))
load.push_back(page);
auto getPage = [&](TPageId pageId) {
return pageId == Max<TPageId>()
? nullptr
: LoaderEnv->TryGetPage(nullptr, pageId, {});
};

if (BTreeGroupIndexes) {
// Note: preload root nodes only because we don't want to have multiple restarts here
for (const auto& meta : BTreeGroupIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
}
for (const auto& meta : BTreeHistoricIndexes) {
if (meta.LevelCount) getPage(meta.GetPageId());
}
} else if (FlatGroupIndexes) {
for (auto indexPageId : FlatGroupIndexes) {
getPage(indexPageId);
}
for (auto indexPageId : FlatHistoricIndexes) {
getPage(indexPageId);
}
}

for (auto pageId: { SchemeId, GlobsId, SmallId, LargeId, ByKeyId, GarbageStatsId, TxIdStatsId }) {
Y_DEBUG_ABORT_UNLESS(pageId == Max<TPageId>() || NeedIn(Packs[0]->GetPageType(pageId)));
getPage(pageId);
}

if (load) {
return new NPageCollection::TFetch{ 0, Packs[0]->PageCollection, std::move(load) };
if (auto fetch = LoaderEnv->GetFetch()) {
return fetch;
}

auto *scheme = GetPage(SchemeId);
auto *large = GetPage(LargeId);
auto *small = GetPage(SmallId);
auto *blobs = GetPage(GlobsId);
auto *byKey = GetPage(ByKeyId);
auto *garbageStats = GetPage(GarbageStatsId);
auto *txIdStats = GetPage(TxIdStatsId);
auto *scheme = getPage(SchemeId);
auto *large = getPage(LargeId);
auto *small = getPage(SmallId);
auto *blobs = getPage(GlobsId);
auto *byKey = getPage(ByKeyId);
auto *garbageStats = getPage(GarbageStatsId);
auto *txIdStats = getPage(TxIdStatsId);

if (scheme == nullptr) {
Y_ABORT("Scheme page is not loaded");
Expand All @@ -198,7 +219,7 @@ TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView() noexcept
// Note: although we also have flat index, it shouldn't be loaded; so let's not count it here
} else {
for (auto indexPage : FlatGroupIndexes) {
indexesRawSize += GetPageSize(indexPage);
indexesRawSize += Packs[0]->GetPageSize(indexPage);
}
}

Expand Down Expand Up @@ -242,7 +263,7 @@ TAutoPtr<NPageCollection::TFetch> TLoader::StageCreatePartView() noexcept

PartView = { partStore, std::move(overlay.Screen), std::move(overlay.Slices) };

KeysEnv = new TKeysEnv(PartView.Part.Get(), TPartStore::Storages(PartView).at(0));
LoaderEnv->ProvidePart(PartView.Part.Get());

return nullptr;
}
Expand All @@ -256,17 +277,17 @@ TAutoPtr<NPageCollection::TFetch> TLoader::StageSliceBounds() noexcept
return nullptr;
}

KeysEnv->Check(false); /* ensure there is no pending pages to load */
LoaderEnv->EnsureNoNeedPages();

TKeysLoader loader(PartView.Part.Get(), KeysEnv.Get());
TKeysLoader loader(PartView.Part.Get(), LoaderEnv.Get());

if (auto run = loader.Do(PartView.Screen)) {
KeysEnv->Check(false); /* On success there shouldn't be left loads */
LoaderEnv->EnsureNoNeedPages(); /* On success there shouldn't be left loads */
PartView.Slices = std::move(run);
TOverlay{ PartView.Screen, PartView.Slices }.Validate();

return nullptr;
} else if (auto fetches = KeysEnv->GetFetches()) {
} else if (auto fetches = LoaderEnv->GetFetch()) {
return fetches;
} else {
Y_ABORT("Screen keys loader stalled without result");
Expand All @@ -288,17 +309,28 @@ void TLoader::StageDeltas() noexcept
}
}

void TLoader::Save(ui64 cookie, TArrayRef<NSharedCache::TEvResult::TLoaded> blocks) noexcept
// Touches every page of the part's first (main group) page collection through
// LoaderEnv and returns whatever fetch LoaderEnv produces afterwards.
//
// Returns the result of LoaderEnv->GetFetch(); presumably null when no page
// needs loading (TLoaderEnv is not visible here — confirm).
TAutoPtr<NPageCollection::TFetch> TLoader::StagePreloadData() noexcept
{
    auto partStore = PartView.As<TPartStore>();

    // Note: preload works only for main group pages
    auto total = partStore->PageCollections[0]->Total();

    for (TPageId pageId : xrange(total)) {
        // The returned pointer is deliberately ignored: the call is made only so
        // LoaderEnv sees the page (assumed to queue pages it cannot serve — confirm)
        LoaderEnv->TryGetPage(PartView.Part.Get(), pageId, {});
    }

    return LoaderEnv->GetFetch();
}

void TLoader::Save(ui64 cookie, TArrayRef<NSharedCache::TEvResult::TLoaded> loadedPages) noexcept
{
Y_ABORT_UNLESS(cookie == 0, "Only the leader pack is used on load");

if (Stage == EStage::PartView) {
for (auto& loaded : blocks) {
Packs[0]->Fill(std::move(loaded), true);
}
} else if (Stage == EStage::Slice) {
for (auto& loaded : blocks) {
KeysEnv->Save(cookie, std::move(loaded));
if (Stage == EStage::PartView || Stage == EStage::Slice || Stage == EStage::PreloadData) {
for (auto& loaded : loadedPages) {
LoaderEnv->Save(cookie, std::move(loaded));
}
} else {
Y_Fail("Unexpected pages save on stage " << int(Stage));
Expand Down
Loading

0 comments on commit ada3760

Please sign in to comment.