diff --git a/cloud/filestore/config/storage.proto b/cloud/filestore/config/storage.proto index 8712331284..c478a00b18 100644 --- a/cloud/filestore/config/storage.proto +++ b/cloud/filestore/config/storage.proto @@ -240,4 +240,7 @@ message TStorageConfig // to release it in case of a client disconnect, this timeout is used. optional uint32 GenerateBlobIdsReleaseCollectBarrierTimeout = 345; + // If ThreeStageWriteEnabled is true, block-aligned writes of at least this + // many bytes use the three-stage write path. Similar to WriteBlobThreshold. + optional uint32 ThreeStageWriteThreshold = 346; } diff --git a/cloud/filestore/libs/storage/api/service.h b/cloud/filestore/libs/storage/api/service.h index 9c23e78734..99c8ce2882 100644 --- a/cloud/filestore/libs/storage/api/service.h +++ b/cloud/filestore/libs/storage/api/service.h @@ -55,11 +55,11 @@ namespace NCloud::NFileStore::NStorage { xxx(ReleaseLock, __VA_ARGS__) \ xxx(TestLock, __VA_ARGS__) \ \ - xxx(WriteData, __VA_ARGS__) \ xxx(AllocateData, __VA_ARGS__) \ // FILESTORE_SERVICE_REQUESTS_HANDLE #define FILESTORE_SERVICE_REQUESTS_NO_HANDLE(xxx, ...) 
\ + xxx(WriteData, __VA_ARGS__) \ xxx(ReadData, __VA_ARGS__) \ // FILESTORE_SERVICE_REQUESTS_NO_HANDLE diff --git a/cloud/filestore/libs/storage/core/config.cpp b/cloud/filestore/libs/storage/core/config.cpp index d30c966af5..42cb543e9a 100644 --- a/cloud/filestore/libs/storage/core/config.cpp +++ b/cloud/filestore/libs/storage/core/config.cpp @@ -140,6 +140,7 @@ namespace { \ xxx(TwoStageReadEnabled, bool, false )\ xxx(ThreeStageWriteEnabled, bool, false )\ + xxx(ThreeStageWriteThreshold, ui32, 64_KB )\ xxx(EntryTimeout, TDuration, TDuration::Zero() )\ xxx(NegativeEntryTimeout, TDuration, TDuration::Zero() )\ xxx(AttrTimeout, TDuration, TDuration::Zero() )\ diff --git a/cloud/filestore/libs/storage/core/config.h b/cloud/filestore/libs/storage/core/config.h index 015ca29137..e6fb1312bf 100644 --- a/cloud/filestore/libs/storage/core/config.h +++ b/cloud/filestore/libs/storage/core/config.h @@ -182,6 +182,7 @@ class TStorageConfig bool GetTwoStageReadEnabled() const; bool GetThreeStageWriteEnabled() const; + ui32 GetThreeStageWriteThreshold() const; TDuration GetEntryTimeout() const; TDuration GetNegativeEntryTimeout() const; TDuration GetAttrTimeout() const; diff --git a/cloud/filestore/libs/storage/service/service_actor_writedata.cpp b/cloud/filestore/libs/storage/service/service_actor_writedata.cpp new file mode 100644 index 0000000000..866569c9cb --- /dev/null +++ b/cloud/filestore/libs/storage/service/service_actor_writedata.cpp @@ -0,0 +1,358 @@ +#include "service_actor.h" + +#include +#include +#include +#include + +#include + +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TWriteDataActor final: public TActorBootstrapped +{ +private: + // Original request + NProto::TWriteDataRequest WriteRequest; + const TRequestInfoPtr RequestInfo; + + // generated blob id and 
associated data + NProtoPrivate::TGenerateBlobIdsResponse GenerateBlobIdsResponse; + + // WriteData state + ui32 RemainingBlobsToWrite = 0; + +public: + TWriteDataActor( + NProto::TWriteDataRequest request, + TRequestInfoPtr requestInfo) + : WriteRequest(std::move(request)) + , RequestInfo(std::move(requestInfo)) + {} + + void Bootstrap(const TActorContext& ctx) + { + auto request = + std::make_unique(); + + request->Record.MutableHeaders()->CopyFrom(WriteRequest.GetHeaders()); + request->Record.SetFileSystemId(WriteRequest.GetFileSystemId()); + request->Record.SetNodeId(WriteRequest.GetNodeId()); + request->Record.SetHandle(WriteRequest.GetHandle()); + request->Record.SetOffset(WriteRequest.GetOffset()); + request->Record.SetLength(WriteRequest.GetBuffer().size()); + + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "WriteDataActor started, data size: %lu, offset: %lu", + WriteRequest.GetBuffer().size(), + WriteRequest.GetOffset()); + + ctx.Send(MakeIndexTabletProxyServiceId(), request.release()); + + Become(&TThis::StateWork); + } + +private: + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc( + TEvIndexTablet::TEvGenerateBlobIdsResponse, + HandleGenerateBlobIdsResponse); + + HFunc(TEvBlobStorage::TEvPutResult, HandleWriteBlobResponse); + + HFunc(TEvIndexTablet::TEvAddDataResponse, HandleAddDataResponse); + + HFunc(TEvService::TEvWriteDataResponse, HandleWriteDataResponse); + + default: + HandleUnexpectedEvent(ev, TFileStoreComponents::SERVICE_WORKER); + break; + } + } + + void HandleGenerateBlobIdsResponse( + const TEvIndexTablet::TEvGenerateBlobIdsResponse::TPtr& ev, + const TActorContext& ctx) + { + const auto* msg = ev->Get(); + + if (HasError(msg->GetError())) { + WriteData(ctx, msg->GetError()); + return; + } + + GenerateBlobIdsResponse.CopyFrom(msg->Record); + + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "GenerateBlobIds response received: %s", + 
GenerateBlobIdsResponse.DebugString().Quote().c_str()); + + WriteBlobs(ctx); + } + + void WriteBlobs(const TActorContext& ctx) + { + RemainingBlobsToWrite = GenerateBlobIdsResponse.BlobsSize(); + ui64 offset = 0; + + for (const auto& blob: GenerateBlobIdsResponse.GetBlobs()) { + NKikimr::TLogoBlobID blobId = + LogoBlobIDFromLogoBlobID(blob.GetBlobId()); + std::unique_ptr request; + if (GenerateBlobIdsResponse.BlobsSize() == 1) { + // do not copy the buffer if there is only one blob + request = std::make_unique( + blobId, + WriteRequest.GetBuffer(), + TInstant::Max()); + } else { + request = std::make_unique( + blobId, + TString( + WriteRequest.GetBuffer().Data() + offset, + blobId.BlobSize()), + TInstant::Max()); + } + NKikimr::TActorId proxy = + MakeBlobStorageProxyID(blob.GetBSGroupId()); + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "Sending TEvPut request to blob storage, blobId: %s, proxy: %s", + blobId.ToString().c_str(), + proxy.ToString().c_str()); + SendToBSProxy(ctx, proxy, request.release(), blobId.Cookie()); + offset += blobId.BlobSize(); + } + } + + void HandleWriteBlobResponse( + const TEvBlobStorage::TEvPutResult::TPtr& ev, + const TActorContext& ctx) + { + const auto* msg = ev->Get(); + + if (msg->Status != NKikimrProto::OK) { + const auto error = + MakeError(MAKE_KIKIMR_ERROR(msg->Status), msg->ErrorReason); + LOG_WARN( + ctx, + TFileStoreComponents::SERVICE, + "WriteData error: %s", + msg->ErrorReason.Quote().c_str()); + // We still may receive some responses, but we do not want to + // process them + RemainingBlobsToWrite = std::numeric_limits::max(); + return WriteData(ctx, error); + } + + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "TEvPutResult response received: %s", + msg->ToString().c_str()); + + --RemainingBlobsToWrite; + if (RemainingBlobsToWrite == 0) { + AddData(ctx); + } + } + + void AddData(const TActorContext& ctx) + { + auto request = std::make_unique(); + + 
request->Record.MutableHeaders()->CopyFrom(WriteRequest.GetHeaders()); + request->Record.SetFileSystemId(WriteRequest.GetFileSystemId()); + request->Record.SetNodeId(WriteRequest.GetNodeId()); + request->Record.SetHandle(WriteRequest.GetHandle()); + request->Record.SetOffset(WriteRequest.GetOffset()); + request->Record.SetLength(WriteRequest.GetBuffer().size()); + for (auto& blob: *GenerateBlobIdsResponse.MutableBlobs()) { + request->Record.AddBlobIds()->Swap(blob.MutableBlobId()); + } + request->Record.SetCommitId(GenerateBlobIdsResponse.GetCommitId()); + + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "Sending AddData request to tablet: %s", + request->Record.DebugString().Quote().c_str()); + + ctx.Send(MakeIndexTabletProxyServiceId(), request.release()); + } + + void HandleAddDataResponse( + const TEvIndexTablet::TEvAddDataResponse::TPtr& ev, + const TActorContext& ctx) + { + auto* msg = ev->Get(); + + if (HasError(msg->GetError())) { + return WriteData(ctx, msg->GetError()); + } + + auto response = std::make_unique(); + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + + Die(ctx); + } + + /** + * @brief Fall back to a regular write if the three-stage write fails for any reason + */ + void WriteData(const TActorContext& ctx, const NProto::TError& error) + { + LOG_WARN( + ctx, + TFileStoreComponents::SERVICE, + "Falling back to WriteData for %lu, %lu, %lu (%lu bytes). 
Message: " + "%s", + WriteRequest.GetNodeId(), + WriteRequest.GetHandle(), + WriteRequest.GetOffset(), + WriteRequest.GetBuffer().size(), + FormatError(error).Quote().c_str()); + + auto request = std::make_unique(); + request->Record = std::move(WriteRequest); + + // forward request through tablet proxy + ctx.Send(MakeIndexTabletProxyServiceId(), request.release()); + } + + void HandleWriteDataResponse( + const TEvService::TEvWriteDataResponse::TPtr& ev, + const TActorContext& ctx) + { + auto* msg = ev->Get(); + + if (HasError(msg->GetError())) { + HandleError(ctx, msg->GetError()); + return; + } + + LOG_DEBUG(ctx, TFileStoreComponents::SERVICE, "WriteData succeeded"); + + auto response = std::make_unique(); + response->Record = std::move(msg->Record); + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + + Die(ctx); + } + + void ReplyAndDie(const TActorContext& ctx) + { + auto response = std::make_unique(); + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + + Die(ctx); + } + + void HandleError(const TActorContext& ctx, const NProto::TError& error) + { + auto response = + std::make_unique(error); + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + Die(ctx); + } + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) + { + Y_UNUSED(ev); + HandleError(ctx, MakeError(E_REJECTED, "request cancelled")); + } +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TStorageServiceActor::HandleWriteData( + const TEvService::TEvWriteDataRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + const auto& clientId = GetClientId(msg->Record); + const auto& sessionId = GetSessionId(msg->Record); + const ui64 seqNo = GetSessionSeqNo(msg->Record); + + auto* session = State->FindSession(sessionId, seqNo); + if (!session || session->ClientId != clientId || !session->SessionActor) { + auto response = std::make_unique( + 
ErrorInvalidSession(clientId, sessionId, seqNo)); + return NCloud::Reply(ctx, *ev, std::move(response)); + } + const NProto::TFileStore& filestore = session->FileStore; + + if (!filestore.GetFeatures().GetThreeStageWriteEnabled()) { + // If three-stage write is disabled, forward the request to the tablet + // in the same way as all other requests. + ForwardRequest(ctx, ev); + return; + } + + auto [cookie, inflight] = CreateInFlightRequest( + TRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), + session->MediaKind, + session->RequestStats, + ctx.Now()); + + InitProfileLogRequestInfo(inflight->ProfileLogRequest, msg->Record); + + ui32 blockSize = filestore.GetBlockSize(); + + // TODO(debnatkh): Consider supporting unaligned writes + if (filestore.GetFeatures().GetThreeStageWriteEnabled() && + msg->Record.GetOffset() % blockSize == 0 && + msg->Record.GetBuffer().Size() % blockSize == 0 && + msg->Record.GetBuffer().Size() >= + filestore.GetFeatures().GetThreeStageWriteThreshold()) + { + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "Using three-stage write for request, size: %lu", + msg->Record.GetBuffer().Size()); + + auto requestInfo = + CreateRequestInfo(SelfId(), cookie, msg->CallContext); + + auto actor = std::make_unique( + std::move(msg->Record), + std::move(requestInfo)); + NCloud::Register(ctx, std::move(actor)); + } else { + LOG_DEBUG( + ctx, + TFileStoreComponents::SERVICE, + "Forwarding WriteData request to tablet"); + return ForwardRequest(ctx, ev); + } +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/service/service_ut.cpp b/cloud/filestore/libs/storage/service/service_ut.cpp index 5fdbe969fe..29bce5b55d 100644 --- a/cloud/filestore/libs/storage/service/service_ut.cpp +++ b/cloud/filestore/libs/storage/service/service_ut.cpp @@ -2038,6 +2038,318 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest) UNIT_ASSERT_VALUES_EQUAL(1, reassignedChannels[0]); UNIT_ASSERT_VALUES_EQUAL(4, reassignedChannels[1]); } + + TString 
GenerateValidateData(ui32 size) + { + TString data(size, 0); + for (ui32 i = 0; i < size; ++i) { + data[i] = 'A' + (i % ('Z' - 'A' + 1)); + } + return data; + } + + Y_UNIT_TEST(ShouldPerformThreeStageWrites) + { + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + + TServiceClient service(env.GetRuntime(), nodeIdx); + const TString fs = "test"; + service.CreateFileStore(fs, 1000); + + { + NProto::TStorageConfig newConfig; + newConfig.SetThreeStageWriteEnabled(true); + newConfig.SetThreeStageWriteThreshold(1); + const auto response = + ExecuteChangeStorageConfig(std::move(newConfig), service); + UNIT_ASSERT_VALUES_EQUAL( + response.GetStorageConfig().GetThreeStageWriteEnabled(), + true); + UNIT_ASSERT_VALUES_EQUAL( + response.GetStorageConfig().GetThreeStageWriteThreshold(), + 1); + TDispatchOptions options; + env.GetRuntime().DispatchEvents(options, TDuration::Seconds(1)); + } + + auto headers = service.InitSession(fs, "client"); + ui64 nodeId = service + .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file")) + ->Record.GetNode() + .GetId(); + ui64 handle = service + .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR) + ->Record.GetHandle(); + + ui32 putRequestCount = 0; + TActorId worker; + env.GetRuntime().SetEventFilter( + [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + switch (event->GetTypeRewrite()) { + case TEvIndexTablet::EvGenerateBlobIdsRequest: { + if (!worker) { + worker = event->Sender; + } + break; + } + case TEvBlobStorage::EvPut: { + if (event->Sender == worker && + event->Recipient.IsService() && + event->Recipient.ServiceId().StartsWith("bsproxy")) + { + ++putRequestCount; + } + break; + } + } + return false; + }); + + auto& runtime = env.GetRuntime(); + + auto validateWriteData = + [&](ui64 offset, ui64 size, ui32 expectedPutCount) + { + auto data = GenerateValidateData(size); + + service.WriteData(headers, fs, nodeId, handle, offset, data); + auto readDataResult = + service + 
.ReadData(headers, fs, nodeId, handle, offset, data.Size()); + // clang-format off + UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data); + UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsRequest)); + UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvAddDataRequest)); + UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvIndexTabletPrivate::EvAddBlobRequest)); + UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTabletPrivate::EvWriteBlobRequest)); + UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvService::EvWriteDataResponse)); + UNIT_ASSERT_VALUES_EQUAL(expectedPutCount, putRequestCount); + // clang-format on + runtime.ClearCounters(); + putRequestCount = 0; + worker = TActorId(); + }; + + validateWriteData(0, DefaultBlockSize, 1); + validateWriteData(DefaultBlockSize, DefaultBlockSize, 1); + validateWriteData(0, DefaultBlockSize * BlockGroupSize, 1); + validateWriteData(0, DefaultBlockSize * BlockGroupSize * 2, 2); + validateWriteData( + DefaultBlockSize, + DefaultBlockSize * BlockGroupSize * 10, + 11); + validateWriteData(0, DefaultBlockSize * BlockGroupSize * 3, 3); + // Currently the data is written from 0th to (1 + BlockGroupSize * 10) = 641th block + // Therefore, the next write should fail + + auto data = + GenerateValidateData(DefaultBlockSize * 360); + + auto response = + service.AssertWriteDataFailed(headers, fs, nodeId, handle, DefaultBlockSize * 641, data); + auto error = STATUS_FROM_CODE(response->GetError().GetCode()); + UNIT_ASSERT_VALUES_EQUAL((ui32)NProto::E_FS_NOSPC, error); + } + + Y_UNIT_TEST(ShouldNotUseThreeStageWriteForSmallOrUnalignedRequests) + { + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + + TServiceClient service(env.GetRuntime(), nodeIdx); + const TString fs = "test"; + service.CreateFileStore(fs, 1000); + + { + NProto::TStorageConfig newConfig; + newConfig.SetThreeStageWriteEnabled(true); + const auto response = + 
ExecuteChangeStorageConfig(std::move(newConfig), service); + UNIT_ASSERT_VALUES_EQUAL( + response.GetStorageConfig().GetThreeStageWriteEnabled(), + true); + TDispatchOptions options; + env.GetRuntime().DispatchEvents(options, TDuration::Seconds(1)); + } + + auto headers = service.InitSession(fs, "client"); + ui64 nodeId = service + .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file")) + ->Record.GetNode() + .GetId(); + ui64 handle = service + .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR) + ->Record.GetHandle(); + + auto& runtime = env.GetRuntime(); + + auto validateWriteData = [&](ui64 offset, ui64 size) + { + auto data = GenerateValidateData(size); + + service.WriteData(headers, fs, nodeId, handle, offset, data); + auto readDataResult = + service + .ReadData(headers, fs, nodeId, handle, offset, data.Size()); + // clang-format off + UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data); + UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsRequest)); + UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvAddDataRequest)); + UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataRequest)); + // clang-format on + runtime.ClearCounters(); + }; + + validateWriteData(0, 4_KB); + validateWriteData(4_KB, 4_KB); + validateWriteData(1, 128_KB); + } + + Y_UNIT_TEST(ShouldFallbackThreeStageWriteToSimpleWrite) + { + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + + TServiceClient service(env.GetRuntime(), nodeIdx); + const TString fs = "test"; + service.CreateFileStore(fs, 1000); + + NProto::TError error; + error.SetCode(E_REJECTED); + + env.GetRuntime().SetEventFilter( + [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvIndexTablet::EvGenerateBlobIdsResponse: { + auto* msg = event->template Get< + TEvIndexTablet::TEvGenerateBlobIdsResponse>(); + 
msg->Record.MutableError()->CopyFrom(error); + break; + } + } + return false; + }); + + { + NProto::TStorageConfig newConfig; + newConfig.SetThreeStageWriteEnabled(true); + const auto response = + ExecuteChangeStorageConfig(std::move(newConfig), service); + UNIT_ASSERT_VALUES_EQUAL( + response.GetStorageConfig().GetThreeStageWriteEnabled(), + true); + TDispatchOptions options; + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(1)); + } + + auto headers = service.InitSession(fs, "client"); + ui64 nodeId = service + .CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file")) + ->Record.GetNode() + .GetId(); + ui64 handle = service + .CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR) + ->Record.GetHandle(); + + // GenerateBlobIdsResponse fails + TString data = GenerateValidateData(256_KB); + service.WriteData(headers, fs, nodeId, handle, 0, data); + auto readDataResult = + service.ReadData(headers, fs, nodeId, handle, 0, data.Size()); + UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data); + auto& runtime = env.GetRuntime(); + // clang-format off + UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse)); + UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse)); + // clang-format on + runtime.ClearCounters(); + + // AddDataResponse fails + env.GetRuntime().SetEventFilter( + [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvIndexTablet::EvAddDataResponse: { + auto* msg = event->template Get< + TEvIndexTablet::TEvAddDataResponse>(); + msg->Record.MutableError()->CopyFrom(error); + break; + } + } + return false; + }); + data = GenerateValidateData(256_KB); + service.WriteData(headers, fs, nodeId, handle, 0, data); + readDataResult = + service.ReadData(headers, fs, nodeId, handle, 0, data.Size()); + UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data); + // clang-format off + UNIT_ASSERT_VALUES_EQUAL(2, 
runtime.GetCounter(TEvIndexTablet::EvAddDataResponse)); + UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse)); + UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse)); + // clang-format on + + // TEvPut fails + + runtime.ClearCounters(); + + TActorId worker; + ui32 evPuts = 0; + env.GetRuntime().SetEventFilter( + [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvIndexTablet::EvGenerateBlobIdsRequest: { + if (!worker) { + worker = event->Sender; + } + break; + } + case TEvBlobStorage::EvPutResult: { + auto* msg = + event->template Get(); + if (event->Recipient == worker) { + if (evPuts == 0) { + msg->Status = NKikimrProto::ERROR; + } + ++evPuts; + } + break; + } + } + + return false; + }); + + data = GenerateValidateData(256_KB); + service.WriteData(headers, fs, nodeId, handle, 0, data); + readDataResult = + service.ReadData(headers, fs, nodeId, handle, 0, data.Size()); + UNIT_ASSERT_VALUES_EQUAL(readDataResult->Record.GetBuffer(), data); + + // clang-format off + UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTablet::EvAddDataResponse)); + UNIT_ASSERT_VALUES_EQUAL(2, runtime.GetCounter(TEvIndexTablet::EvGenerateBlobIdsResponse)); + UNIT_ASSERT_VALUES_EQUAL(3, runtime.GetCounter(TEvService::EvWriteDataResponse)); + UNIT_ASSERT_VALUES_EQUAL(1, evPuts); + // clang-format on + } } } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/service/ya.make b/cloud/filestore/libs/storage/service/ya.make index eb27a0f3bf..46eac23382 100644 --- a/cloud/filestore/libs/storage/service/ya.make +++ b/cloud/filestore/libs/storage/service/ya.make @@ -28,6 +28,7 @@ SRCS( service_actor_readdata.cpp service_actor_statfs.cpp service_actor_update_stats.cpp + service_actor_writedata.cpp service_state.cpp ) diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp 
b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp index d7a08cda88..6c9b8960d4 100644 --- a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp @@ -83,18 +83,13 @@ void TAddDataActor::ReplyAndDie( const TActorContext& ctx, const NProto::TError& error) { - // notify tablet - NCloud::Send( - ctx, - // We try to release commit barrier twice: once for the lock - // acquired by the GenerateBlob request and once for the lock - // acquired by the AddData request. Though, the first lock is - // scheduled to be released, it is better to release it as early - // as possible. - Tablet, - std::make_unique( - CommitId, - 2)); + { + // notify tablet + using TCompletion = TEvIndexTabletPrivate::TEvAddDataCompleted; + auto response = std::make_unique(error); + response->CommitId = CommitId; + NCloud::Send(ctx, Tablet, std::move(response)); + } FILESTORE_TRACK( ResponseSent_TabletWorker, diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp index 5a9c61146f..dc563b70ea 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp @@ -605,6 +605,7 @@ STFUNC(TIndexTabletActor::StateWork) switch (ev->GetTypeRewrite()) { HFunc(TEvIndexTabletPrivate::TEvReadDataCompleted, HandleReadDataCompleted); HFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted, HandleWriteDataCompleted); + HFunc(TEvIndexTabletPrivate::TEvAddDataCompleted, HandleAddDataCompleted); HFunc(TEvIndexTabletPrivate::TEvUpdateCounters, HandleUpdateCounters); HFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters, HandleUpdateLeakyBucketCounters); @@ -652,6 +653,7 @@ STFUNC(TIndexTabletActor::StateZombie) IgnoreFunc(TEvIndexTabletPrivate::TEvReadDataCompleted); IgnoreFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted); + IgnoreFunc(TEvIndexTabletPrivate::TEvAddDataCompleted); // tablet related requests 
IgnoreFunc(TEvents::TEvPoisonPill); @@ -688,6 +690,7 @@ STFUNC(TIndexTabletActor::StateBroken) IgnoreFunc(TEvIndexTabletPrivate::TEvReadDataCompleted); IgnoreFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted); + IgnoreFunc(TEvIndexTabletPrivate::TEvAddDataCompleted); IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.h b/cloud/filestore/libs/storage/tablet/tablet_actor.h index ce3f2e2296..588f94d4a7 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.h +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.h @@ -417,6 +417,10 @@ class TIndexTabletActor final const TEvIndexTabletPrivate::TEvWriteDataCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleAddDataCompleted( + const TEvIndexTabletPrivate::TEvAddDataCompleted::TPtr& ev, + const NActors::TActorContext& ctx); + bool HandleRequests(STFUNC_SIG); bool RejectRequests(STFUNC_SIG); bool RejectRequestsByBrokenTablet(STFUNC_SIG); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp index e6a5b2c5dc..d476f0de99 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp @@ -369,4 +369,24 @@ void TIndexTabletActor::HandleAddData( txStarted = true; } +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleAddDataCompleted( + const TEvIndexTabletPrivate::TEvAddDataCompleted::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + // We try to release commit barrier twice: once for the lock + // acquired by the GenerateBlob request and once for the lock + // acquired by the AddData request. Though, the first lock is + // scheduled to be released, it is better to release it as early + // as possible. 
+ TABLET_VERIFY(TryReleaseCollectBarrier(msg->CommitId)); + TryReleaseCollectBarrier(msg->CommitId); + + WorkerActors.erase(ev->Sender); + EnqueueBlobIndexOpIfNeeded(ctx); +} + } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp index 378831c527..cc5cf820ba 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp @@ -20,6 +20,8 @@ void FillFeatures(const TStorageConfig& config, NProto::TFileStore& fileStore) features->SetNegativeEntryTimeout( config.GetNegativeEntryTimeout().MilliSeconds()); features->SetAttrTimeout(config.GetAttrTimeout().MilliSeconds()); + features->SetThreeStageWriteEnabled(config.GetThreeStageWriteEnabled()); + features->SetThreeStageWriteThreshold(config.GetThreeStageWriteThreshold()); } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/filestore/libs/storage/tablet/tablet_private.h b/cloud/filestore/libs/storage/tablet/tablet_private.h index 9952c87094..6118dfd6c8 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_private.h +++ b/cloud/filestore/libs/storage/tablet/tablet_private.h @@ -252,6 +252,15 @@ struct TEvIndexTabletPrivate { }; + // + // AddData completion + // + + struct TAddDataCompleted + { + ui64 CommitId = 0; + }; + // // AddBlob // @@ -613,6 +622,7 @@ struct TEvIndexTabletPrivate EvReadDataCompleted, EvWriteDataCompleted, + EvAddDataCompleted, EvReleaseCollectBarrier, @@ -633,6 +643,7 @@ struct TEvIndexTabletPrivate using TEvReadDataCompleted = TResponseEvent; using TEvWriteDataCompleted = TResponseEvent; + using TEvAddDataCompleted = TResponseEvent; }; } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp index 
8970bc1347..888f8e1787 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_sessions.cpp @@ -824,16 +824,24 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Sessions) { NProto::TStorageConfig config; NProto::TFileStoreFeatures features; + features.SetThreeStageWriteThreshold(64_KB); + DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); config.SetTwoStageReadEnabled(true); + config.SetThreeStageWriteEnabled(true); + config.SetThreeStageWriteThreshold(10_MB); config.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); config.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); config.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); + features.SetTwoStageReadEnabled(true); features.SetEntryTimeout(TDuration::Seconds(10).MilliSeconds()); features.SetNegativeEntryTimeout(TDuration::Seconds(1).MilliSeconds()); features.SetAttrTimeout(TDuration::Seconds(20).MilliSeconds()); + features.SetThreeStageWriteEnabled(true); + features.SetThreeStageWriteThreshold(10_MB); + DoTestShouldReturnFeaturesInCreateSessionResponse(config, features); } } diff --git a/cloud/filestore/public/api/protos/fs.proto b/cloud/filestore/public/api/protos/fs.proto index cdf7b5a546..0fee40de77 100644 --- a/cloud/filestore/public/api/protos/fs.proto +++ b/cloud/filestore/public/api/protos/fs.proto @@ -18,6 +18,7 @@ message TFileStoreFeatures uint32 NegativeEntryTimeout = 3; uint32 AttrTimeout = 4; bool ThreeStageWriteEnabled = 5; + uint32 ThreeStageWriteThreshold = 6; } message TFileStore diff --git a/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt b/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt index 14fae8613d..f51f3cd113 100644 --- a/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt +++ b/cloud/filestore/tests/loadtest/service-kikimr-newfeatures-test/nfs-storage.txt @@ -1,3 +1,4 @@ TwoStageReadEnabled: true 
NewCompactionEnabled: true NewCleanupEnabled: true +ThreeStageWriteEnabled: true