From eb34b8849204a40f884bbb37c56ddb03aa8e9bd6 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 12 Dec 2024 11:06:32 +0000 Subject: [PATCH 1/3] feat: Optimize PrestoSerializer compress buffer --- velox/serializers/PrestoSerializer.cpp | 33 +++++++++++++++++++------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 891b6f6a9794..f3c2f2545641 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4344,6 +4344,30 @@ void readTopColumns( readColumns( &source, childTypes, resultOffset, nullptr, 0, pool, opts, children); } + +std::unique_ptr uncompressBuffer( + ByteInputStream* source, + const PrestoHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + std::unique_ptr iobuf; + int32_t readCount = 0; + while (readCount < header.compressedSize) { + const auto remaining = header.compressedSize - readCount; + auto view = source->nextView(remaining); + readCount += view.size(); + auto newBuf = folly::IOBuf::wrapBuffer(view.data(), view.size()); + if (iobuf) { + iobuf->prev()->appendChain(std::move(newBuf)); + } else { + iobuf = std::move(newBuf); + } + } + + VELOX_DCHECK_EQ(readCount, header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); +} } // namespace void PrestoVectorSerde::deserialize( @@ -4354,8 +4378,6 @@ void PrestoVectorSerde::deserialize( vector_size_t resultOffset, const Options* options) { const auto prestoOptions = toPrestoOptions(options); - const auto codec = - common::compressionKindToCodec(prestoOptions.compressionKind); auto maybeHeader = PrestoHeader::read(source); VELOX_CHECK( maybeHeader.hasValue(), @@ -4398,13 +4420,8 @@ void PrestoVectorSerde::deserialize( if (!isCompressedBitSet(header.pageCodecMarker)) { readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. auto uncompress = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressBuffer(source, header, prestoOptions.compressionKind); auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); readTopColumns( From 29ecf34d909b2ae498c94d25aabcbd159ca7dbe1 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 13 Dec 2024 11:01:09 +0000 Subject: [PATCH 2/3] only apply for BufferInputStream --- velox/common/memory/ByteStream.cpp | 25 +++++++++++++ velox/common/memory/ByteStream.h | 2 ++ velox/common/memory/tests/ByteStreamTest.cpp | 38 ++++++++++++++++++++ velox/serializers/PrestoSerializer.cpp | 35 +++++++++--------- 4 files changed, 81 insertions(+), 19 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index c8f489854988..d70886b5cef8 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -172,6 +172,31 @@ std::string_view BufferInputStream::nextView(int32_t size) { reinterpret_cast(current_->buffer) + position, viewSize); } +std::unique_ptr BufferInputStream::readBytes(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + if (size == 0) { + return nullptr; + } + std::unique_ptr result; + for (;;) { + const int32_t availableBytes = current_->size - current_->position; + const int32_t readBytes = std::min(availableBytes, size); + auto newBuf = folly::IOBuf::wrapBuffer( + current_->buffer + current_->position, readBytes); + if (result) { + result->prev()->appendChain(std::move(newBuf)); + } else { + result = std::move(newBuf); + } + size -= readBytes; + current_->position += readBytes; + if (size == 0) { + return result; + } + nextRange(); + } +} + void BufferInputStream::skip(int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); for (;;) { diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index c8435299c118..931324f02f75 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -191,6 +191,8 @@ class BufferInputStream : public ByteInputStream { void readBytes(uint8_t* bytes, int32_t size) override; + std::unique_ptr readBytes(int32_t size); + std::string_view nextView(int32_t size) override; void skip(int32_t size) override; diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 0916b412934c..fde48baab388 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -431,6 +431,44 @@ TEST_P(InputByteStreamTest, inputStream) { ASSERT_TRUE(byteStream->atEnd()); } +TEST_P(InputByteStreamTest, iobuf) { + if (!GetParam()) { + return; + } + const auto streamSize = 4096; + std::vector byteRanges; + std::uint8_t buffer[streamSize]; + for (auto i = 0; i < streamSize; ++i) { + buffer[i] = i % 256; + } + byteRanges.push_back(ByteRange{buffer, streamSize, 0}); + + std::uint8_t buffer2[streamSize]; + for (auto i = 0; i < streamSize; ++i) { + buffer2[i] = i % 13; + } + byteRanges.push_back(ByteRange{buffer2, streamSize, 0}); + + auto byteStream = createStream(byteRanges); + auto bufferStream = dynamic_cast(byteStream.get()); + for (int offset = 0; offset < streamSize;) { + auto iobuf = bufferStream->readBytes(streamSize / 8); + ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); + for (int i = 0; i < streamSize / 8; ++i, ++offset) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } + } + + for (int offset = 0; offset < streamSize;) { + auto iobuf = bufferStream->readBytes(streamSize / 8); + ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); + for (int i = 0; i < streamSize / 8; ++i, ++offset) { + ASSERT_EQ(iobuf->data()[i], offset % 13); + } + } + ASSERT_TRUE(bufferStream->atEnd()); +} + TEST_P(InputByteStreamTest, emptyInputStreamError) { if (GetParam()) { VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream"); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index f3c2f2545641..24631b93c0aa 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4345,28 +4345,25 @@ void readTopColumns( &source, childTypes, resultOffset, nullptr, 0, pool, opts, children); } -std::unique_ptr uncompressBuffer( +std::unique_ptr uncompressStream( ByteInputStream* source, const PrestoHeader& header, - common::CompressionKind compressionKind) { + common::CompressionKind compressionKind, + memory::MemoryPool& pool) { const auto codec = common::compressionKindToCodec(compressionKind); - std::unique_ptr iobuf; - int32_t readCount = 0; - while (readCount < header.compressedSize) { - const auto remaining = header.compressedSize - readCount; - auto view = source->nextView(remaining); - readCount += view.size(); - auto newBuf = folly::IOBuf::wrapBuffer(view.data(), view.size()); - if (iobuf) { - iobuf->prev()->appendChain(std::move(newBuf)); - } else { - iobuf = std::move(newBuf); - } - } - - VELOX_DCHECK_EQ(readCount, header.compressedSize); + if (dynamic_cast(source)) { + // If the source is a BufferInputStream, we can avoid copying the data + // by creating an IOBuf from the underlying buffer. + const auto bufferSource = dynamic_cast(source); + const auto iobuf = bufferSource->readBytes(header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); + } + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); // Process chained uncompressed results IOBufs. - return codec->uncompress(iobuf.get(), header.uncompressedSize); + return codec->uncompress(compressBuf.get(), header.uncompressedSize); } } // namespace @@ -4421,7 +4418,7 @@ void PrestoVectorSerde::deserialize( readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto uncompress = - uncompressBuffer(source, header, prestoOptions.compressionKind); + uncompressStream(source, header, prestoOptions.compressionKind, *pool); auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); readTopColumns( From 38fe41c981db89a9a91351f377fd7207dbfa4de1 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 19 Dec 2024 13:05:01 +0000 Subject: [PATCH 3/3] address comments --- velox/common/memory/tests/ByteStreamTest.cpp | 33 +++++++------------- velox/serializers/PrestoSerializer.cpp | 8 ++--- velox/serializers/RowSerializer.h | 28 ++++++++++++----- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index fde48baab388..0ba7f260aa0d 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -451,30 +451,19 @@ TEST_P(InputByteStreamTest, iobuf) { auto byteStream = createStream(byteRanges); auto bufferStream = dynamic_cast(byteStream.get()); - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 256); - } - } - - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 13); + for (int offset = 0; offset < streamSize * 2;) { + const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset); + auto iobuf = bufferStream->readBytes(readBytes); + ASSERT_EQ(iobuf->computeChainDataLength(), readBytes); + for (int i = 0; i < readBytes; ++i, ++offset) { + if (offset < streamSize) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } else { + ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13); + } } } - ASSERT_TRUE(bufferStream->atEnd()); -} - -TEST_P(InputByteStreamTest, emptyInputStreamError) { - if (GetParam()) { - VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream"); - } else { - VELOX_ASSERT_THROW(createStream({}), "(0 vs. 0) Empty FileInputStream"); - } + ASSERT_TRUE(byteStream->atEnd()); } TEST_P(InputByteStreamTest, remainingSize) { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 24631b93c0aa..85630b9dfab2 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4348,13 +4348,11 @@ void readTopColumns( std::unique_ptr uncompressStream( ByteInputStream* source, const PrestoHeader& header, - common::CompressionKind compressionKind, - memory::MemoryPool& pool) { + common::CompressionKind compressionKind) { const auto codec = common::compressionKindToCodec(compressionKind); - if (dynamic_cast(source)) { + if (auto* bufferSource = dynamic_cast(source)) { // If the source is a BufferInputStream, we can avoid copying the data // by creating an IOBuf from the underlying buffer. - const auto bufferSource = dynamic_cast(source); const auto iobuf = bufferSource->readBytes(header.compressedSize); // Process chained uncompressed results IOBufs. return codec->uncompress(iobuf.get(), header.uncompressedSize); @@ -4418,7 +4416,7 @@ void PrestoVectorSerde::deserialize( readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto uncompress = - uncompressStream(source, header, prestoOptions.compressionKind, *pool); + uncompressStream(source, header, prestoOptions.compressionKind); auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); readTopColumns( diff --git a/velox/serializers/RowSerializer.h b/velox/serializers/RowSerializer.h index ae062b658fd7..e209e126f5a5 100644 --- a/velox/serializers/RowSerializer.h +++ b/velox/serializers/RowSerializer.h @@ -288,14 +288,7 @@ class RowDeserializer { if (header.compressed) { VELOX_DCHECK_NE( compressionKind, common::CompressionKind::CompressionKind_NONE); - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. - const auto codec = common::compressionKindToCodec(compressionKind); - uncompressedBuf = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressedBuf = uncompressStream(source, header, compressionKind); } std::unique_ptr uncompressedStream; @@ -352,6 +345,25 @@ class RowDeserializer { rowBuffer.append(rowFragment.data(), rowFragment.size()); } } + + static std::unique_ptr uncompressStream( + ByteInputStream* source, + const detail::RowHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + if (auto* bufferSource = dynamic_cast(source)) { + // If the source is a BufferInputStream, we can avoid copying the data + // by creating an IOBuf from the underlying buffer. + const auto iobuf = bufferSource->readBytes(header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); + } + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(compressBuf.get(), header.uncompressedSize); + } }; } // namespace facebook::velox::serializer