Skip to content

Commit

Permalink
dont repeat requests for already applied modifications (ydb-platform#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 27, 2024
1 parent 3863877 commit 673c274
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void TBehaviourRegistrator::Handle(TEvTableDescriptionFailed::TPtr& ev) {

void TBehaviourRegistrator::Handle(TEvStartRegistration::TPtr& /*ev*/) {
NInitializer::TDSAccessorInitialized::Execute(ReqConfig,
Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetInitializationSnapshot());
Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetSnapshotOwner());
}

void TBehaviourRegistrator::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) {
Expand Down
8 changes: 4 additions & 4 deletions ydb/services/metadata/ds_table/registration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ void TRegistrationData::InitializationFinished(const TString& initId) {
}

void TRegistrationData::SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) {
const bool notInitializedBefore = !InitializationSnapshot;
InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
Y_ABORT_UNLESS(InitializationSnapshot);
const bool notInitializedBefore = !SnapshotOwner->HasInitializationSnapshot();
SnapshotOwner->SetInitializationSnapshot(s);
if (notInitializedBefore) {
EventsWaiting->TryResendOne();
}
Expand All @@ -91,11 +90,12 @@ void TRegistrationData::StartInitialization() {
}

TRegistrationData::TRegistrationData() {
SnapshotOwner = std::make_shared<TInitializationSnapshotOwner>();
InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
}

void TRegistrationData::NoInitializationSnapshot() {
InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
SnapshotOwner->NoInitializationSnapshot();
EventsWaiting->TryResendOne();
}

Expand Down
33 changes: 32 additions & 1 deletion ydb/services/metadata/ds_table/registration.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,37 @@ class TEventsCollector {
void Initialized(const TString& initId);
};

class TRegistrationData;

class TInitializationSnapshotOwner {
private:
mutable TMutex Mutex;
std::shared_ptr<NInitializer::TSnapshot> InitializationSnapshot;
friend class TRegistrationData;
void SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) {
auto snapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
Y_ABORT_UNLESS(snapshot);
TGuard<TMutex> g(Mutex);
InitializationSnapshot = snapshot;
}

void NoInitializationSnapshot() {
TGuard<TMutex> g(Mutex);
InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
}
public:
bool HasInitializationSnapshot() const {
TGuard<TMutex> g(Mutex);
return !!InitializationSnapshot;
}
bool HasModification(const TString& componentId, const TString& modificationId) const {
NInitializer::TDBInitializationKey key(componentId, modificationId);
TGuard<TMutex> g(Mutex);
return InitializationSnapshot->GetObjects().contains(key);
}

};

class TRegistrationData {
public:
enum class EStage {
Expand All @@ -170,8 +201,8 @@ class TRegistrationData {
};
private:
YDB_READONLY(EStage, Stage, EStage::Created);
YDB_READONLY_DEF(std::shared_ptr<NInitializer::TSnapshot>, InitializationSnapshot);
YDB_READONLY_DEF(std::shared_ptr<NInitializer::TFetcher>, InitializationFetcher);
YDB_READONLY_DEF(std::shared_ptr<TInitializationSnapshotOwner>, SnapshotOwner);
public:
TRegistrationData();

Expand Down
2 changes: 1 addition & 1 deletion ydb/services/metadata/ds_table/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ IActor* CreateService(const TConfig& config) {

void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) {
TBehavioursId id(managers);
if (RegistrationData->GetInitializationSnapshot()) {
if (RegistrationData->GetSnapshotOwner()->HasInitializationSnapshot()) {
auto bInitializer = NInitializer::TDBObjectBehaviour::GetInstance();
switch (RegistrationData->GetStage()) {
case TRegistrationData::EStage::Created:
Expand Down
23 changes: 15 additions & 8 deletions ydb/services/metadata/initializer/accessor_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
namespace NKikimr::NMetadata::NInitializer {

void TDSAccessorInitialized::DoNextModifier(const bool doPop) {
if (InitializationSnapshotOwner->HasInitializationSnapshot() && !doPop) {
while (Modifiers.size() && !doPop) {
if (InitializationSnapshotOwner->HasModification(ComponentId, Modifiers.front()->GetModificationId())) {
Modifiers.pop_front();
} else {
break;
}
}
}
if (doPop) {
Modifiers.pop_front();
}
Expand All @@ -27,11 +36,11 @@ void TDSAccessorInitialized::DoNextModifier(const bool doPop) {
TDSAccessorInitialized::TDSAccessorInitialized(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot)
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner)
: Config(config)
, InitializationBehaviour(initializationBehaviour)
, ExternalController(controller)
, InitializationSnapshot(initializationSnapshot)
, InitializationSnapshotOwner(snapshotOwner)
, ComponentId(componentId)
{
}
Expand All @@ -40,7 +49,7 @@ void TDSAccessorInitialized::OnModificationFinished(const TString& modificationI
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
Y_ABORT_UNLESS(Modifiers.size());
Y_ABORT_UNLESS(Modifiers.front()->GetModificationId() == modificationId);
if (NProvider::TServiceOperator::IsEnabled() && InitializationSnapshot) {
if (NProvider::TServiceOperator::IsEnabled() && InitializationSnapshotOwner->HasInitializationSnapshot()) {
TDBInitialization dbInit(ComponentId, Modifiers.front()->GetModificationId());
NModifications::IOperationsManager::TExternalModificationContext extContext;
extContext.SetUserToken(NACLib::TSystemUsers::Metadata());
Expand All @@ -58,9 +67,6 @@ void TDSAccessorInitialized::OnModificationFinished(const TString& modificationI
void TDSAccessorInitialized::OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) {
for (auto&& i : modifiers) {
TDBInitializationKey key(ComponentId, i->GetModificationId());
if (InitializationSnapshot && InitializationSnapshot->GetObjects().contains(key)) {
continue;
}
Modifiers.emplace_back(i);
}
DoNextModifier(false);
Expand Down Expand Up @@ -93,10 +99,11 @@ void TDSAccessorInitialized::OnAlteringFinished() {

void TDSAccessorInitialized::Execute(const NRequest::TConfig& config, const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour, IInitializerOutput::TPtr controller,
std::shared_ptr<TSnapshot> initializationSnapshot)
const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner)
{
AFL_VERIFY(snapshotOwner);
std::shared_ptr<TDSAccessorInitialized> initializer(new TDSAccessorInitialized(config,
componentId, initializationBehaviour, controller, initializationSnapshot));
componentId, initializationBehaviour, controller, snapshotOwner));
initializer->SelfPtr = initializer;

initializationBehaviour->Prepare(initializer);
Expand Down
7 changes: 4 additions & 3 deletions ydb/services/metadata/initializer/accessor_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/ds_table/config.h>
#include <ydb/services/metadata/ds_table/registration.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/event_local.h>
Expand All @@ -22,7 +23,7 @@ class TDSAccessorInitialized: public IInitializerInput,
const NRequest::TConfig Config;
IInitializationBehaviour::TPtr InitializationBehaviour;
IInitializerOutput::TPtr ExternalController;
std::shared_ptr<TSnapshot> InitializationSnapshot;
const std::shared_ptr<NProvider::TInitializationSnapshotOwner> InitializationSnapshotOwner;
const TString ComponentId;
std::shared_ptr<TDSAccessorInitialized> SelfPtr;

Expand All @@ -38,12 +39,12 @@ class TDSAccessorInitialized: public IInitializerInput,
TDSAccessorInitialized(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot);
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner);
public:
static void Execute(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot);
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& initializationSnapshotOwner);

};

Expand Down

0 comments on commit 673c274

Please sign in to comment.