Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 19, 2024
1 parent 29ecf34 commit 38fe41c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 35 deletions.
33 changes: 11 additions & 22 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,30 +451,19 @@ TEST_P(InputByteStreamTest, iobuf) {

auto byteStream = createStream(byteRanges);
auto bufferStream = dynamic_cast<BufferInputStream*>(byteStream.get());
for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
}
}

for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 13);
for (int offset = 0; offset < streamSize * 2;) {
const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset);
auto iobuf = bufferStream->readBytes(readBytes);
ASSERT_EQ(iobuf->computeChainDataLength(), readBytes);
for (int i = 0; i < readBytes; ++i, ++offset) {
if (offset < streamSize) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
} else {
ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13);
}
}
}
ASSERT_TRUE(bufferStream->atEnd());
}

TEST_P(InputByteStreamTest, emptyInputStreamError) {
if (GetParam()) {
VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream");
} else {
VELOX_ASSERT_THROW(createStream({}), "(0 vs. 0) Empty FileInputStream");
}
ASSERT_TRUE(byteStream->atEnd());
}

TEST_P(InputByteStreamTest, remainingSize) {
Expand Down
8 changes: 3 additions & 5 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4348,13 +4348,11 @@ void readTopColumns(
std::unique_ptr<folly::IOBuf> uncompressStream(
ByteInputStream* source,
const PrestoHeader& header,
common::CompressionKind compressionKind,
memory::MemoryPool& pool) {
common::CompressionKind compressionKind) {
const auto codec = common::compressionKindToCodec(compressionKind);
if (dynamic_cast<BufferInputStream*>(source)) {
if (auto* bufferSource = dynamic_cast<BufferInputStream*>(source)) {
// If the source is a BufferInputStream, we can avoid copying the data
// by creating an IOBuf from the underlying buffer.
const auto bufferSource = dynamic_cast<BufferInputStream*>(source);
const auto iobuf = bufferSource->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
Expand Down Expand Up @@ -4418,7 +4416,7 @@ void PrestoVectorSerde::deserialize(
readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions);
} else {
auto uncompress =
uncompressStream(source, header, prestoOptions.compressionKind, *pool);
uncompressStream(source, header, prestoOptions.compressionKind);
auto uncompressedSource = std::make_unique<BufferInputStream>(
byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
Expand Down
28 changes: 20 additions & 8 deletions velox/serializers/RowSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,7 @@ class RowDeserializer {
if (header.compressed) {
VELOX_DCHECK_NE(
compressionKind, common::CompressionKind::CompressionKind_NONE);
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);

// Process chained uncompressed results IOBufs.
const auto codec = common::compressionKindToCodec(compressionKind);
uncompressedBuf =
codec->uncompress(compressBuf.get(), header.uncompressedSize);
uncompressedBuf = uncompressStream(source, header, compressionKind);
}

std::unique_ptr<ByteInputStream> uncompressedStream;
Expand Down Expand Up @@ -352,6 +345,25 @@ class RowDeserializer {
rowBuffer.append(rowFragment.data(), rowFragment.size());
}
}

static std::unique_ptr<folly::IOBuf> uncompressStream(
ByteInputStream* source,
const detail::RowHeader& header,
common::CompressionKind compressionKind) {
const auto codec = common::compressionKindToCodec(compressionKind);
if (auto* bufferSource = dynamic_cast<BufferInputStream*>(source)) {
// If the source is a BufferInputStream, we can avoid copying the data
// by creating an IOBuf from the underlying buffer.
const auto iobuf = bufferSource->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(compressBuf.get(), header.uncompressedSize);
}
};

} // namespace facebook::velox::serializer

0 comments on commit 38fe41c

Please sign in to comment.