Skip to content

Commit

Permalink
add implementation for common cases in local partition reader (ydb-pl…
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Jul 15, 2024
1 parent 0bf2ed6 commit d8fc0d3
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 46 deletions.
25 changes: 21 additions & 4 deletions ydb/core/backup/impl/local_partition_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
using namespace NActors;
using namespace NKikimr::NReplication::NService;

namespace {

constexpr static char OFFLOAD_ACTOR_CLIENT_ID[] = "__OFFLOAD_ACTOR__";
constexpr static ui64 READ_TIMEOUT_MS = 1000;
constexpr static ui64 READ_LIMIT_BYTES = 1_MB;

} // anonymous namespace

namespace NKikimr::NBackup::NImpl {

Expand Down Expand Up @@ -50,6 +56,10 @@ class TLocalPartitionReader
return request;
}

void HandleInit() {
Send(PQTablet, CreateGetOffsetRequest().Release());
}

void HandleInit(TEvWorker::TEvHandshake::TPtr& ev) {
Worker = ev->Sender;
LOG_D("Handshake"
Expand All @@ -62,8 +72,8 @@ class TLocalPartitionReader
LOG_D("Handle " << ev->Get()->ToString());
auto& record = ev->Get()->Record;
if (record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
// TODO reschedule
Y_ABORT("Unimplemented!");
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup);
return;
}
Y_VERIFY_S(record.GetErrorCode() == NPersQueue::NErrorCode::OK, "Unimplemented!");
Y_VERIFY_S(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdGetClientOffsetResult(), "Unimplemented!");
Expand All @@ -85,8 +95,8 @@ class TLocalPartitionReader
auto& read = *req.MutableCmdRead();
read.SetOffset(Offset);
read.SetClientId(OFFLOAD_ACTOR_CLIENT_ID);
read.SetTimeoutMs(0);
read.SetBytes(1_MB);
read.SetTimeoutMs(READ_TIMEOUT_MS);
read.SetBytes(READ_LIMIT_BYTES);

return request;
}
Expand Down Expand Up @@ -114,6 +124,12 @@ class TLocalPartitionReader

const auto& readResult = record.GetPartitionResponse().GetCmdReadResult();

if (!readResult.ResultSize()) {
Y_ABORT_UNLESS(PQTablet);
Send(PQTablet, CreateReadRequest().Release());
return;
}

auto gotOffset = Offset;
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));

Expand Down Expand Up @@ -147,6 +163,7 @@ class TLocalPartitionReader
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, HandleInit);
hFunc(TEvPersQueue::TEvResponse, HandleInit);
sFunc(TEvents::TEvWakeup, HandleInit);
sFunc(TEvents::TEvPoison, PassAway);
default:
Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite()
Expand Down
220 changes: 178 additions & 42 deletions ydb/core/backup/impl/local_partition_reader_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,90 @@ using namespace NReplication::NService;
using namespace NPersQueue;

Y_UNIT_TEST_SUITE(LocalPartitionReader) {
constexpr static ui64 INITIAL_OFFSET = 3;
constexpr static ui64 PARTITION = 7;
constexpr static const char* PARTITION_STR = "7";
constexpr static const char* OFFLOAD_ACTOR_NAME = "__OFFLOAD_ACTOR__";

void GrabInitialPQRequest(TTestActorRuntime& runtime) {
TAutoPtr<IEventHandle> handle;
auto getOffset = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvRequest>(handle);
UNIT_ASSERT(getOffset->Record.HasPartitionRequest());
UNIT_ASSERT_VALUES_EQUAL(getOffset->Record.GetPartitionRequest().GetPartition(), PARTITION);
UNIT_ASSERT(getOffset->Record.GetPartitionRequest().HasCmdGetClientOffset());
auto getOffsetCmd = getOffset->Record.GetPartitionRequest().GetCmdGetClientOffset();
UNIT_ASSERT_VALUES_EQUAL(getOffsetCmd.GetClientId(), OFFLOAD_ACTOR_NAME);
}

void GrabReadPQRequest(TTestActorRuntime& runtime, ui32 expectedOffset) {
TAutoPtr<IEventHandle> handle;
auto read = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvRequest>(handle);
UNIT_ASSERT(read->Record.HasPartitionRequest());
UNIT_ASSERT_VALUES_EQUAL(read->Record.GetPartitionRequest().GetPartition(), PARTITION);
UNIT_ASSERT(read->Record.GetPartitionRequest().HasCmdRead());
auto readCmd = read->Record.GetPartitionRequest().GetCmdRead();
UNIT_ASSERT_VALUES_EQUAL(readCmd.GetClientId(), OFFLOAD_ACTOR_NAME);
UNIT_ASSERT_VALUES_EQUAL(readCmd.GetOffset(), expectedOffset);
}

void GrabDataEvent(TTestActorRuntime& runtime, ui32 dataPatternCookie) {
TAutoPtr<IEventHandle> handle;
auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
UNIT_ASSERT_VALUES_EQUAL(data->Source, PARTITION_STR);
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, INITIAL_OFFSET + dataPatternCookie * 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", dataPatternCookie));
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, INITIAL_OFFSET + dataPatternCookie * 2 + 1);
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", dataPatternCookie));
}

