Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filestore] Implement a tool which replays filestore-vhost's profile log #2124

Open
wants to merge 57 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
cde1052
[Filestore] Implement a tool which replays filestore-vhost's profile …
proller Sep 20, 2024
c57e0b5
upload dirty
proller Sep 20, 2024
f79ae3b
clean
proller Sep 23, 2024
21221af
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Sep 23, 2024
83339bd
clean
proller Sep 23, 2024
b503a1e
clean
proller Sep 24, 2024
99c051b
wip
proller Sep 24, 2024
a1f564b
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Sep 24, 2024
2b76fdc
wip (listnodes,...)
proller Sep 24, 2024
a79bd29
correct structure
proller Sep 24, 2024
a297662
merge to loadtest
proller Sep 25, 2024
7b4c62b
move fix
proller Sep 25, 2024
86c763b
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Sep 25, 2024
ff0574b
better
proller Sep 25, 2024
823e1b7
grpc fix
proller Sep 26, 2024
e3da729
wip
proller Sep 27, 2024
a0c0eff
fix resize
proller Sep 27, 2024
d3d1f67
Test scripts
proller Sep 27, 2024
472f048
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Sep 30, 2024
1af310a
support links
proller Sep 30, 2024
c2e7c5c
Better
proller Sep 30, 2024
db3be77
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 1, 2024
42fa690
deduplicate code
proller Oct 1, 2024
8f45bfd
clean
proller Oct 1, 2024
2d3a2d1
Fix dirs
proller Oct 2, 2024
ec9b25b
better names
proller Oct 3, 2024
944b952
temporary remove grpc
proller Oct 3, 2024
940f7b1
clean
proller Oct 3, 2024
ed9f9d5
clean
proller Oct 3, 2024
64d954c
Clean
proller Oct 3, 2024
4d9f9f7
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 3, 2024
9919e94
clean
proller Oct 3, 2024
65e298d
better
proller Oct 4, 2024
f0979bd
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 4, 2024
8c445e0
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 9, 2024
1e27a1e
Some fixes
proller Oct 9, 2024
4929286
Describe vars
proller Oct 9, 2024
d9e9b8d
Fixes
proller Oct 9, 2024
a9f6818
Fixing
proller Oct 9, 2024
262932c
fmt
proller Oct 10, 2024
82ce83c
Some fixes
proller Oct 10, 2024
a6ad099
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 10, 2024
7d53ad9
fixes
proller Oct 11, 2024
6071e1c
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 11, 2024
277106b
fixes
proller Oct 11, 2024
bd9b7de
No stop result
proller Oct 12, 2024
369246d
comment
proller Oct 12, 2024
8fb5141
remoove TargetFilesystemId <- return me!
proller Oct 12, 2024
8a6c2bc
clean
proller Oct 12, 2024
6a5aa09
clean
proller Oct 12, 2024
134d730
read write
proller Oct 12, 2024
a2b4ab0
enum
proller Oct 13, 2024
12b6349
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 13, 2024
80f5609
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 16, 2024
1d41d3e
use fspath
proller Oct 17, 2024
a4b51b0
Merge remote-tracking branch 'upstream/main' into issues-1733-replay
proller Oct 17, 2024
7a02c7e
fixes
proller Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloud/filestore/libs/client/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ FILESTORE_SESSION_FORWARD(FILESTORE_IMPLEMENT_METHOD)
with_lock (SessionLock) {
if (!HasError(response)) {
STORAGE_INFO(LogTag(GetSessionId(response), GetSessionSeqNo(response))
<< " session established" << GetSessionState(response).size());
<< " session established " << GetSessionState(response).size());

SessionState = SessionEstablished;

Expand Down
22 changes: 20 additions & 2 deletions cloud/filestore/tools/testing/loadtest/lib/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NCloud::NFileStore::NLoadTest {

struct TCompletedRequest
{
NProto::EAction Action;
NProto::EAction Action{};
TDuration Elapsed;
proller marked this conversation as resolved.
Show resolved Hide resolved
NProto::TError Error;

Expand All @@ -41,8 +41,19 @@ struct IRequestGenerator
virtual ~IRequestGenerator() = default;

virtual bool HasNextRequest() = 0;
virtual TInstant NextRequestAt() = 0;
virtual NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() = 0;

// With false collect request futures and process them in bulk
// With true process every request future immediately after ExecuteNextRequest
virtual bool ShouldImmediatelyProcessQueue()
proller marked this conversation as resolved.
Show resolved Hide resolved
{
return false;
}

virtual bool ShouldFailOnError()
{
return true;
}
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -61,4 +72,11 @@ IRequestGeneratorPtr CreateDataRequestGenerator(
TString filesystemId,
NProto::THeaders headers);

IRequestGeneratorPtr CreateReplayRequestGeneratorFs(
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString filesystemId,
NProto::THeaders headers);

} // namespace NCloud::NFileStore::NLoadTest
5 changes: 0 additions & 5 deletions cloud/filestore/tools/testing/loadtest/lib/request_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@ class TDataRequestGenerator final
return true;
}

TInstant NextRequestAt() override
{
return TInstant::Max();
}

NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() override
{
const auto& action = PeekNextAction();
Expand Down
5 changes: 0 additions & 5 deletions cloud/filestore/tools/testing/loadtest/lib/request_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ class TIndexRequestGenerator final
return true;
}

TInstant NextRequestAt() override
{
return TInstant::Max();
}

NThreading::TFuture<TCompletedRequest> ExecuteNextRequest() override
{
const auto& action = PeekNextAction();
Expand Down
192 changes: 192 additions & 0 deletions cloud/filestore/tools/testing/loadtest/lib/request_replay.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#include "request_replay.h"

#include <cloud/filestore/libs/diagnostics/events/profile_events.ev.pb.h>
#include <cloud/filestore/libs/service/request.h>
#include <cloud/filestore/tools/analytics/libs/event-log/dump.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <library/cpp/eventlog/eventlog.h>
#include <library/cpp/eventlog/iterator.h>
#include <library/cpp/logger/log.h>

namespace NCloud::NFileStore::NLoadTest {

////////////////////////////////////////////////////////////////////////////////

IReplayRequestGenerator::IReplayRequestGenerator(
proller marked this conversation as resolved.
Show resolved Hide resolved
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString /*filesystemId*/,
NProto::THeaders headers)
: Spec(std::move(spec))
, Headers(std::move(headers))
, Session(std::move(session))
{
Log = logging->CreateLog(Headers.GetClientId());

NEventLog::TOptions options;
options.FileName = Spec.GetFileName();

// Sort eventlog items by timestamp
options.SetForceStrongOrdering(true);
CurrentEvent = CreateIterator(options);
}

bool IReplayRequestGenerator::HasNextRequest()
{
if (!EventPtr) {
Advance();
}
return !!EventPtr;
}

bool IReplayRequestGenerator::ShouldImmediatelyProcessQueue()
{
return true;
}

bool IReplayRequestGenerator::ShouldFailOnError()
{
return false;
}

void IReplayRequestGenerator::Advance()
{
for (EventPtr = CurrentEvent->Next(); EventPtr;
EventPtr = CurrentEvent->Next())
{
MessagePtr = dynamic_cast<const NProto::TProfileLogRecord*>(
EventPtr->GetProto());

if (!MessagePtr) {
return;
}

const TString fileSystemId{MessagePtr->GetFileSystemId()};
if (!Spec.GetFileSystemIdFilter().empty() &&
fileSystemId != Spec.GetFileSystemIdFilter())
{
STORAGE_DEBUG(
"Skipped event with FileSystemId=%s",
fileSystemId.c_str());
continue;
}

EventMessageNumber = MessagePtr->GetRequests().size();
return;
}
}

TFuture<TCompletedRequest> IReplayRequestGenerator::ProcessRequest(
const NProto::TProfileLogRequestInfo& request)
{
const auto& action = request.GetRequestType();
switch (static_cast<EFileStoreRequest>(action)) {
case EFileStoreRequest::ReadData:
return DoReadData(request);
case EFileStoreRequest::WriteData:
return DoWrite(request);
case EFileStoreRequest::CreateNode:
return DoCreateNode(request);
case EFileStoreRequest::RenameNode:
return DoRenameNode(request);
case EFileStoreRequest::UnlinkNode:
return DoUnlinkNode(request);
case EFileStoreRequest::CreateHandle:
return DoCreateHandle(request);
case EFileStoreRequest::DestroyHandle:
return DoDestroyHandle(request);
case EFileStoreRequest::GetNodeAttr:
return DoGetNodeAttr(request);
case EFileStoreRequest::AccessNode:
return DoAccessNode(request);
case EFileStoreRequest::ListNodes:
return DoListNodes(request);
case EFileStoreRequest::AcquireLock:
return DoAcquireLock(request);
case EFileStoreRequest::ReleaseLock:
return DoReleaseLock(request);

case EFileStoreRequest::ReadBlob:
case EFileStoreRequest::WriteBlob:
case EFileStoreRequest::GenerateBlobIds:
case EFileStoreRequest::PingSession:
case EFileStoreRequest::Ping:
return {};

default:
STORAGE_INFO(
"Uninmplemented action=%u %s",
action,
RequestName(request.GetRequestType()).c_str());
return {};
}
}

NThreading::TFuture<TCompletedRequest>
IReplayRequestGenerator::ExecuteNextRequest()
{
if (!HasNextRequest()) {
return {};
}

for (; EventPtr; Advance()) {
if (!MessagePtr) {
continue;
}

for (; EventMessageNumber > 0;) {
NProto::TProfileLogRequestInfo request =
MessagePtr->GetRequests()[--EventMessageNumber];
{
auto timediff = (request.GetTimestampMcs() - TimestampMcs) *
Spec.GetTimeScale();
TimestampMcs = request.GetTimestampMcs();
if (timediff > MaxSleepMcs) {
timediff = 0;
}

const auto current = TInstant::Now();
auto diff = current - Started;

if (timediff > diff.MicroSeconds()) {
auto sleep =
TDuration::MicroSeconds(timediff - diff.MicroSeconds());
STORAGE_DEBUG(
"Sleep=%lu timediff=%f diff=%lu",
sleep.MicroSeconds(),
timediff,
diff.MicroSeconds());

Sleep(sleep);
}

Started = current;
}

STORAGE_DEBUG(
"Processing message n=%d typename=%s type=%d name=%s data=%s",
EventMessageNumber,
request.GetTypeName().c_str(),
request.GetRequestType(),
RequestName(request.GetRequestType()).c_str(),
request.ShortDebugString().Quote().c_str());

if (const auto future = ProcessRequest(request);
proller marked this conversation as resolved.
Show resolved Hide resolved
future.Initialized())
{
return future;
}
}
}

STORAGE_INFO(
"Profile log finished n=%d hasPtr=%d",
EventMessageNumber,
!!EventPtr);

return {};
}

} // namespace NCloud::NFileStore::NLoadTest
96 changes: 96 additions & 0 deletions cloud/filestore/tools/testing/loadtest/lib/request_replay.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include "request.h"

#include <cloud/filestore/libs/diagnostics/events/profile_events.ev.pb.h>

#include <library/cpp/eventlog/eventlog.h>
#include <library/cpp/eventlog/iterator.h>

namespace NCloud::NFileStore::NLoadTest {

using namespace NThreading;
using namespace NCloud::NFileStore::NClient;

proller marked this conversation as resolved.
Show resolved Hide resolved
////////////////////////////////////////////////////////////////////////////////

class IReplayRequestGenerator: public IRequestGenerator
{
protected:
const ::NCloud::NFileStore::NProto::TReplaySpec Spec;
TLog Log;
TString FileSystemIdFilter;
const ::NCloud::NFileStore::NProto::THeaders Headers;
NClient::ISessionPtr Session;

ui64 TimestampMcs{};
TInstant Started;

// Do not sleep too much if timestamps in log is broken
constexpr static auto MaxSleepMcs = 1000000;

private:
THolder<NEventLog::IIterator> CurrentEvent;
TConstEventPtr EventPtr;
int EventMessageNumber = 0;
proller marked this conversation as resolved.
Show resolved Hide resolved
const NProto::TProfileLogRecord* MessagePtr{};
TFuture<TCompletedRequest> ProcessRequest(
const NProto::TProfileLogRequestInfo& request);

public:
IReplayRequestGenerator(
NProto::TReplaySpec spec,
ILoggingServicePtr logging,
NClient::ISessionPtr session,
TString filesystemId,
NProto::THeaders headers);

bool ShouldImmediatelyProcessQueue() override;

bool ShouldFailOnError() override;

void Advance();

bool HasNextRequest() override;

TFuture<TCompletedRequest> ExecuteNextRequest() override;

virtual TFuture<TCompletedRequest> DoReadData(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoWrite(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoCreateNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoRenameNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoUnlinkNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoCreateHandle(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoDestroyHandle(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoGetNodeAttr(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoAccessNode(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoListNodes(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoAcquireLock(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
virtual TFuture<TCompletedRequest> DoReleaseLock(
const NCloud::NFileStore::NProto::
TProfileLogRequestInfo& /*unused*/) = 0;
};

} // namespace NCloud::NFileStore::NLoadTest
Loading
Loading