From 29bb319cdbcde8b670e3bedcef7daaf24e008cca Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Mon, 13 Jan 2025 10:00:16 -0800 Subject: [PATCH] feat: Optimize PrestoBatchVectorSerializer [1/7]: Serialize FlatVectors (#12060) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12060 Context: This is a series of diffs in which I reimplement PrestoBatchVectorSerializer to write directly to the output stream, rather than the indirect route it currently uses via VectorStreams. Reusing VectorStreams and much of the code for PrestoIterativeVectorSerializer prevented us from capturing all of the performance benefits of writing data in batches rather than row by row. These changes combined will speed up PrestoBatchVectorSerializer 2-3x (as measured in Presto queries and other use cases). In the final diff I will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize function which will switch it to the new optimized writing path, therefore I will land these changes as a stack. In this diff: I provide the implementations for serializing FlatVectors. Differential Revision: D68037258 --- .../PrestoBatchVectorSerializer.cpp | 54 +- .../serializers/PrestoBatchVectorSerializer.h | 467 +++++++++++++++++- velox/vector/VectorStream.h | 12 + 3 files changed, 526 insertions(+), 7 deletions(-) diff --git a/velox/serializers/PrestoBatchVectorSerializer.cpp b/velox/serializers/PrestoBatchVectorSerializer.cpp index 7fec07ad6cfb..fef8750cc971 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.cpp +++ b/velox/serializers/PrestoBatchVectorSerializer.cpp @@ -30,7 +30,6 @@ void PrestoBatchVectorSerializer::serialize( const auto rowType = vector->type(); const auto numChildren = vector->childrenSize(); - StreamArena arena(pool_); std::vector streams; streams.reserve(numChildren); for (int i = 0; i < numChildren; i++) { @@ -38,7 +37,7 @@ void PrestoBatchVectorSerializer::serialize( rowType->childAt(i), std::nullopt, vector->childAt(i), - &arena, + &arena_, numRows, opts_); @@ -48,7 +47,9 @@ void PrestoBatchVectorSerializer::serialize( } flushStreams( - streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream); + streams, numRows, arena_, *codec_, opts_.minCompressionRatio, stream); + + arena_.clear(); } void PrestoBatchVectorSerializer::estimateSerializedSizeImpl( @@ -178,4 +179,51 @@ void PrestoBatchVectorSerializer::estimateSerializedSizeImpl( VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding()); } } + +void PrestoBatchVectorSerializer::writeHeader( + const TypePtr& type, + BufferedOutputStream* stream) { + auto encoding = typeToEncodingName(type); + writeInt32(stream, encoding.size()); + stream->write(encoding.data(), encoding.size()); +} + +template <> +bool PrestoBatchVectorSerializer::hasNulls( + const VectorPtr& vector, + const folly::Range& ranges) { + if (vector->nulls()) { + for (auto& range : ranges) { + if (!bits::isAllSet( + vector->rawNulls(), range.begin, range.begin + range.size)) { + return true; + } + } + } + + return false; +} + +template <> +bool PrestoBatchVectorSerializer::hasNulls( + const VectorPtr& vector, + const folly::Range& ranges) { + if (vector->nulls()) { + for (auto& range : ranges) { + if (range.isNull || + !bits::isAllSet( + vector->rawNulls(), range.begin, range.begin + range.size)) { + return true; + } + } + } else { + for (auto& range : ranges) { + if (range.isNull) { + return true; + } + } + } + + return false; +} } // namespace facebook::velox::serializer::presto::detail diff --git a/velox/serializers/PrestoBatchVectorSerializer.h b/velox/serializers/PrestoBatchVectorSerializer.h index 83ca7c845705..c1d10940e581 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.h +++ b/velox/serializers/PrestoBatchVectorSerializer.h @@ -16,6 +16,7 @@ #pragma once #include "velox/serializers/PrestoSerializer.h" +#include "velox/serializers/PrestoSerializerSerializationUtils.h" #include "velox/vector/VectorStream.h" namespace facebook::velox::serializer::presto::detail { @@ -24,9 +25,10 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { PrestoBatchVectorSerializer( memory::MemoryPool* pool, const PrestoVectorSerde::PrestoOptions& opts) - : pool_(pool), + : arena_(pool), codec_(common::compressionKindToCodec(opts.compressionKind)), - opts_(opts) {} + opts_(opts), + nulls_(&arena_, true, true, true) {} void serialize( const RowVectorPtr& vector, @@ -43,14 +45,471 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { } private: + static inline constexpr char kZero = 0; + static inline constexpr char kOne = 1; + void estimateSerializedSizeImpl( const VectorPtr& vector, const folly::Range& ranges, vector_size_t** sizes, Scratch& scratch); - memory::MemoryPool* pool_; + void writeHeader(const TypePtr& type, BufferedOutputStream* stream); + + /// Are there any nulls in the Vector or introduced artificially in the + /// ranges. Does not look recursively at values Vectors or children. + template + bool hasNulls( + const VectorPtr& vector, + const folly::Range& ranges); + + /// Write out the null flags to the streams. + template + void writeNulls( + const VectorPtr& vector, + const folly::Range& ranges, + vector_size_t numRows, + BufferedOutputStream* stream); + + /// Write out all the null information needed by the PrestoPage, both the + /// hasNulls and isNull flags. + template + inline void writeNullsSegment( + bool hasNulls, + const VectorPtr& vector, + const folly::Range& ranges, + vector_size_t numRows, + BufferedOutputStream* stream) { + if (hasNulls) { + // Has-nulls flag. + stream->write(&kOne, 1); + + // Nulls flags. + writeNulls(vector, ranges, numRows, stream); + } else { + // Has-nulls flag. + stream->write(&kZero, 1); + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t< + kind != TypeKind::TIMESTAMP && kind != TypeKind::BOOLEAN && + kind != TypeKind::OPAQUE && kind != TypeKind::UNKNOWN && + !std:: + is_same_v::NativeType, StringView>, + bool> = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + using T = typename TypeTraits::NativeType; + const auto* flatVector = vector->as>(); + const auto* rawValues = flatVector->rawValues(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + if (this->hasNulls(vector, ranges)) { + // Write out the has-nulls flag. + stream->write(&kOne, 1); + + // Write out the nulls flags. + writeNulls(vector, ranges, numRows, stream); + + // Write out the values. + // This logic merges consecutive ranges of non-null values so we can make + // long consecutive writes to the stream. A range ends when we detect a + // discontinuity between ranges, a null, or the end of the ranges. When + // this happens we write out the range. + + // Tracks the beginning of the current range. + int firstNonNull = -1; + // Tracks the end of the current range. + int lastNonNull = -1; + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (static_cast(range).isNull) { + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!flatVector->isNullAt(i)) { + if (firstNonNull == -1) { + // We're at the beginning of a new range. + firstNonNull = i; + lastNonNull = i; + } else if (i == lastNonNull + 1) { + // We're continuing the current range. + lastNonNull = i; + } else { + // We've reached a discontinuity (either because the previous + // value was null or because the ranges are discontinuous). + // Write out the current range and start a new one. + const size_t rangeSize = (1 + lastNonNull - firstNonNull); + stream->write( + reinterpret_cast(&rawValues[firstNonNull]), + rangeSize * sizeof(T)); + firstNonNull = i; + lastNonNull = i; + } + } + } + } + // There's no more data, if we had a range waiting to be written out, do + // so. + if (firstNonNull != -1) { + const size_t rangeSize = (1 + lastNonNull - firstNonNull); + stream->write( + reinterpret_cast(&rawValues[firstNonNull]), + rangeSize * sizeof(T)); + } + } else { + // Write out the has-nulls flag. + stream->write(&kZero, 1); + + // Write out the values. Since there are no nulls, we optimistically + // assume the ranges are long enough that the overhead of merging + // consecutive ranges is not worth it. + for (auto& range : ranges) { + stream->write( + reinterpret_cast(&rawValues[range.begin]), + range.size * sizeof(T)); + } + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* flatVector = vector->as>(); + const auto* rawValues = flatVector->rawValues(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + if (this->hasNulls(vector, ranges)) { + // Write out the has-nulls flag. + stream->write(&kOne, 1); + + // Write out the nulls flags. + writeNulls(vector, ranges, numRows, stream); + + // Write out the values. + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (static_cast(range).isNull) { + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!flatVector->isNullAt(i)) { + if (opts_.useLosslessTimestamp) { + writeInt64(stream, rawValues[i].getSeconds()); + writeInt64(stream, rawValues[i].getNanos()); + } else { + writeInt64(stream, rawValues[i].toMillis()); + } + } + } + } + } else { + // Write out the has-nulls flag. + stream->write(&kZero, 1); + + // Write out the values. + for (auto& range : ranges) { + if (opts_.useLosslessTimestamp) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + writeInt64(stream, rawValues[i].getSeconds()); + writeInt64(stream, rawValues[i].getNanos()); + } + } else { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + writeInt64(stream, rawValues[i].toMillis()); + } + } + } + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t< + std::is_same_v::NativeType, StringView>, + bool> = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* flatVector = vector->as>(); + const auto* rawValues = flatVector->rawValues(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + if (this->hasNulls(vector, ranges)) { + // The total number of bytes we'll write out for the strings. + int32_t numBytes = 0; + + // Write out the offsets. + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (range.isNull) { + // If it's a range of nulls, we just write the last offset out n + // times. + for (int i = 0; i < range.size; i++) { + writeInt32(stream, numBytes); + } + + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!flatVector->isNullAt(i)) { + numBytes += rawValues[i].size(); + } + writeInt32(stream, numBytes); + } + } + + // Write out the has-nulls flag. + stream->write(&kOne, 1); + + // Write out the nulls flags. + writeNulls(vector, ranges, numRows, stream); + + // Write out the total number of bytes. + writeInt32(stream, numBytes); + + // Write out the values. + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (static_cast(range).isNull) { + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!flatVector->isNullAt(i)) { + stream->write(rawValues[i].data(), rawValues[i].size()); + } + } + } + } else { + // Write out the offsets. + int32_t numBytes = 0; + for (const auto& range : ranges) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + numBytes += rawValues[i].size(); + writeInt32(stream, numBytes); + } + } + + // Write out the has-nulls flag. + stream->write(&kZero, 1); + + // Write out the total number of bytes. + writeInt32(stream, numBytes); + + // Write out the values. + for (auto& range : ranges) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + stream->write(rawValues[i].data(), rawValues[i].size()); + } + } + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* flatVector = vector->as>(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + if (this->hasNulls(vector, ranges)) { + // Write out the has-nulls flag. + stream->write(&kOne, 1); + + // Write out the nulls flags. + writeNulls(vector, ranges, numRows, stream); + + // Write out the values. + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (static_cast(range).isNull) { + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!vector->isNullAt(i)) { + stream->write(flatVector->valueAtFast(i) ? &kOne : &kZero, 1); + } + } + } + } else { + // Write out the has-nulls flag. + stream->write(&kZero, 1); + + // Write out the values. + for (const auto& range : ranges) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + stream->write(flatVector->valueAtFast(i) ? &kOne : &kZero, 1); + } + } + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* flatVector = vector->as>>(); + const auto* rawValues = flatVector->rawValues(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + int32_t numBytes = 0; + + // To avoid serializng the values twice, we hold the serialized data here + // until we reach the point in the stream where we can write it out. + ScratchPtr valuesHolder(scratch_); + std::string* mutableValues = valuesHolder.get(numRows); + size_t valuesIndex = 0; + + auto serializer = vector->type()->asOpaque().getSerializeFunc(); + + const bool hasNulls = flatVector->rawValues(); + + // Write out the offsets and serialize the values. + if (hasNulls) { + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (range.isNull) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + writeInt32(stream, numBytes); + } + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (!flatVector->isNullAt(i)) { + mutableValues[valuesIndex] = serializer(rawValues[i]); + numBytes += mutableValues[valuesIndex].size(); + valuesIndex++; + } + + writeInt32(stream, numBytes); + } + } + } else { + for (const auto& range : ranges) { + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + mutableValues[valuesIndex] = serializer(rawValues[i]); + numBytes += mutableValues[valuesIndex].size(); + valuesIndex++; + + writeInt32(stream, numBytes); + } + } + } + + // Write out the nulls flag and nulls. + writeNullsSegment(hasNulls, vector, ranges, numRows, stream); + + // Write out the total number of bytes. + writeInt32(stream, numBytes); + + // Write out the serialized values. + for (size_t i = 0; i < valuesIndex; ++i) { + stream->write(mutableValues[i].data(), mutableValues[i].size()); + } + } + + template < + TypeKind kind, + typename RangeType, + typename std::enable_if_t = true> + void serializeFlatVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + VELOX_CHECK_NOT_NULL(vector->rawNulls()); + + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + + // Write out the has-nulls flag. + stream->write(&kOne, 1); + + // Write out the nulls. + nulls_.startWrite(bits::nbytes(numRows)); + nulls_.appendBool(bits::kNull, numRows); + nulls_.flush(stream); + } + + StreamArena arena_; const std::unique_ptr codec_; - PrestoVectorSerde::PrestoOptions opts_; + const PrestoVectorSerde::PrestoOptions opts_; + + // A scratch space for writing null bits, this is a frequent operation that + // the OutputStream interface is not well suited for. + // + // Since this is shared/reused, it is important that the usage of nulls_ + // once started when serializing a Vector is finished before serializing any + // children. This can be guaranteed by using the writeNullsSegment or + // writeNulls functions. + ByteOutputStream nulls_; + Scratch scratch_; }; } // namespace facebook::velox::serializer::presto::detail diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index f9d9b6790c50..a3e1b4882f68 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -33,6 +33,18 @@ struct IndexRange { vector_size_t size; }; +// A flavor of IndexRange that allows us to add artificial nulls. This is useful +// when for example, when flatteneing a DictionaryVector, the DictionaryVector +// may introduce nulls that do not exist in the values Vector, and so need to +// get introduced artificially. +struct IndexRangeWithNulls { + vector_size_t begin; + vector_size_t size; + + // Whether we should "pretend" the values in this range are null. + bool isNull; +}; + namespace row { class CompactRow; class UnsafeRowFast;