TEvPersQueue::TEvResponse* GenerateData(ui32 dataPatternCookie) {
auto* readResponse = new TEvPersQueue::TEvResponse;
readResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
auto& cmdReadResult = *readResponse->Record.MutablePartitionResponse()->MutableCmdReadResult();
auto& readResult1 = *cmdReadResult.AddResult();
NKikimrPQClient::TDataChunk msg1;
msg1.SetData(Sprintf("1-%d", dataPatternCookie));
TString msg1Str;
UNIT_ASSERT(msg1.SerializeToString(&msg1Str));
readResult1.SetOffset(INITIAL_OFFSET + dataPatternCookie * 2);
readResult1.SetData(msg1Str);
auto& readResult2 = *cmdReadResult.AddResult();
NKikimrPQClient::TDataChunk msg2;
msg2.SetData(Sprintf("2-%d", dataPatternCookie));
TString msg2Str;
UNIT_ASSERT(msg2.SerializeToString(&msg2Str));
readResult2.SetOffset(INITIAL_OFFSET + dataPatternCookie * 2 + 1);
readResult2.SetData(msg2Str);

return readResponse;
}

TEvPersQueue::TEvResponse* GenerateEmptyData() {
auto* readResponse = new TEvPersQueue::TEvResponse;
readResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
readResponse->Record.MutablePartitionResponse()->MutableCmdReadResult();

return readResponse;
}

Y_UNIT_TEST(Simple) {
TTestActorRuntime runtime;
runtime.Initialize(NKikimr::TAppPrepare().Unwrap());

TActorId pqtablet = runtime.AllocateEdgeActor();
TActorId reader = runtime.Register(CreateLocalPartitionReader(pqtablet, 7));
TActorId reader = runtime.Register(CreateLocalPartitionReader(pqtablet, PARTITION));
runtime.EnableScheduleForActor(reader);
TActorId worker = runtime.AllocateEdgeActor();
TAutoPtr<IEventHandle> handle;

runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvHandshake));

auto getOffset = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvRequest>(handle);
UNIT_ASSERT(getOffset->Record.HasPartitionRequest());
UNIT_ASSERT_VALUES_EQUAL(getOffset->Record.GetPartitionRequest().GetPartition(), 7);
UNIT_ASSERT(getOffset->Record.GetPartitionRequest().HasCmdGetClientOffset());
auto getOffsetCmd = getOffset->Record.GetPartitionRequest().GetCmdGetClientOffset();
UNIT_ASSERT_VALUES_EQUAL(getOffsetCmd.GetClientId(), "__OFFLOAD_ACTOR__"); // hardcoded for now
GrabInitialPQRequest(runtime);

auto* getOffsetResponse = new TEvPersQueue::TEvResponse;
getOffsetResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
getOffsetResponse->Record.MutablePartitionResponse()->MutableCmdGetClientOffsetResult()->SetOffset(3);
getOffsetResponse->Record.MutablePartitionResponse()->MutableCmdGetClientOffsetResult()->SetOffset(INITIAL_OFFSET);
runtime.Send(new IEventHandle(reader, pqtablet, getOffsetResponse));

auto handshake = runtime.GrabEdgeEventRethrow<TEvWorker::TEvHandshake>(handle);
Expand All @@ -44,40 +107,113 @@ Y_UNIT_TEST_SUITE(LocalPartitionReader) {
for (auto i = 0; i < 3; ++i) {
runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvPoll));

auto read = runtime.GrabEdgeEventRethrow<TEvPersQueue::TEvRequest>(handle);
UNIT_ASSERT(read->Record.HasPartitionRequest());
UNIT_ASSERT_VALUES_EQUAL(read->Record.GetPartitionRequest().GetPartition(), 7);
UNIT_ASSERT(read->Record.GetPartitionRequest().HasCmdRead());
auto readCmd = read->Record.GetPartitionRequest().GetCmdRead();
UNIT_ASSERT_VALUES_EQUAL(readCmd.GetClientId(), "__OFFLOAD_ACTOR__"); // hardcoded for now
UNIT_ASSERT_VALUES_EQUAL(readCmd.GetOffset(), 3 + i * 2);

