diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp
index 758957ac2a7c3..f0fd1d964a3ee 100644
--- a/velox/exec/Exchange.cpp
+++ b/velox/exec/Exchange.cpp
@@ -127,7 +127,7 @@ RowVectorPtr Exchange::getOutput() {
           outputType_,
           &result_,
           resultOffset,
-          &options_);
+          options_.get());
       resultOffset = result_->size();
     }
   }
@@ -154,7 +154,7 @@ RowVectorPtr Exchange::getOutput() {
         outputType_,
         &result_,
         resultOffset,
-        &options_);
+        options_.get());
     // We expect the row-wise deserialization to consume all the input into one
    // output vector.
    VELOX_CHECK(inputStream->atEnd());
diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h
index 0aa69fa9f9509..cab700f2076db 100644
--- a/velox/exec/Exchange.h
+++ b/velox/exec/Exchange.h
@@ -54,7 +54,13 @@ class Exchange : public SourceOperator {
         serdeKind_(exchangeNode->serdeKind()),
         processSplits_{operatorCtx_->driverCtx()->driverId == 0},
         exchangeClient_{std::move(exchangeClient)} {
-    options_.compressionKind =
+    if (serdeKind_ == VectorSerde::Kind::kPresto) {
+      options_ = std::make_unique<
+          serializer::presto::PrestoVectorSerde::PrestoOptions>();
+    } else {
+      options_ = std::make_unique<VectorSerde::Options>();
+    }
+    options_->compressionKind =
         OutputBufferManager::getInstance().lock()->compressionKind();
   }
 
@@ -115,7 +121,7 @@ class Exchange : public SourceOperator {
   std::vector<std::unique_ptr<SerializedPage>> currentPages_;
   bool atEnd_{false};
   std::default_random_engine rng_{std::random_device{}()};
-  serializer::presto::PrestoVectorSerde::PrestoOptions options_;
+  std::unique_ptr<VectorSerde::Options> options_;
 };
 
 } // namespace facebook::velox::exec
diff --git a/velox/exec/OperatorTraceReader.h b/velox/exec/OperatorTraceReader.h
index 60860e0e691e4..6a522355e5125 100644
--- a/velox/exec/OperatorTraceReader.h
+++ b/velox/exec/OperatorTraceReader.h
@@ -43,6 +43,7 @@ class OperatorTraceInputReader {
   const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{
       true,
       common::CompressionKind_ZSTD, // TODO: Use trace config.
+      0.8,
       /*_nullsFirst=*/true};
   const std::shared_ptr<filesystems::FileSystem> fs_;
   const RowTypePtr dataType_;
diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h
index 451b87f792bb0..6e9b50a95e395 100644
--- a/velox/exec/OperatorTraceWriter.h
+++ b/velox/exec/OperatorTraceWriter.h
@@ -58,6 +58,7 @@ class OperatorTraceInputWriter {
   const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
       true,
       common::CompressionKind::CompressionKind_ZSTD,
+      0.8,
       /*nullsFirst=*/true};
   const std::shared_ptr<filesystems::FileSystem> fs_;
   memory::MemoryPool* const pool_;
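The Exchange change above swaps a concrete PrestoOptions member for a polymorphic
std::unique_ptr<VectorSerde::Options>, so one member can carry either the
Presto-specific knobs or the plain options now honored by the row-wise serdes.
This is safe because VectorSerde::Options declares a virtual destructor (see the
VectorStream.h hunk at the end of this patch). A minimal sketch of the ownership
pattern, using a hypothetical makeSerdeOptions helper (the patch's real factory
for PartitionedOutput follows in the next file):

    // Sketch only: a unique_ptr<VectorSerde::Options> may own either the base
    // options or a PrestoOptions, which derives from it.
    std::unique_ptr<VectorSerde::Options> makeSerdeOptions(
        VectorSerde::Kind kind) {
      if (kind == VectorSerde::Kind::kPresto) {
        return std::make_unique<
            serializer::presto::PrestoVectorSerde::PrestoOptions>();
      }
      return std::make_unique<VectorSerde::Options>();
    }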
std::make_unique() + : std::make_unique(); + options->compressionKind = + OutputBufferManager::getInstance().lock()->compressionKind(); + options->minCompressionRatio = PartitionedOutput::minCompressionRatio(); + return options; +} +} // namespace + namespace detail { +Destination::Destination( + const std::string& taskId, + int destination, + VectorSerde* serde, + memory::MemoryPool* pool, + bool eagerFlush, + std::function recordEnqueued) + : taskId_(taskId), + destination_(destination), + serde_(serde), + options_(getVectorSerdeOptions(serde_->kind())), + pool_(pool), + eagerFlush_(eagerFlush), + recordEnqueued_(std::move(recordEnqueued)) { + setTargetSizePct(); +} + BlockingReason Destination::advance( uint64_t maxBytes, const std::vector& sizes, @@ -57,15 +88,7 @@ BlockingReason Destination::advance( if (current_ == nullptr) { current_ = std::make_unique(pool_, serde_); const auto rowType = asRowType(output->type()); - if (serde_->kind() == VectorSerde::Kind::kPresto) { - serializer::presto::PrestoVectorSerde::PrestoOptions options; - options.compressionKind = - OutputBufferManager::getInstance().lock()->compressionKind(); - options.minCompressionRatio = PartitionedOutput::minCompressionRatio(); - current_->createStreamTree(rowType, rowsInCurrent_, &options); - } else { - current_->createStreamTree(rowType, rowsInCurrent_); - } + current_->createStreamTree(rowType, rowsInCurrent_, options_.get()); } const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow); diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index fecdaf57c9c59..1b3f6ba96bd4e 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -35,15 +35,7 @@ class Destination { VectorSerde* serde, memory::MemoryPool* pool, bool eagerFlush, - std::function recordEnqueued) - : taskId_(taskId), - destination_(destination), - serde_(serde), - pool_(pool), - eagerFlush_(eagerFlush), - recordEnqueued_(std::move(recordEnqueued)) { - setTargetSizePct(); - } + std::function recordEnqueued); /// Resets the destination before starting a new batch. 
   void beginBatch() {
@@ -112,6 +104,7 @@ class Destination {
   const std::string taskId_;
   const int destination_;
   VectorSerde* const serde_;
+  const std::unique_ptr<VectorSerde::Options> options_;
   memory::MemoryPool* const pool_;
   const bool eagerFlush_;
   const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp
index 6b5e4f464b925..f2d3b07db7598 100644
--- a/velox/exec/SpillFile.cpp
+++ b/velox/exec/SpillFile.cpp
@@ -175,7 +175,10 @@ uint64_t SpillWriter::write(
     NanosecondTimer timer(&timeNs);
     if (batch_ == nullptr) {
       serializer::presto::PrestoVectorSerde::PrestoOptions options = {
-          kDefaultUseLosslessTimestamp, compressionKind_, true /*nullsFirst*/};
+          kDefaultUseLosslessTimestamp,
+          compressionKind_,
+          0.8,
+          true /*nullsFirst*/};
       batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
       batch_->createStreamTree(
           std::static_pointer_cast<const RowType>(rows->type()),
@@ -300,6 +303,7 @@ SpillReadFile::SpillReadFile(
       readOptions_{
           kDefaultUseLosslessTimestamp,
          compressionKind_,
+          0.8,
          /*nullsFirst=*/true},
       pool_(pool),
       serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto)),
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index b57a55e8fee9a..db2fa3267f6ab 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -2408,22 +2408,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
     test(100'000, 1);
   } else if (GetParam() == VectorSerde::Kind::kCompactRow) {
     test(1, 1'000);
-    test(1'000, 28);
-    test(10'000, 3);
+    test(1'000, 38);
+    test(10'000, 4);
     test(100'000, 1);
   } else {
     test(1, 1'000);
-    test(1'000, 63);
+    test(1'000, 72);
     test(10'000, 7);
     test(100'000, 1);
   }
 }
 
 TEST_P(MultiFragmentTest, compression) {
-  // NOTE: only presto format supports compression for now
-  if (GetParam() != VectorSerde::Kind::kPresto) {
-    return;
-  }
   bufferManager_->testingSetCompression(
       common::CompressionKind::CompressionKind_LZ4);
   auto guard = folly::makeGuard([&]() {
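Both row-wise serializers below frame every flushed page with the same 9-byte
header: two int32 sizes (written in host byte order, unlike the big-endian
per-row sizes inside the payload) followed by a one-byte compressed flag, with
compressedSize equal to uncompressedSize when the flag is clear. A minimal
standalone sketch of that wire layout (illustrative names; the patch itself
writes through Velox's OutputStream via the writeInt32/writeBool helpers):

    #include <cstdint>
    #include <cstring>

    struct FrameHeader {
      int32_t uncompressedSize;
      int32_t compressedSize;
      bool compressed;
    };

    // Writes | int32 | int32 | char |, i.e. sizeof(int32_t) * 2 + sizeof(char)
    // = 9 bytes, matching CompactRowHeader::size()/UnsafeRowHeader::size().
    size_t writeFrameHeader(char* out, const FrameHeader& h) {
      std::memcpy(out, &h.uncompressedSize, sizeof(int32_t));
      std::memcpy(out + sizeof(int32_t), &h.compressedSize, sizeof(int32_t));
      out[2 * sizeof(int32_t)] = h.compressed ? 1 : 0;
      return sizeof(int32_t) * 2 + sizeof(char);
    }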
diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp
index 6a1324dd433d9..b2471e09fbeae 100644
--- a/velox/serializers/CompactRowSerializer.cpp
+++ b/velox/serializers/CompactRowSerializer.cpp
@@ -23,10 +23,60 @@
 namespace facebook::velox::serializer {
 namespace {
 using TRowSize = uint32_t;
 
+void writeInt32(OutputStream* out, int32_t value) {
+  out->write(reinterpret_cast<char*>(&value), sizeof(value));
+}
+
+void writeBool(OutputStream* out, bool value) {
+  char writeValue = value ? 1 : 0;
+  out->write(reinterpret_cast<char*>(&writeValue), sizeof(char));
+}
+
+VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
+  if (options == nullptr) {
+    return VectorSerde::Options();
+  }
+  return *options;
+}
+
+struct CompactRowHeader {
+  int32_t uncompressedSize;
+  int32_t compressedSize;
+  bool compressed;
+
+  static CompactRowHeader read(ByteInputStream* source) {
+    CompactRowHeader header;
+    header.uncompressedSize = source->read<int32_t>();
+    header.compressedSize = source->read<int32_t>();
+    header.compressed = source->read<bool>();
+
+    VELOX_CHECK_GE(header.uncompressedSize, 0);
+    VELOX_CHECK_GE(header.compressedSize, 0);
+
+    return header;
+  }
+
+  static size_t size() {
+    return sizeof(int32_t) * 2 + sizeof(char);
+  }
+
+  std::string debugString() const {
+    return fmt::format(
+        "uncompressedSize: {}, compressedSize: {}, compressed: {}",
+        uncompressedSize,
+        compressedSize,
+        compressed);
+  }
+};
+
 class CompactRowVectorSerializer : public IterativeVectorSerializer {
  public:
-  explicit CompactRowVectorSerializer(StreamArena* streamArena)
-      : pool_{streamArena->pool()} {}
+  CompactRowVectorSerializer(
+      StreamArena* streamArena,
+      const VectorSerde::Options& opts)
+      : pool_{streamArena->pool()},
+        codec_(common::compressionKindToCodec(opts.compressionKind)),
+        opts_(opts) {}
 
   void append(
       const RowVectorPtr& vector,
@@ -120,25 +170,106 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
   }
 
   size_t maxSerializedSize() const override {
-    size_t totalSize = 0;
-    for (const auto& buffer : buffers_) {
-      totalSize += buffer->size();
+    const auto size = uncompressedSize();
+    if (!needCompression()) {
+      return CompactRowHeader::size() + size;
     }
-    return totalSize;
+    VELOX_CHECK_LE(
+        size,
+        codec_->maxUncompressedLength(),
+        "UncompressedSize exceeds limit");
+    return CompactRowHeader::size() + codec_->maxCompressedLength(size);
   }
 
+  // The serialization format is | uncompressedSize | compressedSize |
+  // compressed | data |.
   void flush(OutputStream* stream) override {
-    for (const auto& buffer : buffers_) {
-      stream->write(buffer->as<char>(), buffer->size());
+    constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
+    const auto size = uncompressedSize();
+    if (!needCompression()) {
+      flushUncompressed(size, stream);
+    } else if (numCompressionToSkip_ > 0) {
+      flushUncompressed(size, stream);
+      stats_.compressionSkippedBytes += size;
+      --numCompressionToSkip_;
+      ++stats_.numCompressionSkipped;
+    } else {
+      // Compress the buffer if the compression conditions are satisfied.
+      IOBufOutputStream out(
+          *pool_, nullptr, buffers_.size() * sizeof(std::string_view));
+      for (const auto& buffer : buffers_) {
+        out.write(buffer->asMutable<char>(), buffer->size());
+      }
+      const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
+      const auto compressedSize = compressedBuffer->length();
+      stats_.compressionInputBytes += size;
+      stats_.compressedBytes += compressedSize;
+      if (compressedSize > opts_.minCompressionRatio * size) {
+        // Skip this compression.
+        numCompressionToSkip_ = std::min(
+            kMaxCompressionAttemptsToSkip, 1 + stats_.numCompressionSkipped);
+        flushUncompressed(size, stream);
+      } else {
+        // Do the compression.
+        writeInt32(stream, size);
+        writeInt32(stream, compressedSize);
+        writeBool(stream, true);
+        for (auto range : *compressedBuffer) {
+          stream->write(
+              reinterpret_cast<const char*>(range.data()), range.size());
+        }
+      }
     }
+    buffers_.clear();
   }
 
+  std::unordered_map<std::string, RuntimeCounter> runtimeStats() override {
+    std::unordered_map<std::string, RuntimeCounter> map;
+    map.insert(
+        {{"compressedBytes",
+          RuntimeCounter(stats_.compressedBytes, RuntimeCounter::Unit::kBytes)},
+         {"compressionInputBytes",
+          RuntimeCounter(
+              stats_.compressionInputBytes, RuntimeCounter::Unit::kBytes)},
+         {"compressionSkippedBytes",
+          RuntimeCounter(
+              stats_.compressionSkippedBytes, RuntimeCounter::Unit::kBytes)}});
+    return map;
+  }
+
   void clear() override {}
 
  private:
+  int32_t uncompressedSize() const {
+    int32_t totalSize = 0;
+    for (const auto& buffer : buffers_) {
+      totalSize += buffer->size();
+    }
+    return totalSize;
+  }
+
+  bool needCompression() const {
+    return codec_->type() != folly::io::CodecType::NO_COMPRESSION;
+  }
+
+  void flushUncompressed(int32_t size, OutputStream* stream) {
+    writeInt32(stream, size);
+    writeInt32(stream, size);
+    writeBool(stream, false);
+    for (const auto& buffer : buffers_) {
+      stream->write(buffer->as<char>(), buffer->size());
+    }
+  }
+
   memory::MemoryPool* const pool_;
+  const std::unique_ptr<folly::io::Codec> codec_;
+  const VectorSerde::Options opts_;
+
   std::vector<BufferPtr> buffers_;
+
+  // Count of forthcoming compressions to skip.
+  int32_t numCompressionToSkip_{0};
+  CompressionStats stats_;
 };
 
 // Read from the stream until the full row is concatenated.
@@ -173,8 +304,9 @@ CompactRowVectorSerde::createIterativeSerializer(
     RowTypePtr /* type */,
     int32_t /* numRows */,
     StreamArena* streamArena,
-    const Options* /* options */) {
-  return std::make_unique<CompactRowVectorSerializer>(streamArena);
+    const Options* options) {
+  return std::make_unique<CompactRowVectorSerializer>(
+      streamArena, toValidOptions(options));
 }
 
 void CompactRowVectorSerde::deserialize(
@@ -182,27 +314,57 @@ void CompactRowVectorSerde::deserialize(
     velox::memory::MemoryPool* pool,
     RowTypePtr type,
     RowVectorPtr* result,
-    const Options* /* options */) {
+    const Options* options) {
   std::vector<std::string_view> serializedRows;
+  std::vector<std::string> concatenatedRows;
+  const auto opts = toValidOptions(options);
   std::vector<std::unique_ptr<std::string>> serializedBuffers;
 
   while (!source->atEnd()) {
-    // First read row size in big endian order.
-    const auto rowSize = folly::Endian::big(source->read<TRowSize>());
-    auto serializedBuffer = std::make_unique<std::string>();
-    serializedBuffer->reserve(rowSize);
-
-    const auto row = source->nextView(rowSize);
-    serializedBuffer->append(row.data(), row.size());
-    // If we couldn't read the entire row at once, we need to concatenate it
-    // in a different buffer.
-    if (serializedBuffer->size() < rowSize) {
-      concatenatePartialRow(source, rowSize, *serializedBuffer);
+    std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr;
+    const auto header = CompactRowHeader::read(source);
+    if (opts.compressionKind !=
+            common::CompressionKind::CompressionKind_NONE &&
+        header.compressed) {
+      auto compressBuf = folly::IOBuf::create(header.compressedSize);
+      source->readBytes(compressBuf->writableData(), header.compressedSize);
+      compressBuf->append(header.compressedSize);
+
+      // The uncompressed result may be a chain of IOBufs.
+      const auto codec = common::compressionKindToCodec(opts.compressionKind);
+      uncompressedBuf =
+          codec->uncompress(compressBuf.get(), header.uncompressedSize);
     }
+    std::unique_ptr<ByteInputStream> uncompressedStream;
+    ByteInputStream* uncompressedSource;
+    if (uncompressedBuf == nullptr) {
+      uncompressedSource = source;
+    } else {
+      uncompressedStream = std::make_unique<BufferInputStream>(
+          byteRangesFromIOBuf(uncompressedBuf.get()));
+      uncompressedSource = uncompressedStream.get();
+    }
+    const std::streampos initialSize = uncompressedSource->tellp();
+    while (uncompressedSource->tellp() - initialSize <
+           header.uncompressedSize) {
+      // First read row size in big endian order.
+      const auto rowSize =
+          folly::Endian::big(uncompressedSource->read<TRowSize>());
+
+      auto serializedBuffer = std::make_unique<std::string>();
+      serializedBuffer->reserve(rowSize);
 
-    VELOX_CHECK_EQ(serializedBuffer->size(), rowSize);
-    serializedBuffers.emplace_back(std::move(serializedBuffer));
-    serializedRows.push_back(std::string_view(
-        serializedBuffers.back()->data(), serializedBuffers.back()->size()));
+      const auto row = uncompressedSource->nextView(rowSize);
+      serializedBuffer->append(row.data(), row.size());
+      // If we couldn't read the entire row at once, we need to concatenate it
+      // in a different buffer.
+      if (serializedBuffer->size() < rowSize) {
+        concatenatePartialRow(uncompressedSource, rowSize, *serializedBuffer);
+      }
+
+      VELOX_CHECK_EQ(serializedBuffer->size(), rowSize);
+      serializedBuffers.emplace_back(std::move(serializedBuffer));
+      serializedRows.push_back(std::string_view(
+          serializedBuffers.back()->data(), serializedBuffers.back()->size()));
+    }
   }
 
   if (serializedRows.empty()) {
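With those pieces in place, a round trip through the CompactRow serde looks
roughly like the sketch below. Assumed context (not part of the patch): a
std::shared_ptr<memory::MemoryPool> pool, a StreamArena arena, an
OStreamOutputStream out, a ByteInputStream* byteStream positioned over the
written bytes, and a RowVectorPtr data; the append calls are elided since both
append flavors appear in the tests further down:

    VectorSerde::Options options(
        common::CompressionKind::CompressionKind_LZ4,
        /*minCompressionRatio=*/0.8);
    auto* serde = getNamedVectorSerde(VectorSerde::Kind::kCompactRow);
    auto serializer = serde->createIterativeSerializer(
        asRowType(data->type()), data->size(), &arena, &options);
    // ... one or more serializer->append(...) calls ...
    serializer->flush(&out); // Emits one header-framed, possibly compressed page.
    // The reader must receive the same options so it knows how to decompress:
    RowVectorPtr result;
    serde->deserialize(
        byteStream, pool.get(), asRowType(data->type()), &result, &options);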
diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp
index 7e09c038a4bdb..891b6f6a97944 100644
--- a/velox/serializers/PrestoSerializer.cpp
+++ b/velox/serializers/PrestoSerializer.cpp
@@ -4230,21 +4230,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
   }
 
  private:
-  struct CompressionStats {
-    // Number of times compression was not attempted.
-    int32_t numCompressionSkipped{0};
-
-    // uncompressed size for which compression was attempted.
-    int64_t compressionInputBytes{0};
-
-    // Compressed bytes.
-    int64_t compressedBytes{0};
-
-    // Bytes for which compression was not attempted because of past
-    // non-performance.
-    int64_t compressionSkippedBytes{0};
-  };
-
   const SerdeOpts opts_;
   StreamArena* const streamArena_;
   const std::unique_ptr<folly::io::Codec> codec_;
diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h
index b6b2385e3d72d..f396dea5c1e09 100644
--- a/velox/serializers/PrestoSerializer.h
+++ b/velox/serializers/PrestoSerializer.h
@@ -54,9 +54,10 @@ class PrestoVectorSerde : public VectorSerde {
     PrestoOptions(
         bool _useLosslessTimestamp,
         common::CompressionKind _compressionKind,
+        float _minCompressionRatio = 0.8,
         bool _nullsFirst = false,
         bool _preserveEncodings = false)
-        : VectorSerde::Options(_compressionKind),
+        : VectorSerde::Options(_compressionKind, _minCompressionRatio),
           useLosslessTimestamp(_useLosslessTimestamp),
           nullsFirst(_nullsFirst),
           preserveEncodings(_preserveEncodings) {}
@@ -74,11 +75,6 @@ class PrestoVectorSerde : public VectorSerde {
     /// structs.
     bool nullsFirst{false};
 
-    /// Minimum achieved compression if compression is enabled. Compressing less
-    /// than this causes subsequent compression attempts to be skipped. The more
-    /// times compression misses the target the less frequently it is tried.
-    float minCompressionRatio{0.8};
-
     /// If true, the serializer will not employ any optimizations that can
     /// affect the encoding of the input vectors. This is only relevant when
     /// using BatchVectorSerializer.
diff --git a/velox/serializers/UnsafeRowSerializer.cpp b/velox/serializers/UnsafeRowSerializer.cpp
index b675f6adce091..4c46a16c39f3b 100644
--- a/velox/serializers/UnsafeRowSerializer.cpp
+++ b/velox/serializers/UnsafeRowSerializer.cpp
@@ -21,13 +21,62 @@
 namespace facebook::velox::serializer::spark {
 namespace {
 using TRowSize = uint32_t;
+
+void writeInt32(OutputStream* out, int32_t value) {
+  out->write(reinterpret_cast<char*>(&value), sizeof(int32_t));
 }
 
-namespace {
+void writeBool(OutputStream* out, bool value) {
+  char writeValue = value ? 1 : 0;
+  out->write(reinterpret_cast<char*>(&writeValue), sizeof(char));
+}
+
+VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
+  if (options == nullptr) {
+    return VectorSerde::Options();
+  }
+  return *options;
+}
+
+// The compressedSize is equal to uncompressedSize when not compressed.
+struct UnsafeRowHeader {
+  int32_t uncompressedSize;
+  int32_t compressedSize;
+  bool compressed;
+
+  static UnsafeRowHeader read(ByteInputStream* source) {
+    UnsafeRowHeader header;
+    header.uncompressedSize = source->read<int32_t>();
+    header.compressedSize = source->read<int32_t>();
+    header.compressed = source->read<bool>();
+
+    VELOX_CHECK_GE(header.uncompressedSize, 0);
+    VELOX_CHECK_GE(header.compressedSize, 0);
+
+    return header;
+  }
+
+  static size_t size() {
+    return sizeof(int32_t) * 2 + sizeof(char);
+  }
+
+  std::string debugString() const {
+    return fmt::format(
+        "uncompressedSize: {}, compressedSize: {}, compressed: {}",
+        uncompressedSize,
+        compressedSize,
+        compressed);
+  }
+};
+
 class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
  public:
-  explicit UnsafeRowVectorSerializer(StreamArena* streamArena)
-      : pool_{streamArena->pool()} {}
+  UnsafeRowVectorSerializer(
+      StreamArena* streamArena,
+      const VectorSerde::Options& opts)
+      : pool_{streamArena->pool()},
+        codec_(common::compressionKindToCodec(opts.compressionKind)),
+        opts_(opts) {}
 
   void append(
       const RowVectorPtr& vector,
@@ -100,25 +149,106 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
   }
 
   size_t maxSerializedSize() const override {
-    size_t totalSize = 0;
-    for (const auto& buffer : buffers_) {
-      totalSize += buffer->size();
+    const auto size = uncompressedSize();
+    if (!needCompression()) {
+      return size + UnsafeRowHeader::size();
     }
-    return totalSize;
+    VELOX_CHECK_LE(
+        size,
+        codec_->maxUncompressedLength(),
+        "UncompressedSize exceeds limit");
+    return UnsafeRowHeader::size() + codec_->maxCompressedLength(size);
   }
 
+  // The serialization format is | uncompressedSize | compressedSize |
+  // compressed | data |.
   void flush(OutputStream* stream) override {
-    for (const auto& buffer : buffers_) {
-      stream->write(buffer->as<char>(), buffer->size());
+    constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
+    const auto size = uncompressedSize();
+    if (!needCompression()) {
+      flushUncompressed(size, stream);
+    } else if (numCompressionToSkip_ > 0) {
+      flushUncompressed(size, stream);
+      stats_.compressionSkippedBytes += size;
+      --numCompressionToSkip_;
+      ++stats_.numCompressionSkipped;
+    } else {
+      // Compress the buffer if the compression conditions are satisfied.
+      IOBufOutputStream out(
+          *pool_, nullptr, buffers_.size() * sizeof(std::string_view));
+      for (const auto& buffer : buffers_) {
+        out.write(buffer->asMutable<char>(), buffer->size());
+      }
+      const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
+      const auto compressedSize = compressedBuffer->length();
+      stats_.compressionInputBytes += size;
+      stats_.compressedBytes += compressedSize;
+      if (compressedSize > opts_.minCompressionRatio * size) {
+        // Skip this compression.
+        numCompressionToSkip_ = std::min(
+            kMaxCompressionAttemptsToSkip, 1 + stats_.numCompressionSkipped);
+        flushUncompressed(size, stream);
+      } else {
+        // Do the compression.
+        writeInt32(stream, size);
+        writeInt32(stream, compressedSize);
+        writeBool(stream, true);
+        for (auto range : *compressedBuffer) {
+          stream->write(
+              reinterpret_cast<const char*>(range.data()), range.size());
+        }
+      }
     }
+    buffers_.clear();
   }
 
+  std::unordered_map<std::string, RuntimeCounter> runtimeStats() override {
+    std::unordered_map<std::string, RuntimeCounter> map;
+    map.insert(
+        {{"compressedBytes",
+          RuntimeCounter(stats_.compressedBytes, RuntimeCounter::Unit::kBytes)},
+         {"compressionInputBytes",
+          RuntimeCounter(
+              stats_.compressionInputBytes, RuntimeCounter::Unit::kBytes)},
+         {"compressionSkippedBytes",
+          RuntimeCounter(
+              stats_.compressionSkippedBytes, RuntimeCounter::Unit::kBytes)}});
+    return map;
+  }
+
   void clear() override {}
 
  private:
+  int32_t uncompressedSize() const {
+    int32_t totalSize = 0;
+    for (const auto& buffer : buffers_) {
+      totalSize += buffer->size();
+    }
+    return totalSize;
+  }
+
+  bool needCompression() const {
+    return codec_->type() != folly::io::CodecType::NO_COMPRESSION;
+  }
+
+  void flushUncompressed(int32_t size, OutputStream* stream) {
+    writeInt32(stream, size);
+    writeInt32(stream, size);
+    writeBool(stream, false);
+    for (const auto& buffer : buffers_) {
+      stream->write(buffer->as<char>(), buffer->size());
+    }
+  }
+
   memory::MemoryPool* const pool_;
+  const std::unique_ptr<folly::io::Codec> codec_;
+  const VectorSerde::Options opts_;
+
   std::vector<BufferPtr> buffers_;
+
+  // Count of forthcoming compressions to skip.
+  int32_t numCompressionToSkip_{0};
+  CompressionStats stats_;
 };
 
 // Read from the stream until the full row is concatenated.
@@ -152,8 +282,9 @@ UnsafeRowVectorSerde::createIterativeSerializer(
     RowTypePtr /* type */,
     int32_t /* numRows */,
     StreamArena* streamArena,
-    const Options* /* options */) {
-  return std::make_unique<UnsafeRowVectorSerializer>(streamArena);
+    const Options* options) {
+  return std::make_unique<UnsafeRowVectorSerializer>(
+      streamArena, toValidOptions(options));
 }
 
 void UnsafeRowVectorSerde::deserialize(
@@ -161,28 +292,55 @@ void UnsafeRowVectorSerde::deserialize(
     velox::memory::MemoryPool* pool,
     RowTypePtr type,
     RowVectorPtr* result,
-    const Options* /* options */) {
+    const Options* options) {
   std::vector<std::optional<std::string_view>> serializedRows;
   std::vector<std::unique_ptr<std::string>> serializedBuffers;
-
+  const auto opts = toValidOptions(options);
   while (!source->atEnd()) {
-    // First read row size in big endian order.
-    const auto rowSize = folly::Endian::big(source->read<TRowSize>());
-    auto serializedBuffer = std::make_unique<std::string>();
-    serializedBuffer->reserve(rowSize);
-
-    const auto row = source->nextView(rowSize);
-    serializedBuffer->append(row.data(), row.size());
-    // If we couldn't read the entire row at once, we need to concatenate it
-    // in a different buffer.
-    if (serializedBuffer->size() < rowSize) {
-      concatenatePartialRow(source, rowSize, *serializedBuffer);
+    std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr;
+    const auto header = UnsafeRowHeader::read(source);
+    if (opts.compressionKind !=
+            common::CompressionKind::CompressionKind_NONE &&
+        header.compressed) {
+      auto compressBuf = folly::IOBuf::create(header.compressedSize);
+      source->readBytes(compressBuf->writableData(), header.compressedSize);
+      compressBuf->append(header.compressedSize);
+
+      // The uncompressed result may be a chain of IOBufs.
+      const auto codec = common::compressionKindToCodec(opts.compressionKind);
+      uncompressedBuf =
+          codec->uncompress(compressBuf.get(), header.uncompressedSize);
+    }
+    std::unique_ptr<ByteInputStream> uncompressedStream;
+    ByteInputStream* uncompressedSource;
+    if (uncompressedBuf == nullptr) {
+      uncompressedSource = source;
+    } else {
+      uncompressedStream = std::make_unique<BufferInputStream>(
+          byteRangesFromIOBuf(uncompressedBuf.get()));
+      uncompressedSource = uncompressedStream.get();
     }
+    const std::streampos initialSize = uncompressedSource->tellp();
+    while (uncompressedSource->tellp() - initialSize <
+           header.uncompressedSize) {
+      // First read row size in big endian order.
+      const auto rowSize =
+          folly::Endian::big(uncompressedSource->read<TRowSize>());
+      auto serializedBuffer = std::make_unique<std::string>();
+      serializedBuffer->reserve(rowSize);
 
-    VELOX_CHECK_EQ(serializedBuffer->size(), rowSize);
-    serializedBuffers.emplace_back(std::move(serializedBuffer));
-    serializedRows.push_back(std::string_view(
-        serializedBuffers.back()->data(), serializedBuffers.back()->size()));
+      const auto row = uncompressedSource->nextView(rowSize);
+      serializedBuffer->append(row.data(), row.size());
+      // If we couldn't read the entire row at once, we need to concatenate it
+      // in a different buffer.
+      if (serializedBuffer->size() < rowSize) {
+        concatenatePartialRow(uncompressedSource, rowSize, *serializedBuffer);
+      }
+
+      VELOX_CHECK_EQ(serializedBuffer->size(), rowSize);
+      serializedBuffers.emplace_back(std::move(serializedBuffer));
+      serializedRows.push_back(std::string_view(
+          serializedBuffers.back()->data(), serializedBuffers.back()->size()));
+    }
   }
 
   if (serializedRows.empty()) {
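To make the skip heuristic shared by both serializers concrete: with the default
minCompressionRatio of 0.8, a 1,000-byte flush that compresses to 750 bytes
(ratio 0.75) is written compressed, while one that compresses to 900 bytes
(0.9 > 0.8) is written uncompressed, and compression is then skipped for the
next min(30, 1 + numCompressionSkipped) flushes. Each miss therefore lengthens
the back-off, up to the 30-flush cap set by kMaxCompressionAttemptsToSkip, and
every skipped flush is still accounted for in compressionSkippedBytes.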
diff --git a/velox/serializers/UnsafeRowSerializer.h b/velox/serializers/UnsafeRowSerializer.h
index e859d8f2cc0ff..80f732156ace8 100644
--- a/velox/serializers/UnsafeRowSerializer.h
+++ b/velox/serializers/UnsafeRowSerializer.h
@@ -23,6 +23,7 @@ class UnsafeRowVectorSerde : public VectorSerde {
  public:
   UnsafeRowVectorSerde() : VectorSerde(VectorSerde::Kind::kUnsafeRow) {}
 
+  // We do not implement this method since it is not used in production code.
   void estimateSerializedSize(
       const row::UnsafeRowFast* unsafeRow,
       const folly::Range<const vector_size_t*>& rows,
diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp
index 1de38c781165f..14012cf66ece1 100644
--- a/velox/serializers/tests/CompactRowSerializerTest.cpp
+++ b/velox/serializers/tests/CompactRowSerializerTest.cpp
@@ -23,9 +23,29 @@
 namespace facebook::velox::serializer {
 namespace {
 
+struct TestParam {
+  common::CompressionKind compressionKind;
+  bool appendRow;
+
+  TestParam(common::CompressionKind _compressionKind, bool _appendRow)
+      : compressionKind(_compressionKind), appendRow(_appendRow) {}
+};
+
 class CompactRowSerializerTest : public ::testing::Test,
                                  public velox::test::VectorTestBase,
-                                 public testing::WithParamInterface<bool> {
+                                 public testing::WithParamInterface<TestParam> {
+ public:
+  static std::vector<TestParam> getTestParams() {
+    static std::vector<TestParam> testParams = {
+        {common::CompressionKind::CompressionKind_NONE, false},
+        {common::CompressionKind::CompressionKind_ZLIB, true},
+        {common::CompressionKind::CompressionKind_SNAPPY, false},
+        {common::CompressionKind::CompressionKind_ZSTD, true},
+        {common::CompressionKind::CompressionKind_LZ4, false},
+        {common::CompressionKind::CompressionKind_GZIP, true}};
+    return testParams;
+  }
+
  protected:
   static void SetUpTestCase() {
     memory::MemoryManager::testingSetInstance({});
@@ -41,6 +61,9 @@ class CompactRowSerializerTest : public ::testing::Test,
     ASSERT_EQ(
         getNamedVectorSerde(VectorSerde::Kind::kCompactRow)->kind(),
         VectorSerde::Kind::kCompactRow);
+    appendRow_ = GetParam().appendRow;
+    compressionKind_ = GetParam().compressionKind;
+    options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
   }
 
   void TearDown() override {
@@ -56,7 +79,7 @@ class CompactRowSerializerTest : public ::testing::Test,
     vector_size_t offset = 0;
     vector_size_t rangeSize = 1;
     std::unique_ptr<row::CompactRow> compactRow;
-    if (GetParam()) {
+    if (appendRow_) {
       compactRow = std::make_unique<row::CompactRow>(rowVector);
     }
     while (offset < numRows) {
@@ -69,10 +92,10 @@ class CompactRowSerializerTest : public ::testing::Test,
     auto arena = std::make_unique<StreamArena>(pool_.get());
     auto rowType = asRowType(rowVector->type());
     auto serializer = getVectorSerde()->createIterativeSerializer(
-        rowType, numRows, arena.get());
+        rowType, numRows, arena.get(), options_.get());
 
     Scratch scratch;
-    if (GetParam()) {
+    if (appendRow_) {
       std::vector<vector_size_t> serializedRowSizes(numRows);
       std::vector<vector_size_t*> serializedRowSizesPtr(numRows);
       for (auto i = 0; i < numRows; ++i) {
@@ -97,7 +120,11 @@ class CompactRowSerializerTest : public ::testing::Test,
     auto size = serializer->maxSerializedSize();
     OStreamOutputStream out(output);
     serializer->flush(&out);
-    ASSERT_EQ(size, output->tellp());
+    if (!needCompression()) {
+      ASSERT_EQ(size, output->tellp());
+    } else {
+      ASSERT_GT(size, output->tellp());
+    }
   }
 
   std::unique_ptr<ByteInputStream> toByteStream(
@@ -127,7 +154,7 @@ class CompactRowSerializerTest : public ::testing::Test,
 
     RowVectorPtr result;
     getVectorSerde()->deserialize(
-        byteStream.get(), pool_.get(), rowType, &result);
+        byteStream.get(), pool_.get(), rowType, &result, options_.get());
     return result;
   }
 
@@ -141,6 +168,16 @@ class CompactRowSerializerTest : public ::testing::Test,
   }
 
   std::shared_ptr<memory::MemoryPool> pool_;
+
+ private:
+  bool needCompression() {
+    return compressionKind_ != common::CompressionKind::CompressionKind_NONE;
+  }
+
+  static constexpr int32_t kHeaderSize = sizeof(int32_t) * 2 + sizeof(char);
+  common::CompressionKind compressionKind_;
+  std::unique_ptr<VectorSerde::Options> options_;
+  bool appendRow_;
 };
 
 TEST_P(CompactRowSerializerTest, fuzz) {
@@ -187,6 +224,6 @@ TEST_P(CompactRowSerializerTest, fuzz) {
 VELOX_INSTANTIATE_TEST_SUITE_P(
     CompactRowSerializerTest,
     CompactRowSerializerTest,
-    testing::Values(false, true));
+    testing::ValuesIn(CompactRowSerializerTest::getTestParams()));
 } // namespace
 } // namespace facebook::velox::serializer
diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp
index 7dc4859abbf05..65bd73a555dd1 100644
--- a/velox/serializers/tests/PrestoSerializerTest.cpp
+++ b/velox/serializers/tests/PrestoSerializerTest.cpp
@@ -120,7 +120,7 @@ class PrestoSerializerTest
     const bool preserveEncodings =
         serdeOptions == nullptr ? false : serdeOptions->preserveEncodings;
     serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions{
-        useLosslessTimestamp, kind, nullsFirst, preserveEncodings};
+        useLosslessTimestamp, kind, 0.8, nullsFirst, preserveEncodings};
 
     return paramOptions;
   }
diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp
index 2bcd3f9dae601..08041f58c9051 100644
--- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp
+++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp
@@ -23,9 +23,29 @@
 using namespace facebook;
 using namespace facebook::velox;
 
+struct TestParam {
+  common::CompressionKind compressionKind;
+  bool appendRow;
+
+  TestParam(common::CompressionKind _compressionKind, bool _appendRow)
+      : compressionKind(_compressionKind), appendRow(_appendRow) {}
+};
+
 class UnsafeRowSerializerTest : public ::testing::Test,
                                 public velox::test::VectorTestBase,
-                                public testing::WithParamInterface<bool> {
+                                public testing::WithParamInterface<TestParam> {
+ public:
+  static std::vector<TestParam> getTestParams() {
+    static std::vector<TestParam> testParams = {
+        {common::CompressionKind::CompressionKind_NONE, false},
+        {common::CompressionKind::CompressionKind_ZLIB, true},
+        {common::CompressionKind::CompressionKind_SNAPPY, false},
+        {common::CompressionKind::CompressionKind_ZSTD, true},
+        {common::CompressionKind::CompressionKind_LZ4, false},
+        {common::CompressionKind::CompressionKind_GZIP, true}};
+    return testParams;
+  }
+
  protected:
   static void SetUpTestCase() {
     memory::MemoryManager::testingSetInstance({});
@@ -41,6 +61,9 @@ class UnsafeRowSerializerTest : public ::testing::Test,
     ASSERT_EQ(
         getNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)->kind(),
         VectorSerde::Kind::kUnsafeRow);
+    appendRow_ = GetParam().appendRow;
+    compressionKind_ = GetParam().compressionKind;
+    options_ = std::make_unique<VectorSerde::Options>(compressionKind_, 0.8);
   }
 
   void TearDown() override {
@@ -49,6 +72,7 @@ class UnsafeRowSerializerTest : public ::testing::Test,
   }
 
   void serialize(RowVectorPtr rowVector, std::ostream* output) {
+    const auto streamInitialSize = output->tellp();
     const auto numRows = rowVector->size();
 
     std::vector<IndexRange> ranges(numRows);
@@ -64,7 +88,7 @@ class UnsafeRowSerializerTest : public ::testing::Test,
     std::vector<vector_size_t> serializedRowSizes(numRows);
     std::vector<vector_size_t*> serializedRowSizesPtr(numRows);
     for (auto i = 0; i < numRows; ++i) {
       serializedRowSizesPtr[i] = &serializedRowSizes[i];
     }
-    if (GetParam()) {
+    if (appendRow_) {
       unsafeRow = std::make_unique<row::UnsafeRowFast>(rowVector);
       getVectorSerde()->estimateSerializedSize(
           unsafeRow.get(), rows, serializedRowSizesPtr.data());
     }
@@ -73,9 +97,9 @@ class UnsafeRowSerializerTest : public ::testing::Test,
     auto arena = std::make_unique<StreamArena>(pool_.get());
     auto rowType = std::dynamic_pointer_cast<const RowType>(rowVector->type());
     auto serializer = getVectorSerde()->createIterativeSerializer(
-        rowType, numRows, arena.get());
+        rowType, numRows, arena.get(), options_.get());
 
-    if (GetParam()) {
+    if (appendRow_) {
       serializer->append(*unsafeRow, rows, serializedRowSizes);
     } else {
       Scratch scratch;
@@ -86,7 +110,11 @@ class UnsafeRowSerializerTest : public ::testing::Test,
     auto size = serializer->maxSerializedSize();
     OStreamOutputStream out(output);
     serializer->flush(&out);
-    ASSERT_EQ(size, output->tellp());
+    if (!needCompression()) {
+      ASSERT_EQ(size, output->tellp() - streamInitialSize);
+    } else {
+      ASSERT_GT(size, output->tellp() - streamInitialSize);
+    }
   }
 
   std::unique_ptr<ByteInputStream> toByteStream(
@@ -110,7 +138,7 @@ class UnsafeRowSerializerTest : public ::testing::Test,
 
     RowVectorPtr result;
     getVectorSerde()->deserialize(
-        byteStream.get(), pool_.get(), rowType, &result);
+        byteStream.get(), pool_.get(), rowType, &result, options_.get());
     return result;
   }
 
@@ -127,13 +155,36 @@ class UnsafeRowSerializerTest : public ::testing::Test,
   testSerialize(RowVectorPtr rowVector, int8_t* expectedData, size_t dataSize) {
     std::ostringstream out;
     serialize(rowVector, &out);
-    EXPECT_EQ(std::memcmp(expectedData, out.str().data(), dataSize), 0);
+    if (!needCompression()) {
+      // Check the data after the header.
+      EXPECT_EQ(
+          std::memcmp(expectedData, out.str().data() + kHeaderSize, dataSize),
+          0);
+    }
   }
 
   void testDeserialize(
       const std::vector<std::string_view>& input,
       RowVectorPtr expectedVector) {
-    auto results = deserialize(asRowType(expectedVector->type()), input);
+    if (needCompression()) {
+      return;
+    }
+    // Construct the header to make deserialization work.
+    std::vector<std::string_view> uncompressedInput = input;
+    char header[kHeaderSize] = {0};
+    int32_t uncompressedSize = 0;
+    for (const auto& in : input) {
+      uncompressedSize += in.size();
+    }
+    auto* headerPtr = reinterpret_cast<int32_t*>(&header);
+    headerPtr[0] = uncompressedSize;
+    headerPtr[1] = uncompressedSize;
+    header[kHeaderSize - 1] = 0;
+
+    uncompressedInput.insert(
+        uncompressedInput.begin(), std::string_view(header, kHeaderSize));
+    auto results =
+        deserialize(asRowType(expectedVector->type()), uncompressedInput);
     test::assertEqualVectors(expectedVector, results);
   }
 
@@ -144,7 +195,17 @@ class UnsafeRowSerializerTest : public ::testing::Test,
         expectedVector);
   }
 
+  bool needCompression() {
+    return compressionKind_ != common::CompressionKind::CompressionKind_NONE;
+  }
+
   std::shared_ptr<memory::MemoryPool> pool_;
+
+ private:
+  static constexpr int32_t kHeaderSize = sizeof(int32_t) * 2 + sizeof(char);
+  common::CompressionKind compressionKind_;
+  std::unique_ptr<VectorSerde::Options> options_;
+  bool appendRow_;
 };
 
 // These expected binary buffers were samples taken using Spark's java code.
@@ -290,6 +351,11 @@ TEST_P(UnsafeRowSerializerTest, splitRow) {
 }
 
 TEST_P(UnsafeRowSerializerTest, incompleteRow) {
+  // The test data is uncompressed, and we don't know the compressed size
+  // needed to construct a header. If the row is incomplete, readBytes fails.
+  if (needCompression()) {
+    return;
+  }
   int8_t data[20] = {
       0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 62, 28, -36, -33, 2, 0, 0, 0};
   auto expected =
@@ -320,7 +386,7 @@ TEST_P(UnsafeRowSerializerTest, incompleteRow) {
   buffers = {{rawData, 2}};
   VELOX_ASSERT_RUNTIME_THROW(
       testDeserialize(buffers, expected),
-      "(1 vs. 1) Reading past end of BufferInputStream");
+      "(2 vs. 2) Reading past end of BufferInputStream");
 }
 
 TEST_P(UnsafeRowSerializerTest, types) {
@@ -433,7 +499,20 @@ TEST_P(UnsafeRowSerializerTest, decimalVector) {
   testRoundTrip(rowVectorArray);
 }
 
+TEST_P(UnsafeRowSerializerTest, multiPage) {
+  auto input = makeRowVector(
+      {makeFlatVector<int64_t>(std::vector<int64_t>{12345678910, 123})});
+  std::ostringstream out;
+  serialize(input, &out);
+  serialize(input, &out);
+  auto expected = makeRowVector({makeFlatVector<int64_t>(
+      std::vector<int64_t>{12345678910, 123, 12345678910, 123})});
+  auto rowType = std::dynamic_pointer_cast<const RowType>(input->type());
+  auto deserialized = deserialize(rowType, {out.str()});
+  test::assertEqualVectors(deserialized, expected);
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     UnsafeRowSerializerTest,
     UnsafeRowSerializerTest,
-    testing::Values(false, true));
+    testing::ValuesIn(UnsafeRowSerializerTest::getTestParams()));
2) Reading past end of BufferInputStream"); } TEST_P(UnsafeRowSerializerTest, types) { @@ -433,7 +499,20 @@ TEST_P(UnsafeRowSerializerTest, decimalVector) { testRoundTrip(rowVectorArray); } +TEST_P(UnsafeRowSerializerTest, multiPage) { + auto input = + makeRowVector({makeFlatVector(std::vector{12345678910, 123})}); + std::ostringstream out; + serialize(input, &out); + serialize(input, &out); + auto expected = makeRowVector({makeFlatVector( + std::vector{12345678910, 123, 12345678910, 123})}); + auto rowType = std::dynamic_pointer_cast(input->type()); + auto deserialized = deserialize(rowType, {out.str()}); + test::assertEqualVectors(deserialized, expected); +} + VELOX_INSTANTIATE_TEST_SUITE_P( UnsafeRowSerializerTest, UnsafeRowSerializerTest, - testing::Values(false, true)); + testing::ValuesIn(UnsafeRowSerializerTest::getTestParams())); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index bb9b1a563beaa..6094ef7830b84 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -38,6 +38,21 @@ class CompactRow; class UnsafeRowFast; }; // namespace row +struct CompressionStats { + // Number of times compression was not attempted. + int32_t numCompressionSkipped{0}; + + // Uncompressed size for which compression was attempted. + int64_t compressionInputBytes{0}; + + // Compressed bytes. + int64_t compressedBytes{0}; + + // Bytes for which compression was not attempted because of past + // non-performance. + int64_t compressionSkippedBytes{0}; +}; + /// Serializer that can iteratively build up a buffer of serialized rows from /// one or more RowVectors. /// @@ -171,13 +186,20 @@ class VectorSerde { struct Options { Options() = default; - explicit Options(common::CompressionKind _compressionKind) - : compressionKind(_compressionKind) {} + Options( + common::CompressionKind _compressionKind, + float _minCompressionRatio) + : compressionKind(_compressionKind), + minCompressionRatio(_minCompressionRatio) {} virtual ~Options() = default; common::CompressionKind compressionKind{ common::CompressionKind::CompressionKind_NONE}; + /// Minimum achieved compression if compression is enabled. Compressing less + /// than this causes subsequent compression attempts to be skipped. The more + /// times compression misses the target the less frequently it is tried. + float minCompressionRatio{0.8}; }; Kind kind() const {