Skip to content

Commit

Permalink
CR3
Browse files Browse the repository at this point in the history
  • Loading branch information
budevg committed Oct 17, 2024
1 parent 7dc90ac commit 2e7d568
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 154 deletions.
4 changes: 3 additions & 1 deletion cloud/filestore/libs/service_local/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <library/cpp/monlib/service/pages/templates.h>

#include <util/generic/size_literals.h>

namespace NCloud::NFileStore {

namespace {
Expand All @@ -18,7 +20,7 @@ namespace {
xxx(MaxInodeCount, ui32, 1000000 )\
xxx(MaxHandlePerSessionCount, ui32, 10000 )\
xxx(DirectIoEnabled, bool, false )\
xxx(DirectIoAlign, ui32, 4096 )\
xxx(DirectIoAlign, ui32, 4_KB )\
// FILESTORE_SERVICE_CONFIG

#define FILESTORE_SERVICE_DECLARE_CONFIG(name, type, value) \
Expand Down
12 changes: 6 additions & 6 deletions cloud/filestore/libs/service_local/fs_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,16 @@ TFuture<NProto::TReadDataResponse> TLocalFileSystem::ReadDataAsync(
auto b = std::make_shared<TAlignedBuffer>(request.GetLength(), align);
NSan::Unpoison(b->Begin(), b->Size());

TArrayRef<char> data((char*)b->Begin(), (char*)b->End());
TArrayRef<char> data(b->Begin(), b->End());
auto promise = NewPromise<NProto::TReadDataResponse>();
FileIOService->AsyncRead(*handle, request.GetOffset(), data).Subscribe(
[b = std::move(b), promise] (const TFuture<ui32>& f) mutable {
NProto::TReadDataResponse response;
try {
auto bytesRead = f.GetValue();
b->TrimSize(bytesRead);
response.SetBuffer(std::move(b->GetBuffer()));
response.SetBufferOffset(b->AlignedDataOffset());
response.SetBuffer(std::move(b->AccessBuffer()));
} catch (const TServiceError& e) {
*response.MutableError() = MakeError(MAKE_FILESTORE_ERROR(
ErrnoToFileStoreError(STATUS_FROM_CODE(e.GetCode()))));
Expand All @@ -135,10 +136,9 @@ TFuture<NProto::TWriteDataResponse> TLocalFileSystem::WriteDataAsync(
TErrorResponse(ErrorInvalidHandle(request.GetHandle())));
}

auto align = Config->GetDirectIoEnabled() ? Config->GetDirectIoAlign() : 0;

auto b = std::make_shared<TAlignedBuffer>(std::move(*request.MutableBuffer()), align);
TArrayRef<char> data((char*)b->Begin(), (char*)b->End());
auto b = std::move(*request.MutableBuffer());
auto offset = request.GetBufferOffset();
TArrayRef<char> data(b.begin() + offset, b.vend());
auto promise = NewPromise<NProto::TWriteDataResponse>();
FileIOService->AsyncWrite(*handle, request.GetOffset(), data).Subscribe(
[b = std::move(b), promise] (const TFuture<ui32>& f) mutable {
Expand Down
50 changes: 16 additions & 34 deletions cloud/filestore/libs/service_local/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -761,24 +761,6 @@ struct TTestBootstrap

#undef FILESTORE_DECLARE_METHOD

NProto::TReadDataResponse
ReadDataAligned(ui64 handle, ui64 offset, ui32 len, ui32 align)
{
auto request = CreateReadDataRequest(handle, offset, len);
auto dbg = request->ShortDebugString();
auto response = Store->ReadData(Ctx, std::move(request)).GetValueSync();

UNIT_ASSERT_C(
SUCCEEDED(response.GetError().GetCode()),
response.GetError().GetMessage() + "@" + dbg);

TAlignedBuffer alignedBuffer(
std::move(*response.MutableBuffer()),
align);
response.SetBuffer(alignedBuffer.Begin(), alignedBuffer.Size());
return response;
}

NProto::TWriteDataResponse WriteDataAligned(
ui64 handle,
ui64 offset,
Expand All @@ -794,7 +776,8 @@ struct TTestBootstrap
(void*)(alignedBuffer.Begin()),
(void*)buffer.data(),
buffer.size());
request->SetBuffer(std::move(alignedBuffer.GetBuffer()));
request->SetBufferOffset(alignedBuffer.AlignedDataOffset());
request->SetBuffer(std::move(alignedBuffer.AccessBuffer()));

auto dbg = request->ShortDebugString();
auto response =
Expand Down Expand Up @@ -822,7 +805,8 @@ struct TTestBootstrap
(void*)(alignedBuffer.Begin()),
(void*)buffer.data(),
buffer.size());
request->SetBuffer(std::move(alignedBuffer.GetBuffer()));
request->SetBufferOffset(alignedBuffer.AlignedDataOffset());
request->SetBuffer(std::move(alignedBuffer.AccessBuffer()));

auto dbg = request->ShortDebugString();
auto response =
Expand Down Expand Up @@ -2020,7 +2004,8 @@ Y_UNIT_TEST_SUITE(LocalFileStore)
"file",
TCreateHandleArgs::CREATE | TCreateHandleArgs::DIRECT)
.GetHandle();
auto data = bootstrap.ReadData(handle, 0, 100).GetBuffer();
auto readRsp = bootstrap.ReadData(handle, 0, 100);
auto data = readRsp.GetBuffer().substr(readRsp.GetBufferOffset());
UNIT_ASSERT_VALUES_EQUAL(data.size(), 0);

data = "aaaabbbbcccccdddddeeee";
Expand All @@ -2038,26 +2023,23 @@ Y_UNIT_TEST_SUITE(LocalFileStore)
data.append(directIoAlign, 'y');
bootstrap.WriteDataAligned(handle, 0, data, directIoAlign);

auto readDataWithOffset =
[&bootstrap](ui64 handle, ui64 offset, ui32 len)
{
auto rsp = bootstrap.ReadData(handle, offset, len);
return rsp.GetBuffer().substr(rsp.GetBufferOffset());
};

// read [0, 2*align]
auto buffer =
bootstrap.ReadDataAligned(handle, 0, data.size(), directIoAlign)
.GetBuffer();
auto buffer = readDataWithOffset(handle, 0, data.size());
UNIT_ASSERT_VALUES_EQUAL(buffer, data);

// read [0, align]
buffer =
bootstrap.ReadDataAligned(handle, 0, directIoAlign, directIoAlign)
.GetBuffer();
buffer = readDataWithOffset(handle, 0, directIoAlign);
UNIT_ASSERT_VALUES_EQUAL(buffer, data.substr(0, directIoAlign));

// read [align, align]
buffer = bootstrap
.ReadDataAligned(
handle,
directIoAlign,
directIoAlign,
directIoAlign)
.GetBuffer();
buffer = readDataWithOffset(handle, directIoAlign, directIoAlign);
UNIT_ASSERT_VALUES_EQUAL(
buffer,
data.substr(directIoAlign, directIoAlign));
Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/vfs_fuse/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace {
xxx(AsyncHandleOperationPeriod, TDuration, TDuration::MilliSeconds(50) )\
\
xxx(DirectIoEnabled, bool, false )\
xxx(DirectIoAlign, ui32, 4096 )\
xxx(DirectIoAlign, ui32, 4_KB )\
// FILESTORE_FUSE_CONFIG

#define FILESTORE_FILESYSTEM_DECLARE_CONFIG(name, type, value) \
Expand Down
16 changes: 8 additions & 8 deletions cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,14 @@ void TFileSystem::Read(
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
const auto& response = future.GetValue();
if (auto self = ptr.lock(); CheckResponse(self, *callContext, req, response)) {
auto align = Config->GetDirectIoEnabled() ? Config->GetDirectIoAlign() : 0;
auto [alignedData, size] = TAlignedBuffer::ExtractAlignedData(
response.GetBuffer(),
align);
const auto& buffer = response.GetBuffer();
ui32 bufferOffset = response.GetBufferOffset();
self->ReplyBuf(
*callContext,
response.GetError(),
req,
alignedData,
size);
buffer.Data() + bufferOffset,
buffer.Size() - bufferOffset);
}
});
}
Expand Down Expand Up @@ -236,7 +234,8 @@ void TFileSystem::Write(
auto request = StartRequest<NProto::TWriteDataRequest>(ino);
request->SetHandle(fi->fh);
request->SetOffset(offset);
request->SetBuffer(std::move(alignedBuffer.GetBuffer()));
request->SetBufferOffset(alignedBuffer.AlignedDataOffset());
request->SetBuffer(std::move(alignedBuffer.AccessBuffer()));

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
Expand Down Expand Up @@ -300,7 +299,8 @@ void TFileSystem::WriteBuf(
auto request = StartRequest<NProto::TWriteDataRequest>(ino);
request->SetHandle(fi->fh);
request->SetOffset(offset);
request->SetBuffer(std::move(alignedBuffer.GetBuffer()));
request->SetBufferOffset(alignedBuffer.AlignedDataOffset());
request->SetBuffer(std::move(alignedBuffer.AccessBuffer()));

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
Expand Down
6 changes: 6 additions & 0 deletions cloud/filestore/public/api/protos/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ message TReadDataResponse
// Buffer with bytes read.
bytes Buffer = 2;

// Bytes read start at buffer offset.
uint32 BufferOffset = 3;

// Optional response headers.
TResponseHeaders Headers = 1000;
}
Expand All @@ -171,6 +174,9 @@ message TWriteDataRequest

// Buffer with bytes to write.
bytes Buffer = 6;

// Bytes to write start at buffer offset.
uint32 BufferOffset = 7;
}

message TWriteDataResponse
Expand Down
119 changes: 119 additions & 0 deletions cloud/storage/core/libs/common/aligned_buffer.cpp
Original file line number Diff line number Diff line change
@@ -1 +1,120 @@
#include "aligned_buffer.h"

#include <cloud/storage/core/libs/common/error.h>

#include <util/system/align.h>

namespace NCloud {

////////////////////////////////////////////////////////////////////////////////

TStringBuf TAlignedBuffer::ExtractAlignedData(
const TString& buffer,
ui32 align)
{
auto alignedData = buffer.begin();
if (align) {
alignedData = AlignUp(buffer.data(), align);
if (alignedData > buffer.end()) {
ythrow TServiceError(E_ARGUMENT)
<< "Extracting unaligned buffer " << (void*)buffer.begin()
<< " with alignment " << align
<< " with size " << buffer.size();
}
}

return TStringBuf(alignedData, buffer.end() - alignedData);
}

TAlignedBuffer::TAlignedBuffer()
: AlignedData(Buffer.begin())
{}

TAlignedBuffer::TAlignedBuffer(TAlignedBuffer&& other)
: Buffer(std::move(other.Buffer))
, AlignedData(std::move(other.AlignedData))
{
other.AlignedData = other.Buffer.begin();
}

TAlignedBuffer& TAlignedBuffer::operator=(TAlignedBuffer&& other)
{
Buffer = std::move(other.Buffer);
AlignedData = std::move(other.AlignedData);
other.AlignedData = other.Buffer.begin();
return *this;
}

TAlignedBuffer::TAlignedBuffer(ui32 size, ui32 align)
: Buffer(TString::Uninitialized(size + align))
, AlignedData(Buffer.begin())
{
if (align) {
Y_DEBUG_ABORT_UNLESS(IsPowerOf2(align)); // align should be power of 2
AlignedData = AlignUp(Buffer.data(), align);
Buffer.resize(AlignedData + size - Buffer.begin());
}
}

TAlignedBuffer::TAlignedBuffer(TString&& buffer, ui32 align)
: Buffer(std::move(buffer))
, AlignedData(Buffer.begin())
{
if (align) {
Y_DEBUG_ABORT_UNLESS(IsPowerOf2(align)); // align should be power of 2
AlignedData = AlignUp(Buffer.data(), align);
if (AlignedData > Buffer.end()) {
ythrow TServiceError(E_ARGUMENT)
<< "Initializing from unaligned buffer "
<< (void*)Buffer.begin()
<< " with alignment " << align
<< " with size " << Buffer.size();
}
}
}

size_t TAlignedBuffer::AlignedDataOffset() const
{
return AlignedData - Buffer.begin();
}

char* TAlignedBuffer::Begin()
{
return const_cast<char*>(AlignedData);
}

const char* TAlignedBuffer::Begin() const
{
return AlignedData;
}

char* TAlignedBuffer::End()
{
return const_cast<char*>(Buffer.end());
}

const char* TAlignedBuffer::End() const
{
return Buffer.end();
}

size_t TAlignedBuffer::Size() const
{
return End() - Begin();
}

void TAlignedBuffer::TrimSize(size_t size)
{
if (size > Size()) {
ythrow TServiceError(E_ARGUMENT)
<< "Tried to trim to size " << size << " > " << Size();
}
Buffer.resize(AlignedDataOffset() + size);
}

TString& TAlignedBuffer::AccessBuffer()
{
return Buffer;
}

} // namespace NCloud
Loading

0 comments on commit 2e7d568

Please sign in to comment.