auto* readResponse = new TEvPersQueue::TEvResponse;
readResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
auto& cmdReadResult = *readResponse->Record.MutablePartitionResponse()->MutableCmdReadResult();
auto& readResult1 = *cmdReadResult.AddResult();
NKikimrPQClient::TDataChunk msg1;
msg1.SetData(Sprintf("1-%d", i));
TString msg1Str;
UNIT_ASSERT(msg1.SerializeToString(&msg1Str));
readResult1.SetOffset(3 + i * 2);
readResult1.SetData(msg1Str);
auto& readResult2 = *cmdReadResult.AddResult();
NKikimrPQClient::TDataChunk msg2;
msg2.SetData(Sprintf("2-%d", i));
TString msg2Str;
UNIT_ASSERT(msg2.SerializeToString(&msg2Str));
readResult2.SetOffset(3 + i * 2 + 1);
readResult2.SetData(msg2Str);
runtime.Send(new IEventHandle(reader, pqtablet, readResponse));

auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
UNIT_ASSERT_VALUES_EQUAL(data->Source, "7");
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, 3 + i * 2);
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", i));
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, 3 + i * 2 + 1);
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", i));
GrabReadPQRequest(runtime, INITIAL_OFFSET + i * 2);

runtime.Send(new IEventHandle(reader, pqtablet, GenerateData(i)));

GrabDataEvent(runtime, i);

// TODO check commit
}
}

Y_UNIT_TEST(Booting) {
TTestActorRuntime runtime;
runtime.Initialize(NKikimr::TAppPrepare().Unwrap());

TActorId pqtablet = runtime.AllocateEdgeActor();
TActorId reader = runtime.Register(CreateLocalPartitionReader(pqtablet, PARTITION));
runtime.EnableScheduleForActor(reader);
TActorId worker = runtime.AllocateEdgeActor();
TAutoPtr<IEventHandle> handle;

runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvHandshake));

for (int i = 0; i < 5; ++i) {
GrabInitialPQRequest(runtime);

auto* getOffsetResponse = new TEvPersQueue::TEvResponse;
getOffsetResponse->Record.SetErrorCode(NPersQueue::NErrorCode::INITIALIZING);
runtime.Send(new IEventHandle(reader, pqtablet, getOffsetResponse));
}

GrabInitialPQRequest(runtime);

auto* getOffsetResponse = new TEvPersQueue::TEvResponse;
getOffsetResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
getOffsetResponse->Record.MutablePartitionResponse()->MutableCmdGetClientOffsetResult()->SetOffset(INITIAL_OFFSET);
runtime.Send(new IEventHandle(reader, pqtablet, getOffsetResponse));

auto handshake = runtime.GrabEdgeEventRethrow<TEvWorker::TEvHandshake>(handle);
Y_UNUSED(handshake);
}

Y_UNIT_TEST(FeedSlowly) {
TTestActorRuntime runtime;
runtime.Initialize(NKikimr::TAppPrepare().Unwrap());

TActorId pqtablet = runtime.AllocateEdgeActor();
TActorId reader = runtime.Register(CreateLocalPartitionReader(pqtablet, PARTITION));
runtime.EnableScheduleForActor(reader);
TActorId worker = runtime.AllocateEdgeActor();
TAutoPtr<IEventHandle> handle;

runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvHandshake));

GrabInitialPQRequest(runtime);

auto* getOffsetResponse = new TEvPersQueue::TEvResponse;
getOffsetResponse->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
getOffsetResponse->Record.MutablePartitionResponse()->MutableCmdGetClientOffsetResult()->SetOffset(INITIAL_OFFSET);
runtime.Send(new IEventHandle(reader, pqtablet, getOffsetResponse));

auto handshake = runtime.GrabEdgeEventRethrow<TEvWorker::TEvHandshake>(handle);
Y_UNUSED(handshake);

runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvPoll));

for (auto i = 0; i < 3; ++i) {
GrabReadPQRequest(runtime, INITIAL_OFFSET);

runtime.SimulateSleep(TDuration::Seconds(1));

runtime.Send(new IEventHandle(reader, pqtablet, GenerateEmptyData()));
}

for (auto i = 0; i < 3; ++i) {
if (i != 0) {
runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvPoll));
}

GrabReadPQRequest(runtime, INITIAL_OFFSET + i * 2);

runtime.Send(new IEventHandle(reader, pqtablet, GenerateData(i)));

GrabDataEvent(runtime, i);

// TODO check commit
}

runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvPoll));

for (auto i = 0; i < 2; ++i) {
GrabReadPQRequest(runtime, 9);

runtime.SimulateSleep(TDuration::Seconds(1));

runtime.Send(new IEventHandle(reader, pqtablet, GenerateEmptyData()));
}

for (auto i = 3; i < 5; ++i) {
if (i != 3) {
runtime.Send(new IEventHandle(reader, worker, new TEvWorker::TEvPoll));
}

GrabReadPQRequest(runtime, INITIAL_OFFSET + i * 2);

runtime.Send(new IEventHandle(reader, pqtablet, GenerateData(i)));

GrabDataEvent(runtime, i);

// TODO check commit
}
Expand Down

0 comments on commit d8fc0d3

Please sign in to comment.