From ada37609c99f06f79c332b81643e66112e9c1a18 Mon Sep 17 00:00:00 2001 From: kungurtsev Date: Mon, 1 Jul 2024 12:08:33 +0200 Subject: [PATCH] Preload follower sys tables data and index roots (#5835) --- ydb/core/tablet_flat/flat_boot_bundle.h | 2 +- ydb/core/tablet_flat/flat_dbase_sz_env.h | 8 +- ydb/core/tablet_flat/flat_executor.cpp | 19 +- ydb/core/tablet_flat/flat_executor.h | 3 + ydb/core/tablet_flat/flat_executor_ut.cpp | 8 +- ydb/core/tablet_flat/flat_ops_compact.h | 2 +- ydb/core/tablet_flat/flat_part_keys.h | 70 --- ydb/core/tablet_flat/flat_part_loader.cpp | 90 ++-- ydb/core/tablet_flat/flat_part_loader.h | 98 +++- ydb/core/tablet_flat/flat_part_store.h | 4 +- ydb/core/tablet_flat/flat_sausagecache.cpp | 22 +- ydb/core/tablet_flat/flat_sausagecache.h | 33 +- ydb/core/tablet_flat/flat_scan_actor.h | 2 +- ydb/core/tablet_flat/tablet_flat_executor.h | 3 +- ydb/core/tablet_flat/ut/ut_slice_loader.cpp | 8 +- ydb/core/tx/datashard/datashard.cpp | 1 + ydb/core/tx/datashard/datashard__stats.cpp | 4 +- .../tx/datashard/datashard_ut_common_kqp.h | 2 +- .../tx/datashard/datashard_ut_followers.cpp | 475 +++++++++++++++++- 19 files changed, 689 insertions(+), 165 deletions(-) diff --git a/ydb/core/tablet_flat/flat_boot_bundle.h b/ydb/core/tablet_flat/flat_boot_bundle.h index 6bc94ed25247..657f67791bca 100644 --- a/ydb/core/tablet_flat/flat_boot_bundle.h +++ b/ydb/core/tablet_flat/flat_boot_bundle.h @@ -107,7 +107,7 @@ namespace NBoot { void TryFinalize() { if (!LeftReads) { - for (auto req : Loader->Run()) { + for (auto req : Loader->Run(false)) { LeftReads += Logic->LoadPages(this, req); } } diff --git a/ydb/core/tablet_flat/flat_dbase_sz_env.h b/ydb/core/tablet_flat/flat_dbase_sz_env.h index 02f8f172c6bd..a330ef171949 100644 --- a/ydb/core/tablet_flat/flat_dbase_sz_env.h +++ b/ydb/core/tablet_flat/flat_dbase_sz_env.h @@ -35,7 +35,7 @@ namespace NTable { auto *partStore = CheckedCast(part); auto info = partStore->PageCollections.at(groupId.Index).Get(); - auto type = EPage(info->PageCollection->Page(pageId).Type); + auto type = info->GetPageType(pageId); switch (type) { case EPage::FlatIndex: @@ -55,11 +55,11 @@ namespace NTable { } private: - void AddPageSize(TInfo *info, TPageId page) noexcept + void AddPageSize(TInfo *info, TPageId pageId) noexcept { - if (Touched[info].insert(page).second) { + if (Touched[info].insert(pageId).second) { Pages++; - Bytes += info->PageCollection->Page(page).Size; + Bytes += info->GetPageSize(pageId); } } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 72f6bae938ee..8d73dd4cc04a 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1193,7 +1193,7 @@ bool TExecutor::PrepareExternalPart(TPendingPartSwitch &partSwitch, TPendingPart } if (auto* stage = bundle.GetStage()) { - if (auto fetch = stage->Loader.Run()) { + if (auto fetch = stage->Loader.Run(PreloadTablesData.contains(partSwitch.TableId))) { Y_ABORT_UNLESS(fetch.size() == 1, "Cannot handle loads from more than one page collection"); for (auto req : fetch) { @@ -1935,6 +1935,17 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & const std::pair toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo); if (toLoad.first) { + if (auto logl = Logger->Log(ELnLev::Dbg03)) { + logl + << NFmt::Do(*this) << " requests PageCollection " << pageCollectionInfo->PageCollection->Label() + << " " << toLoad.second << " bytes, " << toLoad.first << " pages: ["; + for (auto i : xrange(pages.size())) { + if (i != 0) logl << ", "; + logl << pages[i] << " " << ui32(pageCollectionInfo->GetPageType(pages[i])); + } + logl << "]"; + } + auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages), pad->GetWaitingTraceId()); loadPages += toLoad.first; @@ -3469,7 +3480,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) const auto &newPart = result.Part; TPageCollectionProtoHelper::Snap(snap, newPart, tableId, logicResult.Changes.NewPartsLevel); - TPageCollectionProtoHelper(true, true).Do(bySwitchAux->AddHotBundles(), newPart); + TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), newPart); } } @@ -4773,5 +4784,9 @@ void TExecutor::ApplyCompactionChanges( } } +void TExecutor::SetPreloadTablesData(THashSet tables) { + PreloadTablesData = std::move(tables); +} + } } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 47c0ede8b1e7..68fee6558675 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -362,6 +362,7 @@ class TExecutor TAutoPtr Logger; ui32 FollowerId = 0; + THashSet PreloadTablesData; // This becomes true when executor enables the use of leases, e.g. starts persisting them // This may become false again when leases are not actively used for some time @@ -687,6 +688,8 @@ class TExecutor void RegisterExternalTabletCounters(TAutoPtr appCounters) override; + virtual void SetPreloadTablesData(THashSet tables) override; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::FLAT_EXECUTOR; } diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index ad7a7e308a3d..5f9552f6f833 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -6264,7 +6264,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) { // after restart we have no pages in private cache env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true); UNIT_ASSERT_VALUES_EQUAL(readRows, 1000); - UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332); + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330); } Y_UNIT_TEST(EnableLocalDBBtreeIndex_True) { // uses b-tree index @@ -6302,7 +6302,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) { // after restart we have no pages in private cache env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true); UNIT_ASSERT_VALUES_EQUAL(readRows, 1000); - UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332); + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330); } Y_UNIT_TEST(EnableLocalDBBtreeIndex_True_EnableLocalDBFlatIndex_False) { // uses b-tree index @@ -6341,7 +6341,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) { // after restart we have no pages in private cache env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true); UNIT_ASSERT_VALUES_EQUAL(readRows, 1000); - UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332); + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330); } Y_UNIT_TEST(EnableLocalDBBtreeIndex_False_EnableLocalDBFlatIndex_False) { // uses flat index @@ -6468,7 +6468,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorBTreeIndex) { // after restart we have no pages in private cache env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) }, true); UNIT_ASSERT_VALUES_EQUAL(readRows, 1000); - UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 332); + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 330); } } diff --git a/ydb/core/tablet_flat/flat_ops_compact.h b/ydb/core/tablet_flat/flat_ops_compact.h index 61e1acedbeef..17e50618bd95 100644 --- a/ydb/core/tablet_flat/flat_ops_compact.h +++ b/ydb/core/tablet_flat/flat_ops_compact.h @@ -315,7 +315,7 @@ namespace NTabletFlatExecutor { { }, std::move(result.Overlay)); - auto fetch = loader.Run(); + auto fetch = loader.Run(false); Y_ABORT_UNLESS(!fetch, "Just compacted part needs to load some pages"); diff --git a/ydb/core/tablet_flat/flat_part_keys.h b/ydb/core/tablet_flat/flat_part_keys.h index aeaa649fae9c..bdfcee6bd6fa 100644 --- a/ydb/core/tablet_flat/flat_part_keys.h +++ b/ydb/core/tablet_flat/flat_part_keys.h @@ -3,80 +3,10 @@ #include "flat_part_iface.h" #include "flat_part_index_iter_iface.h" #include "flat_part_slice.h" -#include "flat_sausage_fetch.h" -#include "flat_sausagecache.h" namespace NKikimr { namespace NTable { - class TKeysEnv : public IPages { - public: - using TCache = NTabletFlatExecutor::TPrivatePageCache::TInfo; - - TKeysEnv(const TPart *part, TIntrusivePtr cache) - : Part(part) - , Cache(std::move(cache)) - { - } - - TResult Locate(const TMemTable*, ui64, ui32) noexcept override - { - Y_ABORT("IPages::Locate(TMemTable*, ...) shouldn't be used here"); - } - - TResult Locate(const TPart*, ui64, ELargeObj) noexcept override - { - Y_ABORT("IPages::Locate(TPart*, ...) shouldn't be used here"); - } - - const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override - { - Y_ABORT_UNLESS(part == Part, "Unsupported part"); - Y_ABORT_UNLESS(groupId.IsMain(), "Unsupported column group"); - - if (auto* extra = ExtraPages.FindPtr(pageId)) { - return extra; - } else if (auto* cached = Cache->Lookup(pageId)) { - // Save page in case it's evicted on the next iteration - ExtraPages[pageId] = *cached; - return cached; - } else { - NeedPages.insert(pageId); - return nullptr; - } - } - - void Check(bool has) const noexcept - { - Y_ABORT_UNLESS(bool(NeedPages) == has, "Loader does not have some ne"); - } - - TAutoPtr GetFetches() - { - if (NeedPages) { - TVector pages(NeedPages.begin(), NeedPages.end()); - std::sort(pages.begin(), pages.end()); - return new NPageCollection::TFetch{ 0, Cache->PageCollection, std::move(pages) }; - } else { - return nullptr; - } - } - - void Save(ui32 cookie, NSharedCache::TEvResult::TLoaded&& loaded) noexcept - { - if (cookie == 0 && NeedPages.erase(loaded.PageId)) { - ExtraPages[loaded.PageId] = TPinnedPageRef(loaded.Page).GetData(); - Cache->Fill(std::move(loaded)); - } - } - - private: - const TPart* Part; - TIntrusivePtr Cache; - THashMap ExtraPages; - THashSet NeedPages; - }; - class TKeysLoader { public: explicit TKeysLoader(const TPart* part, IPages* env) diff --git a/ydb/core/tablet_flat/flat_part_loader.cpp b/ydb/core/tablet_flat/flat_part_loader.cpp index 164903684b95..b031c698cce2 100644 --- a/ydb/core/tablet_flat/flat_part_loader.cpp +++ b/ydb/core/tablet_flat/flat_part_loader.cpp @@ -23,6 +23,7 @@ TLoader::TLoader(TVector> pageCollections, if (Packs.size() < 1) { Y_Fail("Cannot load TPart from " << Packs.size() << " page collections"); } + LoaderEnv = MakeHolder(Packs[0]); } TLoader::~TLoader() { } @@ -153,25 +154,45 @@ TAutoPtr TLoader::StageCreatePartView() noexcept Y_ABORT_UNLESS(!PartView, "PartView already initialized in CreatePartView stage"); Y_ABORT_UNLESS(Packs && Packs.front()); - TVector load; - for (auto page: { SchemeId, GlobsId, - SmallId, LargeId, ByKeyId, - GarbageStatsId, TxIdStatsId }) { - if (page != Max() && !Packs[0]->Lookup(page)) - load.push_back(page); + auto getPage = [&](TPageId pageId) { + return pageId == Max() + ? nullptr + : LoaderEnv->TryGetPage(nullptr, pageId, {}); + }; + + if (BTreeGroupIndexes) { + // Note: preload root nodes only because we don't want to have multiple restarts here + for (const auto& meta : BTreeGroupIndexes) { + if (meta.LevelCount) getPage(meta.GetPageId()); + } + for (const auto& meta : BTreeHistoricIndexes) { + if (meta.LevelCount) getPage(meta.GetPageId()); + } + } else if (FlatGroupIndexes) { + for (auto indexPageId : FlatGroupIndexes) { + getPage(indexPageId); + } + for (auto indexPageId : FlatHistoricIndexes) { + getPage(indexPageId); + } + } + + for (auto pageId: { SchemeId, GlobsId, SmallId, LargeId, ByKeyId, GarbageStatsId, TxIdStatsId }) { + Y_DEBUG_ABORT_UNLESS(pageId == Max() || NeedIn(Packs[0]->GetPageType(pageId))); + getPage(pageId); } - if (load) { - return new NPageCollection::TFetch{ 0, Packs[0]->PageCollection, std::move(load) }; + if (auto fetch = LoaderEnv->GetFetch()) { + return fetch; } - auto *scheme = GetPage(SchemeId); - auto *large = GetPage(LargeId); - auto *small = GetPage(SmallId); - auto *blobs = GetPage(GlobsId); - auto *byKey = GetPage(ByKeyId); - auto *garbageStats = GetPage(GarbageStatsId); - auto *txIdStats = GetPage(TxIdStatsId); + auto *scheme = getPage(SchemeId); + auto *large = getPage(LargeId); + auto *small = getPage(SmallId); + auto *blobs = getPage(GlobsId); + auto *byKey = getPage(ByKeyId); + auto *garbageStats = getPage(GarbageStatsId); + auto *txIdStats = getPage(TxIdStatsId); if (scheme == nullptr) { Y_ABORT("Scheme page is not loaded"); @@ -198,7 +219,7 @@ TAutoPtr TLoader::StageCreatePartView() noexcept // Note: although we also have flat index, it shouldn't be loaded; so let's not count it here } else { for (auto indexPage : FlatGroupIndexes) { - indexesRawSize += GetPageSize(indexPage); + indexesRawSize += Packs[0]->GetPageSize(indexPage); } } @@ -242,7 +263,7 @@ TAutoPtr TLoader::StageCreatePartView() noexcept PartView = { partStore, std::move(overlay.Screen), std::move(overlay.Slices) }; - KeysEnv = new TKeysEnv(PartView.Part.Get(), TPartStore::Storages(PartView).at(0)); + LoaderEnv->ProvidePart(PartView.Part.Get()); return nullptr; } @@ -256,17 +277,17 @@ TAutoPtr TLoader::StageSliceBounds() noexcept return nullptr; } - KeysEnv->Check(false); /* ensure there is no pending pages to load */ + LoaderEnv->EnsureNoNeedPages(); - TKeysLoader loader(PartView.Part.Get(), KeysEnv.Get()); + TKeysLoader loader(PartView.Part.Get(), LoaderEnv.Get()); if (auto run = loader.Do(PartView.Screen)) { - KeysEnv->Check(false); /* On success there shouldn't be left loads */ + LoaderEnv->EnsureNoNeedPages(); /* On success there shouldn't be left loads */ PartView.Slices = std::move(run); TOverlay{ PartView.Screen, PartView.Slices }.Validate(); return nullptr; - } else if (auto fetches = KeysEnv->GetFetches()) { + } else if (auto fetches = LoaderEnv->GetFetch()) { return fetches; } else { Y_ABORT("Screen keys loader stalled without result"); @@ -288,17 +309,28 @@ void TLoader::StageDeltas() noexcept } } -void TLoader::Save(ui64 cookie, TArrayRef blocks) noexcept +TAutoPtr TLoader::StagePreloadData() noexcept +{ + auto partStore = PartView.As(); + + // Note: preload works only for main group pages + auto total = partStore->PageCollections[0]->Total(); + + TVector toLoad(::Reserve(total)); + for (TPageId pageId : xrange(total)) { + LoaderEnv->TryGetPage(PartView.Part.Get(), pageId, {}); + } + + return LoaderEnv->GetFetch(); +} + +void TLoader::Save(ui64 cookie, TArrayRef loadedPages) noexcept { Y_ABORT_UNLESS(cookie == 0, "Only the leader pack is used on load"); - if (Stage == EStage::PartView) { - for (auto& loaded : blocks) { - Packs[0]->Fill(std::move(loaded), true); - } - } else if (Stage == EStage::Slice) { - for (auto& loaded : blocks) { - KeysEnv->Save(cookie, std::move(loaded)); + if (Stage == EStage::PartView || Stage == EStage::Slice || Stage == EStage::PreloadData) { + for (auto& loaded : loadedPages) { + LoaderEnv->Save(cookie, std::move(loaded)); } } else { Y_Fail("Unexpected pages save on stage " << int(Stage)); diff --git a/ydb/core/tablet_flat/flat_part_loader.h b/ydb/core/tablet_flat/flat_part_loader.h index 5f549fcf31c1..f7b53e89ff7f 100644 --- a/ydb/core/tablet_flat/flat_part_loader.h +++ b/ydb/core/tablet_flat/flat_part_loader.h @@ -13,8 +13,6 @@ namespace NKikimr { namespace NTable { - class TKeysEnv; - class TLoader { public: enum class EStage : ui8 { @@ -22,11 +20,87 @@ namespace NTable { PartView, Slice, Deltas, + PreloadData, Result, }; using TCache = NTabletFlatExecutor::TPrivatePageCache::TInfo; + struct TLoaderEnv : public IPages { + TLoaderEnv(TIntrusivePtr cache) + : Cache(std::move(cache)) + { + } + + TResult Locate(const TMemTable*, ui64, ui32) noexcept override + { + Y_ABORT("IPages::Locate(TMemTable*, ...) shouldn't be used here"); + } + + TResult Locate(const TPart*, ui64, ELargeObj) noexcept override + { + Y_ABORT("IPages::Locate(TPart*, ...) shouldn't be used here"); + } + + void ProvidePart(const TPart* part) noexcept + { + Y_ABORT_IF(Part); + Part = part; + } + + const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override + { + Y_ABORT_UNLESS(part == Part, "Unsupported part"); + Y_ABORT_UNLESS(groupId.IsMain(), "Unsupported column group"); + + if (auto* savedPage = SavedPages.FindPtr(pageId)) { + return savedPage; + } else if (auto* cached = Cache->Lookup(pageId)) { + // Save page in case it's evicted on the next iteration + SavedPages[pageId] = *cached; + return cached; + } else { + NeedPages.insert(pageId); + return nullptr; + } + } + + void EnsureNoNeedPages() const noexcept + { + Y_ABORT_UNLESS(!NeedPages); + } + + TAutoPtr GetFetch() + { + if (NeedPages) { + TVector pages(NeedPages.begin(), NeedPages.end()); + std::sort(pages.begin(), pages.end()); + return new NPageCollection::TFetch{ 0, Cache->PageCollection, std::move(pages) }; + } else { + return nullptr; + } + } + + void Save(ui32 cookie, NSharedCache::TEvResult::TLoaded&& loaded) noexcept + { + if (cookie == 0 && NeedPages.erase(loaded.PageId)) { + auto type = Cache->GetPageType(loaded.PageId); + SavedPages[loaded.PageId] = TPinnedPageRef(loaded.Page).GetData(); + if (type != EPage::FlatIndex) { + // hack: saving flat index to private cache will break sticky logic + // keep it in shared cache only for now + Cache->Fill(std::move(loaded), NeedIn(type)); + } + } + } + + private: + const TPart* Part = nullptr; + TIntrusivePtr Cache; + THashMap SavedPages; + THashSet NeedPages; + }; + TLoader(TPartComponents ou) : TLoader(TPartStore::Construct(std::move(ou.PageCollectionComponents)), std::move(ou.Legacy), @@ -42,7 +116,7 @@ namespace NTable { TEpoch epoch = NTable::TEpoch::Max()); ~TLoader(); - TVector> Run() + TVector> Run(bool preloadData) { while (Stage < EStage::Result) { TAutoPtr fetch; @@ -60,6 +134,11 @@ namespace NTable { case EStage::Deltas: StageDeltas(); break; + case EStage::PreloadData: + if (preloadData) { + fetch = StagePreloadData(); + } + break; default: break; } @@ -140,16 +219,6 @@ namespace NTable { (FlatGroupIndexes || BTreeGroupIndexes); } - const TSharedData* GetPage(TPageId page) noexcept - { - return page == Max() ? nullptr : Packs[0]->Lookup(page); - } - - size_t GetPageSize(TPageId page) noexcept - { - return Packs[0]->PageCollection->Page(page).Size; - } - void ParseMeta(TArrayRef plain) noexcept { TMemoryInput stream(plain.data(), plain.size()); @@ -162,6 +231,7 @@ namespace NTable { TAutoPtr StageCreatePartView() noexcept; TAutoPtr StageSliceBounds() noexcept; void StageDeltas() noexcept; + TAutoPtr StagePreloadData() noexcept; private: TVector> Packs; @@ -186,6 +256,6 @@ namespace NTable { TRowVersion MaxRowVersion; NProto::TRoot Root; TPartView PartView; - TAutoPtr KeysEnv; + THolder LoaderEnv; }; }} diff --git a/ydb/core/tablet_flat/flat_part_store.h b/ydb/core/tablet_flat/flat_part_store.h index 0f4b7af3eec5..daa26955e5b8 100644 --- a/ydb/core/tablet_flat/flat_part_store.h +++ b/ydb/core/tablet_flat/flat_part_store.h @@ -76,13 +76,13 @@ class TPartStore : public TPart, public IBundle { ui64 GetPageSize(NPage::TPageId pageId, NPage::TGroupId groupId) const override { Y_ABORT_UNLESS(groupId.Index < PageCollections.size()); - return PageCollections[groupId.Index]->PageCollection->Page(pageId).Size; + return PageCollections[groupId.Index]->GetPageSize(pageId); } NPage::EPage GetPageType(NPage::TPageId pageId, NPage::TGroupId groupId) const override { Y_ABORT_UNLESS(groupId.Index < PageCollections.size()); - return EPage(PageCollections[groupId.Index]->PageCollection->Page(pageId).Type); + return PageCollections[groupId.Index]->GetPageType(pageId); } ui8 GetGroupChannel(NPage::TGroupId groupId) const override diff --git a/ydb/core/tablet_flat/flat_sausagecache.cpp b/ydb/core/tablet_flat/flat_sausagecache.cpp index 088f0c7cbb95..469ce978537e 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.cpp +++ b/ydb/core/tablet_flat/flat_sausagecache.cpp @@ -4,7 +4,7 @@ namespace NKikimr { namespace NTabletFlatExecutor { -TPrivatePageCache::TPage::TPage(size_t size, ui32 pageId, TInfo* info) +TPrivatePageCache::TPage::TPage(size_t size, TPageId pageId, TInfo* info) : LoadState(LoadStateNo) , Sticky(false) , SharedPending(false) @@ -155,7 +155,7 @@ TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) { return nullptr; } -void TPrivatePageCache::MarkSticky(ui32 pageId, TInfo *collectionInfo) { +void TPrivatePageCache::MarkSticky(TPageId pageId, TInfo *collectionInfo) { TPage *page = collectionInfo->EnsurePage(pageId); if (Y_LIKELY(!page->Sticky)) { // N.B. the call site that marks pages as sticky starts to load them @@ -284,7 +284,7 @@ void TPrivatePageCache::TPrivatePageCache::TryEraseIfUnnecessary(TPage *page) { Stats.TotalExclusive -= page->Size; page->PinnedBody = { }; } - const ui32 pageId = page->Id; + const TPageId pageId = page->Id; auto* info = page->Info; Y_DEBUG_ABORT_UNLESS(info->PageMap[pageId].Get() == page); Y_ABORT_UNLESS(info->PageMap.erase(pageId)); @@ -304,9 +304,7 @@ void TPrivatePageCache::TPrivatePageCache::TryShareBody(TPage *page) { } } -const TSharedData* TPrivatePageCache::Lookup(ui32 pageId, TInfo *info) { - using EPage = NTable::NPage::EPage; - +const TSharedData* TPrivatePageCache::Lookup(TPageId pageId, TInfo *info) { TPage *page = info->EnsurePage(pageId); TryLoad(page); @@ -326,7 +324,7 @@ const TSharedData* TPrivatePageCache::Lookup(ui32 pageId, TInfo *info) { ToLoad.PushBack(page); // Note: we mark flat index pages sticky before we load them - if (!page->Sticky && EPage(info->PageCollection->Page(page->Id).Type) == EPage::FlatIndex) { + if (!page->Sticky && info->GetPageType(page->Id) == EPage::FlatIndex) { MarkSticky(page->Id, info); } @@ -335,7 +333,7 @@ const TSharedData* TPrivatePageCache::Lookup(ui32 pageId, TInfo *info) { return nullptr; } -TSharedPageRef TPrivatePageCache::LookupShared(ui32 pageId, TInfo *info) { +TSharedPageRef TPrivatePageCache::LookupShared(TPageId pageId, TInfo *info) { TPage *page = info->GetPage(pageId); if (!page) return { }; @@ -461,7 +459,7 @@ void TPrivatePageCache::UnpinPages(TPinned &pinned, size_t &unpinnedPages) { for (auto &xinfoid : pinned) { if (TPrivatePageCache::TInfo *info = Info(xinfoid.first)) { for (auto &x : xinfoid.second) { - ui32 pageId = x.first; + TPageId pageId = x.first; TPrivatePageCachePinPad *pad = x.second.Get(); x.second.Reset(); TPage *page = info->GetPage(pageId); @@ -504,7 +502,7 @@ void TPrivatePageCache::ResetTouchesAndToLoad(bool verifyEmpty) { Stats.CurrentCacheMisses = 0; } -void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRef shared) { +void TPrivatePageCache::UpdateSharedBody(TInfo *info, TPageId pageId, TSharedPageRef shared) { TPage *page = info->GetPage(pageId); if (!page) return; @@ -529,7 +527,7 @@ void TPrivatePageCache::UpdateSharedBody(TInfo *info, ui32 pageId, TSharedPageRe TryUnload(page); } -void TPrivatePageCache::DropSharedBody(TInfo *info, ui32 pageId) { +void TPrivatePageCache::DropSharedBody(TInfo *info, TPageId pageId) { TPage *page = info->GetPage(pageId); if (!page) return; @@ -605,7 +603,7 @@ THashMap> TPrivatePageCache return ret; } -THashMap> TPrivatePageCache::GetPrepareSharedTouched() { +THashMap> TPrivatePageCache::GetPrepareSharedTouched() { return std::move(ToTouchShared); } diff --git a/ydb/core/tablet_flat/flat_sausagecache.h b/ydb/core/tablet_flat/flat_sausagecache.h index 5ab2ce6c3f29..0677aa92da30 100644 --- a/ydb/core/tablet_flat/flat_sausagecache.h +++ b/ydb/core/tablet_flat/flat_sausagecache.h @@ -1,7 +1,6 @@ #pragma once -#include "defs.h" +#include "flat_page_iface.h" #include "flat_sausage_gut.h" -#include "flat_sausage_fetch.h" #include "shared_handle.h" #include "shared_cache_events.h" #include @@ -20,6 +19,8 @@ struct TPrivatePageCacheWaitPad : public TExplicitSimpleCounter { class TPrivatePageCache { using TPinned = THashMap>>; + using EPage = NTable::NPage::EPage; + using TPageId = NTable::NPage::TPageId; public: struct TInfo; @@ -53,7 +54,7 @@ class TPrivatePageCache { ui32 Sticky : 1; ui32 SharedPending : 1; - const ui32 Id; + const TPageId Id; const size_t Size; TInfo* const Info; @@ -62,7 +63,7 @@ class TPrivatePageCache { TSharedPageRef SharedBody; TSharedData PinnedBody; - TPage(size_t size, ui32 pageId, TInfo* info); + TPage(size_t size, TPageId pageId, TInfo* info); TPage(const TPage&) = delete; TPage(TPage&&) = delete; @@ -102,16 +103,24 @@ class TPrivatePageCache { return PageMap.size(); } - const TSharedData* Lookup(ui32 pageId) const noexcept { + const TSharedData* Lookup(TPageId pageId) const noexcept { auto* page = GetPage(pageId); return page ? page->GetPinnedBody() : nullptr; } - TPage* GetPage(ui32 pageId) const noexcept { + TPage* GetPage(TPageId pageId) const noexcept { return PageMap[pageId].Get(); } - TPage* EnsurePage(ui32 pageId) noexcept { + EPage GetPageType(TPageId pageId) const noexcept { + return EPage(PageCollection->Page(pageId).Type); + } + + ui64 GetPageSize(TPageId pageId) const noexcept { + return PageCollection->Page(pageId).Size; + } + + TPage* EnsurePage(TPageId pageId) noexcept { auto* page = GetPage(pageId); if (!page) { PageMap.emplace(pageId, THolder(page = new TPage(PageCollection->Page(pageId).Size, pageId, this))); @@ -146,7 +155,7 @@ class TPrivatePageCache { TInfo* Info(TLogoBlobID id); - void MarkSticky(ui32 pageId, TInfo *collectionInfo); + void MarkSticky(TPageId pageId, TInfo *collectionInfo); const TStats& GetStats() const { return Stats; } @@ -163,16 +172,16 @@ class TPrivatePageCache { THashMap> GetToLoad() const; void ResetTouchesAndToLoad(bool verifyEmpty); - void UpdateSharedBody(TInfo *collectionInfo, ui32 pageId, TSharedPageRef shared); - void DropSharedBody(TInfo *collectionInfo, ui32 pageId); + void UpdateSharedBody(TInfo *collectionInfo, TPageId pageId, TSharedPageRef shared); + void DropSharedBody(TInfo *collectionInfo, TPageId pageId); TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo); THashMap> DetachPrivatePageCache(); - THashMap> GetPrepareSharedTouched(); + THashMap> GetPrepareSharedTouched(); private: THashMap> PageCollections; - THashMap> ToTouchShared; + THashMap> ToTouchShared; TStats Stats; diff --git a/ydb/core/tablet_flat/flat_scan_actor.h b/ydb/core/tablet_flat/flat_scan_actor.h index 7a425842b9b3..ca719ca2372a 100644 --- a/ydb/core/tablet_flat/flat_scan_actor.h +++ b/ydb/core/tablet_flat/flat_scan_actor.h @@ -194,7 +194,7 @@ namespace NOps { } void RunLoader() { - for (auto req : Loader->Run()) { + for (auto req : Loader->Run(false)) { Send(Owner, new TEvPrivate::TEvLoadPages(std::move(req))); ++ReadsLeft; } diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index d13d72341347..761d7ebe8a02 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -621,9 +621,10 @@ namespace NFlatExecutorSetup { // Returns current database scheme (executor must be active) virtual const NTable::TScheme& Scheme() const noexcept = 0; + virtual void SetPreloadTablesData(THashSet tables) = 0; + ui32 Generation() const { return Generation0; } ui32 Step() const { return Step0; } - protected: // IExecutor() diff --git a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp index fd0648608966..ca214424c36c 100644 --- a/ydb/core/tablet_flat/ut/ut_slice_loader.cpp +++ b/ydb/core/tablet_flat/ut/ut_slice_loader.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -142,11 +143,12 @@ namespace { TCheckResult result; TIntrusiveConstPtr pageCollection = new TTestPartPageCollection(part, 0); - TKeysEnv env(part.Get(), new TCache(pageCollection)); + NTable::TLoader::TLoaderEnv env(new TCache(pageCollection)); + env.ProvidePart(part.Get()); TKeysLoader loader(part.Get(), &env); while (!(result.Run = loader.Do(screen))) { - if (auto fetch = env.GetFetches()) { + if (auto fetch = env.GetFetch()) { UNIT_ASSERT_C(fetch->PageCollection.Get() == pageCollection.Get(), "TLoader wants to fetch from an unexpected pageCollection"); UNIT_ASSERT_C(fetch->Pages, "TLoader wants a fetch, but there are no pages"); @@ -162,7 +164,7 @@ namespace { UNIT_ASSERT_C(false, "TKeysLoader was stalled"); } } - env.Check(false); /* On success there shouldn't be left loads */ + env.EnsureNoNeedPages(); /* On success there shouldn't be left loads */ const auto scrSize = screen ? screen->Size() : 1; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 8dc1400ca887..b1d02b0009e3 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -361,6 +361,7 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) { SyncConfig(); State = TShardState::Readonly; FollowerState = { }; + Executor()->SetPreloadTablesData({Schema::Sys::TableId, Schema::UserTables::TableId, Schema::Snapshots::TableId}); Become(&TThis::StateWorkAsFollower); SignalTabletActive(ctx); LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Follower switched to work state: " << TabletID()); diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 38d5fa2c193e..9087ecd9bd4b 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -89,7 +89,7 @@ class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages { auto partStore = CheckedCast(part); auto info = partStore->PageCollections.at(groupId.Index).Get(); - auto type = EPage(info->PageCollection->Page(pageId).Type); + auto type = info->GetPageType(pageId); Y_ABORT_UNLESS(type == EPage::FlatIndex || type == EPage::BTreeIndex); auto& partPages = Pages[part]; @@ -99,7 +99,7 @@ class TTableStatsCoroBuilder : public TActorCoroImpl, private IPages { } auto fetchEv = new NPageCollection::TFetch{ {}, info->PageCollection, TVector{ pageId } }; - PagesSize += info->PageCollection->Page(pageId).Size; + PagesSize += info->GetPageSize(pageId); Send(MakeSharedPageCacheId(), new NSharedCache::TEvRequest(NSharedCache::EPriority::Bkgr, fetchEv, SelfActorId)); Spent->Alter(false); // pause measurement diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 61b189a860c4..1e4950575184 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -141,7 +141,7 @@ namespace NKqpHelpers { } inline TString FormatResult(const Ydb::ResultSet& rs) { - Cerr << JoinSeq(", ", rs.rows()); + Cerr << JoinSeq(", ", rs.rows()) << Endl; return JoinSeq(", ", rs.rows()); } diff --git a/ydb/core/tx/datashard/datashard_ut_followers.cpp b/ydb/core/tx/datashard/datashard_ut_followers.cpp index b4c4ea7712a7..f32604f21b8b 100644 --- a/ydb/core/tx/datashard/datashard_ut_followers.cpp +++ b/ydb/core/tx/datashard/datashard_ut_followers.cpp @@ -177,13 +177,20 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG); InitRoot(server, sender); + TIntrusivePtr policy = new NLocalDb::TCompactionPolicy(); + policy->MinDataPageSize = 1; + policy->MinBTreeIndexNodeSize = 1; + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() .Shards(2) - .Followers(1)); + .Followers(1) + .Policy(policy.Get())); const auto shards = GetTableShards(server, sender, "/Root/table-1"); UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u); @@ -191,7 +198,7 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);"); // Wait for leader to promote the follower read edge (and stop writing to the Sys table) - Cerr << "... sleeping" << Endl; + Cerr << "... sleeping after upsert" << Endl; runtime.SimulateSleep(TDuration::Seconds(1)); UNIT_ASSERT_VALUES_EQUAL( @@ -219,7 +226,7 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { } // Allow table to finish compaction - Cerr << "... sleeping" << Endl; + Cerr << "... sleeping after compaction" << Endl; runtime.SimulateSleep(TDuration::Seconds(1)); // Reboot follower @@ -229,11 +236,430 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { { .Follower = true }); // Allow it to boot properly - Cerr << "... sleeping" << Endl; + Cerr << "... sleeping after restart" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must succeed + Cerr << "... checking after restart" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + // Update row values and sleep + Cerr << "... updating rows" << Endl; + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must see updated values + Cerr << "... checking after update" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 44 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 55 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 66 } }"); + } + + Y_UNIT_TEST(FollowerAfterSysCompaction) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableForceFollowers(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TIntrusivePtr policy = new NLocalDb::TCompactionPolicy(); + policy->MinDataPageSize = 1; + policy->MinBTreeIndexNodeSize = 1; + + CreateShardedTable(server, sender, "/Root", "table-1", + TShardedTableOptions() + .Shards(2) + .Followers(1) + .Policy(policy.Get())); + + const auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);"); + + // Wait for leader to promote the follower read edge (and stop writing to the Sys table) + Cerr << "... sleeping after upsert" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + // Now we ask the leader to compact the Sys table + { + NActorsProto::TRemoteHttpInfo pb; + pb.SetMethod(HTTP_METHOD_GET); + pb.SetPath("/executorInternals"); + auto* p1 = pb.AddQueryParams(); + p1->SetKey("force_compaction"); + p1->SetValue("1"); + SendViaPipeCache(runtime, shards.at(0), sender, + std::make_unique(std::move(pb))); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C( + ev->Get()->Html.Contains("Table will be compacted in the near future"), + ev->Get()->Html); + } + + // Allow table to finish compaction + Cerr << "... sleeping after compaction" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must succeed + Cerr << "... checking after compaction" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + // Update row values and sleep + Cerr << "... updating rows" << Endl; + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must see updated values + Cerr << "... checking after update" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 44 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 55 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 66 } }"); + } + + Y_UNIT_TEST(FollowerAfterDataCompaction) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableForceFollowers(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TIntrusivePtr policy = new NLocalDb::TCompactionPolicy(); + policy->MinDataPageSize = 1; + policy->MinBTreeIndexNodeSize = 1; + + CreateShardedTable(server, sender, "/Root", "table-1", + TShardedTableOptions() + .Shards(2) + .Followers(1) + .Policy(policy.Get())); + + const auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);"); + + // Wait for leader to promote the follower read edge (and stop writing to the Sys table) + Cerr << "... sleeping after upsert" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + // Now we ask the leader to compact the table + { + NActorsProto::TRemoteHttpInfo pb; + pb.SetMethod(HTTP_METHOD_GET); + pb.SetPath("/executorInternals"); + auto* p1 = pb.AddQueryParams(); + p1->SetKey("force_compaction"); + p1->SetValue("1001"); + SendViaPipeCache(runtime, shards.at(0), sender, + std::make_unique(std::move(pb))); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C( + ev->Get()->Html.Contains("Table will be compacted in the near future"), + ev->Get()->Html); + } + + // Allow table to finish compaction + Cerr << "... sleeping after compaction" << Endl; runtime.SimulateSleep(TDuration::Seconds(1)); + auto observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + Cerr << "Captured pages request" << Endl; + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + UNIT_ASSERT_C(type != NTable::EPage::BTreeIndex && type != NTable::EPage::FlatIndex, "Index pages should be preload during a part switch"); + } + }); + // Read from follower must succeed - Cerr << "... checking" << Endl; + Cerr << "... checking after compaction" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + // Update row values and sleep + Cerr << "... updating rows" << Endl; + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must see updated values + Cerr << "... checking after update" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 44 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 55 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 66 } }"); + } + + Y_UNIT_TEST(FollowerDuringSysPartSwitch) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableForceFollowers(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TIntrusivePtr policy = new NLocalDb::TCompactionPolicy(); + policy->MinDataPageSize = 1; + policy->MinBTreeIndexNodeSize = 1; + + CreateShardedTable(server, sender, "/Root", "table-1", + TShardedTableOptions() + .Followers(1) + .Policy(policy.Get())); + + const auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);"); + + // Wait for leader to promote the follower read edge (and stop writing to the Sys table) + Cerr << "... sleeping after upsert" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + TVector> blockedReads; + auto observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + if (type == NTable::NPage::EPage::Schem2) { + Cerr << "... blocking part load read" << Endl; + blockedReads.emplace_back(ev.Release()); + } + } + }); + + // Now we ask the leader to compact the table + Cerr << "... compacting" << Endl; + { + NActorsProto::TRemoteHttpInfo pb; + pb.SetMethod(HTTP_METHOD_GET); + pb.SetPath("/executorInternals"); + auto* p1 = pb.AddQueryParams(); + p1->SetKey("force_compaction"); + p1->SetValue("1"); + SendViaPipeCache(runtime, shards.at(0), sender, + std::make_unique(std::move(pb))); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C( + ev->Get()->Html.Contains("Table will be compacted in the near future"), + ev->Get()->Html); + } + + WaitFor(runtime, [&]{ return blockedReads.size(); }, "blocked read"); + + Cerr << "... checking after compaction" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + Cerr << "... unblocking read" << Endl; + observer.Remove(); + ui32 readDataPages = 0; + observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + readDataPages += type == NTable::NPage::EPage::DataPage; + } + }); + for (auto& ev : blockedReads) { + runtime.Send(ev.Release(), 0, true); + } + blockedReads.clear(); + + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL_C(readDataPages, 1, "Sys data should have been preloaded"); + + Cerr << "... checking after part switch" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + UNIT_ASSERT_VALUES_EQUAL(readDataPages, 1); + + // Update row values and sleep + Cerr << "... updating rows" << Endl; + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Read from follower must see updated values + Cerr << "... checking after update" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 44 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 55 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 66 } }"); + UNIT_ASSERT_VALUES_EQUAL(readDataPages, 1); + } + + Y_UNIT_TEST(FollowerDuringDataPartSwitch) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableForceFollowers(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TIntrusivePtr policy = new NLocalDb::TCompactionPolicy(); + policy->MinDataPageSize = 1; + policy->MinBTreeIndexNodeSize = 1; + + CreateShardedTable(server, sender, "/Root", "table-1", + TShardedTableOptions() + .Followers(1) + .Policy(policy.Get())); + + const auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);"); + + // Wait for leader to promote the follower read edge (and stop writing to the Sys table) + Cerr << "... sleeping after upsert" << Endl; + runtime.SimulateSleep(TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + + TVector> blockedReads; + auto observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + if (type == NTable::NPage::EPage::Schem2) { + Cerr << "... blocking part load read" << Endl; + blockedReads.emplace_back(ev.Release()); + } + } + }); + + // Now we ask the leader to compact the table + { + NActorsProto::TRemoteHttpInfo pb; + pb.SetMethod(HTTP_METHOD_GET); + pb.SetPath("/executorInternals"); + auto* p1 = pb.AddQueryParams(); + p1->SetKey("force_compaction"); + p1->SetValue("1001"); + SendViaPipeCache(runtime, shards.at(0), sender, + std::make_unique(std::move(pb))); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C( + ev->Get()->Html.Contains("Table will be compacted in the near future"), + ev->Get()->Html); + } + + WaitFor(runtime, [&]{ return blockedReads.size(); }, "blocked read"); + + Cerr << "... checking after compaction" << Endl; UNIT_ASSERT_VALUES_EQUAL( KqpSimpleStaleRoExec(runtime, "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", @@ -242,13 +668,49 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + Cerr << "... unblocking read" << Endl; + observer.Remove(); + ui32 readDataPages = 0; + observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + readDataPages += type == NTable::NPage::EPage::DataPage; + } + }); + for (auto& ev : blockedReads) { + runtime.Send(ev.Release(), 0, true); + } + blockedReads.clear(); + + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL_C(readDataPages, 0, "Shouldn't have preload data"); + + Cerr << "... checking after part switch" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, + "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", + "/Root"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 33 } }"); + UNIT_ASSERT_EQUAL(readDataPages, 3); + // Update row values and sleep Cerr << "... updating rows" << Endl; ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);"); runtime.SimulateSleep(TDuration::Seconds(1)); + observer = runtime.AddObserver([&](NSharedCache::TEvRequest::TPtr& ev) { + NSharedCache::TEvRequest *msg = ev->Get(); + for (auto pageId : msg->Fetch->Pages) { + auto type = NTable::NPage::EPage(msg->Fetch->PageCollection->Page(pageId).Type); + UNIT_ASSERT_C(type != NTable::NPage::EPage::DataPage, "Shouldn't read any data"); + } + }); + // Read from follower must see updated values - Cerr << "... checking" << Endl; + Cerr << "... checking after update" << Endl; UNIT_ASSERT_VALUES_EQUAL( KqpSimpleStaleRoExec(runtime, "SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3", @@ -256,6 +718,7 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) { "{ items { uint32_value: 1 } items { uint32_value: 44 } }, " "{ items { uint32_value: 2 } items { uint32_value: 55 } }, " "{ items { uint32_value: 3 } items { uint32_value: 66 } }"); + UNIT_ASSERT_EQUAL(readDataPages, 3); } } // Y_UNIT_TEST_SUITE(DataShardFollowers)