Skip to content

Commit

Permalink
Better handling of connection loss (ydb-platform#9993)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 2, 2024
1 parent ea37d52 commit 91f8a44
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 51 deletions.
18 changes: 13 additions & 5 deletions ydb/core/change_exchange/change_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ void TChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
}
}

void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged) {
if (partitioningChanged) {
void TChangeSender::CreateSendersImpl(const TVector<ui64>& partitionIds) {
if (partitionIds) {
CreateMissingSenders(partitionIds);
} else {
RecreateSenders(GonePartitions);
RecreateSenders(std::exchange(GonePartitions, {}));
}

GonePartitions.clear();

if (!Enqueued || !RequestRecords()) {
SendRecords();
}
}

// Creates senders for an explicitly supplied list of partitions.
// The list must pass the Y_ABORT_UNLESS check below (presumably: be non-empty,
// via TVector's boolean conversion); the actual work is delegated to
// CreateSendersImpl.
void TChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
// An empty list is reserved for the parameterless overload, so reject it here.
Y_ABORT_UNLESS(partitionIds);
CreateSendersImpl(partitionIds);
}

void TChangeSender::CreateSenders() {
CreateSendersImpl({});
}

void TChangeSender::KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
if (sender.ActorId) {
Expand Down Expand Up @@ -303,6 +310,7 @@ void TChangeSender::OnGone(ui64 partitionId) {
if (it->second.Ready) {
--ReadySenders;
}

Senders.erase(it);
GonePartitions.push_back(partitionId);

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/change_exchange/change_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class TChangeSender {
THashSet<ui64> CompletedPartitions;
};

void CreateSendersImpl(const TVector<ui64>& partitionIds);
void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
void RegisterSender(ui64 partitionId);
void CreateMissingSenders(const TVector<ui64>& partitionIds);
Expand Down Expand Up @@ -150,7 +151,8 @@ class TChangeSender {
return ChangeServer;
}

void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true);
void CreateSenders(const TVector<ui64>& partitionIds); // creates senders after partitioning changes
void CreateSenders(); // creates senders after connection loss
void KillSenders();

void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records);
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/change_exchange/util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "util.h"

namespace NKikimr::NChangeExchange {

// Extracts the ShardId of every entry in `partitions`, in the same order,
// and returns them as a flat vector of ids.
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
    // Pre-size the storage so the loop below never reallocates.
    TVector<ui64> shardIds(::Reserve(partitions.size()));

    for (auto it = partitions.begin(); it != partitions.end(); ++it) {
        shardIds.push_back(it->ShardId);
    }

    return shardIds;
}

}
9 changes: 9 additions & 0 deletions ydb/core/change_exchange/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>

namespace NKikimr::NChangeExchange {

// Returns the ShardId of each element of `partitions`, preserving input order.
// The result has exactly one id per input partition.
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);

}
1 change: 1 addition & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
change_sender.cpp
change_sender_monitoring.cpp
resolve_partition.cpp
util.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)
Expand Down
32 changes: 15 additions & 17 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
Expand Down Expand Up @@ -440,16 +441,6 @@ class TCdcChangeSenderMain
return false;
}

static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

/// ResolveCdcStream

void ResolveCdcStream() {
Expand Down Expand Up @@ -571,6 +562,14 @@ class TCdcChangeSenderMain
return;
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
if (TopicVersion && TopicVersion == topicVersion) {
CreateSenders();
return Become(&TThis::StateMain);
}

TopicVersion = topicVersion;

const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();

Expand All @@ -579,12 +578,7 @@ class TCdcChangeSenderMain
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
TopicVersion = topicVersion;

auto topicAutoPartitioning = ::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType();

const bool topicAutoPartitioning = IsTopicAutoPartitioningEnabled(pqConfig.GetPartitionStrategy().GetPartitionStrategyType());
Y_ABORT_UNLESS(topicAutoPartitioning || entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
Expand All @@ -598,10 +592,14 @@ class TCdcChangeSenderMain
SetPartitionResolver(new TMd5PartitionResolver(KeyDesc->GetPartitions().size()));
}

CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
Become(&TThis::StateMain);
}

// Tells whether the given partition strategy type means auto-partitioning is on:
// every value other than DISABLED counts as enabled.
static bool IsTopicAutoPartitioningEnabled(NKikimrPQ::TPQTabletConfig::TPartitionStrategyType strategy) {
    using TStrategyType = NKikimrPQ::TPQTabletConfig::TPartitionStrategyType;

    const bool disabled = (strategy == TStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED);
    return !disabled;
}

/// Main

STATEFN(StateMain) {
Expand Down
21 changes: 7 additions & 14 deletions ydb/core/tx/datashard/change_sender_table_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "change_exchange_helpers.h"

#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_state.h>
#include <ydb/core/tx/scheme_cache/helpers.h>

Expand Down Expand Up @@ -151,6 +152,11 @@ class TResolveTargetTableState
return;
}

if (AsDerived()->TargetTableVersion && AsDerived()->TargetTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
AsDerived()->CreateSenders();
return AsDerived()->Serve();
}

AsDerived()->TagMap.clear();
TVector<NScheme::TTypeInfo> keyColumnTypes;

Expand Down Expand Up @@ -181,7 +187,6 @@ class TResolveTargetTableState
);

AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get()));

AsDerived()->NextState(TStateTag{});
}
};
Expand Down Expand Up @@ -245,24 +250,12 @@ class TResolveKeysState
return AsDerived()->Retry();
}

const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion;
AsDerived()->TargetTableVersion = entry.GeneralVersion;

AsDerived()->KeyDesc = std::move(entry.KeyDescription);
AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged);
AsDerived()->CreateSenders(NChangeExchange::MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()));

AsDerived()->NextState(TStateTag{});
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId); // partition = shard
}

return result;
}
};

template <typename TDerived>
Expand Down
22 changes: 8 additions & 14 deletions ydb/core/tx/replication/service/base_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
Expand Down Expand Up @@ -268,16 +269,6 @@ class TLocalTableWriter
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

void Registered(TActorSystem*, const TActorId&) override {
ChangeServer = SelfId();
}
Expand Down Expand Up @@ -338,6 +329,12 @@ class TLocalTableWriter
return;
}

if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
Y_ABORT_UNLESS(Initialized);
Resolving = false;
return CreateSenders();
}

auto schema = MakeIntrusive<TLightweightSchema>();
if (entry.Self && entry.Self->Info.HasVersion()) {
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
Expand Down Expand Up @@ -370,7 +367,6 @@ class TLocalTableWriter
);

TChangeSender::SetPartitionResolver(CreateResolverFn(*KeyDesc.Get()));

ResolveKeys();
}

Expand Down Expand Up @@ -408,11 +404,9 @@ class TLocalTableWriter
return LogWarnAndRetry("Empty partitions");
}

const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
TableVersion = entry.GeneralVersion;

KeyDesc = std::move(entry.KeyDescription);
CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));

if (!Initialized) {
Send(Worker, new TEvWorker::TEvHandshake());
Expand Down

0 comments on commit 91f8a44

Please sign in to comment.