Skip to content

Commit

Permalink
issue-1928: filestore-client: using ISession instead of IClient for t…
Browse files Browse the repository at this point in the history
…he Session-aware methods to properly process E_FS_INVALID_SESSION errors; existing tests are more or less sufficient to ensure that this code is correct (#2265)
  • Loading branch information
qkrorlqr authored Oct 12, 2024
1 parent 529076e commit a962fb0
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 81 deletions.
53 changes: 23 additions & 30 deletions cloud/filestore/apps/client/lib/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,46 +311,38 @@ void TFileStoreCommand::Stop()
TFileStoreServiceCommand::Stop();
}

TFileStoreCommand::TSessionGuard TFileStoreCommand::CreateSession()
TFileStoreCommand::TSessionGuard TFileStoreCommand::CreateCustomSession(
TString fsId,
TString clientId)
{
// TODO use ISession instead
auto request = std::make_shared<NProto::TCreateSessionRequest>();
request->SetFileSystemId(FileSystemId);
request->MutableHeaders()->SetClientId(ClientId);
request->SetRestoreClientSession(true);

TCallContextPtr ctx = MakeIntrusive<TCallContext>();
auto response = WaitFor(Client->CreateSession(ctx, std::move(request)));
NProto::TSessionConfig protoConfig;
protoConfig.SetFileSystemId(std::move(fsId));
protoConfig.SetClientId(std::move(clientId));
auto config = std::make_shared<TSessionConfig>(protoConfig);
auto session =
NClient::CreateSession(Logging, Timer, Scheduler, Client, config);
// extracting the value will break the internal state of this session object
auto response = WaitFor(session->CreateSession(), /* extract */ false);
CheckResponse(response);

auto sessionId = response.GetSession().GetSessionId();

Headers.SetClientId(ClientId);
Headers.SetSessionId(sessionId);
Headers.SetDisableMultiTabletForwarding(DisableMultiTabletForwarding);

return TSessionGuard(*this);
return TSessionGuard(*this, std::move(session));
}

void TFileStoreCommand::DestroySession()
TFileStoreCommand::TSessionGuard TFileStoreCommand::CreateSession()
{
if (Headers.GetSessionId().Empty()) {
return;
}

auto request = std::make_shared<NProto::TDestroySessionRequest>();
request->SetFileSystemId(FileSystemId);
request->MutableHeaders()->SetSessionId(Headers.GetSessionId());
request->MutableHeaders()->SetClientId(Headers.GetClientId());
return CreateCustomSession(FileSystemId, ClientId);
}

TCallContextPtr ctx = MakeIntrusive<TCallContext>();
auto response = WaitFor(Client->DestroySession(ctx, std::move(request)));
void TFileStoreCommand::DestroySession(ISession& session)
{
auto response = WaitFor(session.DestroySession());
CheckResponse(response);
}

////////////////////////////////////////////////////////////////////////////////

NProto::TNodeAttr TFileStoreCommand::ResolveNode(
ISession& session,
ui64 parentNodeId,
TString name,
bool ignoreMissing)
Expand All @@ -372,7 +364,7 @@ NProto::TNodeAttr TFileStoreCommand::ResolveNode(
request->SetNodeId(parentNodeId);
request->SetName(std::move(name));

auto response = WaitFor(Client->GetNodeAttr(
auto response = WaitFor(session.GetNodeAttr(
PrepareCallContext(),
std::move(request)));

Expand All @@ -387,6 +379,7 @@ NProto::TNodeAttr TFileStoreCommand::ResolveNode(
}

TVector<TFileStoreCommand::TPathEntry> TFileStoreCommand::ResolvePath(
ISession& session,
TStringBuf path,
bool ignoreMissing)
{
Expand All @@ -401,10 +394,10 @@ TVector<TFileStoreCommand::TPathEntry> TFileStoreCommand::ResolvePath(
while (it.NextTok('/', tok)) {
if (tok) {
auto node = ResolveNode(
session,
result.back().Node.GetId(),
ToString(tok),
ignoreMissing
);
ignoreMissing);
result.push_back({node, tok});
}
}
Expand Down
30 changes: 21 additions & 9 deletions cloud/filestore/apps/client/lib/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <cloud/filestore/libs/client/client.h>
#include <cloud/filestore/libs/client/config.h>
#include <cloud/filestore/libs/client/session.h>
#include <cloud/filestore/libs/service/context.h>
#include <cloud/filestore/libs/service/endpoint.h>
#include <cloud/filestore/libs/service/filestore.h>
Expand Down Expand Up @@ -96,15 +97,15 @@ class TCommand
}

template <typename T>
T WaitFor(NThreading::TFuture<T> future)
T WaitFor(NThreading::TFuture<T> future, bool extract = true)
{
if (!future.HasValue()) {
auto* ptr = reinterpret_cast<NThreading::TFuture<void>*>(&future);
if (!WaitForI(*ptr)) {
return TErrorResponse(E_REJECTED, "request cancelled");
}
}
return future.ExtractValue();
return extract ? future.ExtractValue() : future.GetValue();
}

private:
Expand All @@ -123,8 +124,6 @@ class TFileStoreServiceCommand

IFileStoreServicePtr Client;

NProto::THeaders Headers;

public:
TFileStoreServiceCommand() = default;

Expand Down Expand Up @@ -152,13 +151,14 @@ class TFileStoreCommand
{
auto request = std::make_shared<T>();
request->SetFileSystemId(FileSystemId);
request->MutableHeaders()->CopyFrom(Headers);
request->MutableHeaders()->SetDisableMultiTabletForwarding(
DisableMultiTabletForwarding);

return request;
}

template <typename T>
void CheckResponse(const T& response)
static void CheckResponse(const T& response)
{
if (HasError(response)) {
throw TServiceError(response.GetError());
Expand All @@ -172,39 +172,51 @@ class TFileStoreCommand
};

NProto::TNodeAttr ResolveNode(
ISession& session,
ui64 parentNodeId,
TString name,
bool ignoreMissing);
TVector<TPathEntry> ResolvePath(
ISession& session,
TStringBuf path,
bool ignoreMissing);

class TSessionGuard final
{
TFileStoreCommand& FileStoreCmd;
ISessionPtr Session;
TLog& Log;

public:
explicit TSessionGuard(TFileStoreCommand& fileStoreCmd)
TSessionGuard(TFileStoreCommand& fileStoreCmd, ISessionPtr session)
: FileStoreCmd(fileStoreCmd)
, Session(std::move(session))
, Log(FileStoreCmd.AccessLog())
{
}

~TSessionGuard()
{
try {
FileStoreCmd.DestroySession();
FileStoreCmd.DestroySession(*Session);
} catch (...) {
STORAGE_ERROR("~TSessionGuard: " << CurrentExceptionMessage());
}
}

ISession& AccessSession()
{
return *Session;
}
};

[[nodiscard]] TSessionGuard CreateSession();
[[nodiscard]] TSessionGuard CreateCustomSession(
TString fsId,
TString clientId);

private:
void DestroySession();
void DestroySession(ISession& session);
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
29 changes: 21 additions & 8 deletions cloud/filestore/apps/client/lib/find_garbage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ class TFindGarbageCommand final
.StoreResult(&PageSize);
}

NProto::TListNodesResponse ListAll(const TString& fsId, ui64 parentId)
NProto::TListNodesResponse ListAll(
ISession& session,
const TString& fsId,
ui64 parentId)
{
NProto::TListNodesResponse fullResult;
TString cookie;
Expand All @@ -56,7 +59,7 @@ class TFindGarbageCommand final
request->SetCookie(cookie);
// TODO: async listing

auto response = WaitFor(Client->ListNodes(
auto response = WaitFor(session.ListNodes(
PrepareCallContext(),
std::move(request)));

Expand All @@ -82,18 +85,19 @@ class TFindGarbageCommand final
}

void FetchAll(
ISession& session,
const TString& fsId,
ui64 parentId,
TVector<TNode>* nodes)
{
// TODO: async listing
auto response = ListAll(fsId, parentId);
auto response = ListAll(session, fsId, parentId);

for (ui32 i = 0; i < response.NodesSize(); ++i) {
const auto& node = response.GetNodes(i);
const auto& name = response.GetNames(i);
if (node.GetType() == NProto::E_DIRECTORY_NODE) {
FetchAll(fsId, node.GetId(), nodes);
FetchAll(session, fsId, node.GetId(), nodes);
} else {
nodes->push_back({
node.GetId(),
Expand All @@ -106,6 +110,7 @@ class TFindGarbageCommand final
}

TMaybe<NProto::TNodeAttr> Stat(
ISession& session,
const TString& fsId,
ui64 parentId,
const TString& name)
Expand All @@ -115,7 +120,7 @@ class TFindGarbageCommand final
request->SetNodeId(parentId);
request->SetName(name);

auto response = WaitFor(Client->GetNodeAttr(
auto response = WaitFor(session.GetNodeAttr(
PrepareCallContext(),
std::move(request)));

Expand All @@ -134,13 +139,17 @@ class TFindGarbageCommand final
bool Execute() override
{
auto sessionGuard = CreateSession();
auto& session = sessionGuard.AccessSession();
TMap<TString, TVector<TNode>> shard2Nodes;
for (const auto& shard: Shards) {
FetchAll(shard, RootNodeId, &shard2Nodes[shard]);
auto shardSessionGuard =
CreateCustomSession(shard, shard + "::" + ClientId);
auto& shardSession = shardSessionGuard.AccessSession();
FetchAll(shardSession, shard, RootNodeId, &shard2Nodes[shard]);
}

TVector<TNode> leaderNodes;
FetchAll(FileSystemId, RootNodeId, &leaderNodes);
FetchAll(session, FileSystemId, RootNodeId, &leaderNodes);

THashSet<TString> shardNames;
for (const auto& node: leaderNodes) {
Expand Down Expand Up @@ -169,9 +178,13 @@ class TFindGarbageCommand final
TVector<TResult> results;

for (const auto& [shard, nodes]: shard2Nodes) {
auto shardSessionGuard =
CreateCustomSession(shard, shard + "::" + ClientId);
auto& shardSession = shardSessionGuard.AccessSession();
for (const auto& node: nodes) {
if (!shardNames.contains(node.Name)) {
auto stat = Stat(shard, RootNodeId, node.Name);
auto stat =
Stat(shardSession, shard, RootNodeId, node.Name);

if (stat) {
results.push_back({shard, node.Name, stat->GetSize()});
Expand Down
28 changes: 16 additions & 12 deletions cloud/filestore/apps/client/lib/ls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,21 @@ class TLsCommand final
Opts.MutuallyExclusive(LimitOptionName, AllOptionName);
}

TPageInfo FetchNodesInfo(const NProto::TNodeAttr& node, TStringBuf name = TStringBuf())
TPageInfo FetchNodesInfo(
ISession& session,
const NProto::TNodeAttr& node,
TStringBuf name = TStringBuf())
{
if (node.GetType() != NProto::E_DIRECTORY_NODE) {
return TPageInfo{
.Content = {TNodeInfo{TString(name), node}, },
.Cookie = {},
};
}
return FetchNodesInfo(node.GetId());
return FetchNodesInfo(session, node.GetId());
}

TPageInfo FetchNodesInfo(TNodeId nodeId)
TPageInfo FetchNodesInfo(ISession& session, TNodeId nodeId)
{
TPageInfo page;
page.Cookie = CliArgs.Cookie;
Expand All @@ -246,7 +249,7 @@ class TLsCommand final
request->SetNodeId(nodeId);
request->SetCookie(page.Cookie);

auto response = WaitFor(Client->ListNodes(
auto response = WaitFor(session.ListNodes(
PrepareCallContext(),
std::move(request)));

Expand All @@ -267,43 +270,44 @@ class TLsCommand final
return page;
}

TPageInfo HandlePathMode(TStringBuf path)
TPageInfo HandlePathMode(ISession& session, TStringBuf path)
{
Y_ENSURE(!path.empty(), "Path must be set");

const auto resolvedPath = ResolvePath(path, false).back();
const auto resolvedPath = ResolvePath(session, path, false).back();

return FetchNodesInfo(resolvedPath.Node, resolvedPath.Name);
return FetchNodesInfo(session, resolvedPath.Node, resolvedPath.Name);
}

TPageInfo HandleNodeMode(TNodeId nodeId)
TPageInfo HandleNodeMode(ISession& session, TNodeId nodeId)
{
Y_ENSURE(nodeId != NProto::E_INVALID_NODE_ID, "Path must be set");

auto request = CreateRequest<NProto::TGetNodeAttrRequest>();
request->SetNodeId(nodeId);
request->SetName(TString{});

auto node = WaitFor(Client->GetNodeAttr(
auto node = WaitFor(session.GetNodeAttr(
PrepareCallContext(),
std::move(request))).GetNode();

return FetchNodesInfo(node);
return FetchNodesInfo(session, node);
}

bool Execute() override
{
Y_ENSURE(CliArgs.Mode != EModeType::Unknown, "Mode type is not set");

auto sessionGuard = CreateSession();
auto& session = sessionGuard.AccessSession();

TPageInfo page;
switch (CliArgs.Mode) {
case EModeType::Path:
page = HandlePathMode(CliArgs.Path);
page = HandlePathMode(session, CliArgs.Path);
break;
case EModeType::Node:
page = HandleNodeMode(CliArgs.NodeId);
page = HandleNodeMode(session, CliArgs.NodeId);
break;
case EModeType::Unknown:
break;
Expand Down
Loading

0 comments on commit a962fb0

Please sign in to comment.