diff --git a/velox/serializers/PrestoBatchVectorSerializer.cpp b/velox/serializers/PrestoBatchVectorSerializer.cpp
index 7fec07ad6cfb..fef8750cc971 100644
--- a/velox/serializers/PrestoBatchVectorSerializer.cpp
+++ b/velox/serializers/PrestoBatchVectorSerializer.cpp
@@ -30,7 +30,6 @@ void PrestoBatchVectorSerializer::serialize(
   const auto rowType = vector->type();
   const auto numChildren = vector->childrenSize();
 
-  StreamArena arena(pool_);
   std::vector<VectorStream> streams;
   streams.reserve(numChildren);
   for (int i = 0; i < numChildren; i++) {
@@ -38,7 +37,7 @@ void PrestoBatchVectorSerializer::serialize(
         rowType->childAt(i),
         std::nullopt,
         vector->childAt(i),
-        &arena,
+        &arena_,
         numRows,
         opts_);
 
@@ -48,7 +47,9 @@ void PrestoBatchVectorSerializer::serialize(
   }
 
   flushStreams(
-      streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
+      streams, numRows, arena_, *codec_, opts_.minCompressionRatio, stream);
+
+  arena_.clear();
 }
 
 void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
@@ -178,4 +179,51 @@ void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
       VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding());
   }
 }
+
+void PrestoBatchVectorSerializer::writeHeader(
+    const TypePtr& type,
+    BufferedOutputStream* stream) {
+  auto encoding = typeToEncodingName(type);
+  writeInt32(stream, encoding.size());
+  stream->write(encoding.data(), encoding.size());
+}
+
+template <>
+bool PrestoBatchVectorSerializer::hasNulls(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRange*>& ranges) {
+  if (vector->nulls()) {
+    for (auto& range : ranges) {
+      if (!bits::isAllSet(
+              vector->rawNulls(), range.begin, range.begin + range.size)) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+template <>
+bool PrestoBatchVectorSerializer::hasNulls(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRangeWithNulls*>& ranges) {
+  if (vector->nulls()) {
+    for (auto& range : ranges) {
+      if (range.isNull ||
+          !bits::isAllSet(
+              vector->rawNulls(), range.begin, range.begin + range.size)) {
+        return true;
+      }
+    }
+  } else {
+    for (auto& range : ranges) {
+      if (range.isNull) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
 } // namespace facebook::velox::serializer::presto::detail
diff --git a/velox/serializers/PrestoBatchVectorSerializer.h b/velox/serializers/PrestoBatchVectorSerializer.h
index 83ca7c845705..c1d10940e581 100644
--- a/velox/serializers/PrestoBatchVectorSerializer.h
+++ b/velox/serializers/PrestoBatchVectorSerializer.h
@@ -16,6 +16,7 @@
 #pragma once
 
 #include "velox/serializers/PrestoSerializer.h"
+#include "velox/serializers/PrestoSerializerSerializationUtils.h"
 #include "velox/vector/VectorStream.h"
 
 namespace facebook::velox::serializer::presto::detail {
@@ -24,9 +25,10 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
   PrestoBatchVectorSerializer(
       memory::MemoryPool* pool,
       const PrestoVectorSerde::PrestoOptions& opts)
-      : pool_(pool),
+      : arena_(pool),
         codec_(common::compressionKindToCodec(opts.compressionKind)),
-        opts_(opts) {}
+        opts_(opts),
+        nulls_(&arena_, true, true, true) {}
 
   void serialize(
       const RowVectorPtr& vector,
@@ -43,14 +45,485 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
   }
 
  private:
+  static inline constexpr char kZero = 0;
+  static inline constexpr char kOne = 1;
+
   void estimateSerializedSizeImpl(
       const VectorPtr& vector,
       const folly::Range<const IndexRange*>& ranges,
       vector_size_t** sizes,
       Scratch& scratch);
 
-  memory::MemoryPool* pool_;
+  /// Writes the encoding name of 'type' to 'stream', prefixed by its length.
+  void writeHeader(const TypePtr& type, BufferedOutputStream* stream);
+
+  /// Are there any nulls in the Vector or introduced artificially in the
+  /// ranges. Does not look recursively at values Vectors or children.
+  template <typename RangeType>
+  bool hasNulls(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges);
+
+  /// Write out the null flags to the streams.
+  template <typename RangeType>
+  void writeNulls(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      vector_size_t numRows,
+      BufferedOutputStream* stream);
+
+  /// Write out all the null information needed by the PrestoPage, both the
+  /// hasNulls and isNull flags.
+  template <typename RangeType>
+  inline void writeNullsSegment(
+      bool hasNulls,
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      vector_size_t numRows,
+      BufferedOutputStream* stream) {
+    if (hasNulls) {
+      // Has-nulls flag.
+      stream->write(&kOne, 1);
+
+      // Nulls flags.
+      writeNulls(vector, ranges, numRows, stream);
+    } else {
+      // Has-nulls flag.
+      stream->write(&kZero, 1);
+    }
+  }
+
+  /// Serializes a flat vector whose kind has no dedicated overload: writes
+  /// the header, row count and null information, then copies the raw bytes of
+  /// the non-null values selected by 'ranges' to 'stream'.
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<
+          kind != TypeKind::TIMESTAMP && kind != TypeKind::BOOLEAN &&
+              kind != TypeKind::OPAQUE && kind != TypeKind::UNKNOWN &&
+              !std::
+                  is_same_v<typename TypeTraits<kind>::NativeType, StringView>,
+          bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    using T = typename TypeTraits<kind>::NativeType;
+    const auto* flatVector = vector->as<FlatVector<T>>();
+    const auto* rawValues = flatVector->rawValues();
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    if (this->hasNulls(vector, ranges)) {
+      // Write out the has-nulls flag.
+      stream->write(&kOne, 1);
+
+      // Write out the nulls flags.
+      writeNulls(vector, ranges, numRows, stream);
+
+      // Write out the values.
+      // This logic merges consecutive ranges of non-null values so we can make
+      // long consecutive writes to the stream. A range ends when we detect a
+      // discontinuity between ranges, a null, or the end of the ranges. When
+      // this happens we write out the range.
+
+      // Tracks the beginning of the current range.
+      int firstNonNull = -1;
+      // Tracks the end of the current range.
+      int lastNonNull = -1;
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (static_cast<const IndexRangeWithNulls&>(range).isNull) {
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!flatVector->isNullAt(i)) {
+            if (firstNonNull == -1) {
+              // We're at the beginning of a new range.
+              firstNonNull = i;
+              lastNonNull = i;
+            } else if (i == lastNonNull + 1) {
+              // We're continuing the current range.
+              lastNonNull = i;
+            } else {
+              // We've reached a discontinuity (either because the previous
+              // value was null or because the ranges are discontinuous).
+              // Write out the current range and start a new one.
+              const size_t rangeSize = (1 + lastNonNull - firstNonNull);
+              stream->write(
+                  reinterpret_cast<const char*>(&rawValues[firstNonNull]),
+                  rangeSize * sizeof(T));
+              firstNonNull = i;
+              lastNonNull = i;
+            }
+          }
+        }
+      }
+      // There's no more data; if we had a range waiting to be written out, do
+      // so.
+      if (firstNonNull != -1) {
+        const size_t rangeSize = (1 + lastNonNull - firstNonNull);
+        stream->write(
+            reinterpret_cast<const char*>(&rawValues[firstNonNull]),
+            rangeSize * sizeof(T));
+      }
+    } else {
+      // Write out the has-nulls flag.
+      stream->write(&kZero, 1);
+
+      // Write out the values. Since there are no nulls, we optimistically
+      // assume the ranges are long enough that the overhead of merging
+      // consecutive ranges is not worth it.
+      for (auto& range : ranges) {
+        stream->write(
+            reinterpret_cast<const char*>(&rawValues[range.begin]),
+            range.size * sizeof(T));
+      }
+    }
+  }
+
+  /// Serializes a flat TIMESTAMP vector. Each non-null value is written as
+  /// seconds and nanos when opts_.useLosslessTimestamp is set, and as
+  /// milliseconds otherwise.
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<kind == TypeKind::TIMESTAMP, bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    const auto* flatVector = vector->as<FlatVector<Timestamp>>();
+    const auto* rawValues = flatVector->rawValues();
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    if (this->hasNulls(vector, ranges)) {
+      // Write out the has-nulls flag.
+      stream->write(&kOne, 1);
+
+      // Write out the nulls flags.
+      writeNulls(vector, ranges, numRows, stream);
+
+      // Write out the values.
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (static_cast<const IndexRangeWithNulls&>(range).isNull) {
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!flatVector->isNullAt(i)) {
+            if (opts_.useLosslessTimestamp) {
+              writeInt64(stream, rawValues[i].getSeconds());
+              writeInt64(stream, rawValues[i].getNanos());
+            } else {
+              writeInt64(stream, rawValues[i].toMillis());
+            }
+          }
+        }
+      }
+    } else {
+      // Write out the has-nulls flag.
+      stream->write(&kZero, 1);
+
+      // Write out the values.
+      for (auto& range : ranges) {
+        if (opts_.useLosslessTimestamp) {
+          for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+            writeInt64(stream, rawValues[i].getSeconds());
+            writeInt64(stream, rawValues[i].getNanos());
+          }
+        } else {
+          for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+            writeInt64(stream, rawValues[i].toMillis());
+          }
+        }
+      }
+    }
+  }
+
+  /// Serializes a flat vector of StringViews: the cumulative end offsets come
+  /// first, then the null information, the total byte count and the bytes of
+  /// the non-null strings.
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<
+          std::is_same_v<typename TypeTraits<kind>::NativeType, StringView>,
+          bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    const auto* flatVector = vector->as<FlatVector<StringView>>();
+    const auto* rawValues = flatVector->rawValues();
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    if (this->hasNulls(vector, ranges)) {
+      // The total number of bytes we'll write out for the strings.
+      int32_t numBytes = 0;
+
+      // Write out the offsets.
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (range.isNull) {
+            // If it's a range of nulls, we just write the last offset out n
+            // times.
+            for (int i = 0; i < range.size; i++) {
+              writeInt32(stream, numBytes);
+            }
+
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!flatVector->isNullAt(i)) {
+            numBytes += rawValues[i].size();
+          }
+          writeInt32(stream, numBytes);
+        }
+      }
+
+      // Write out the has-nulls flag.
+      stream->write(&kOne, 1);
+
+      // Write out the nulls flags.
+      writeNulls(vector, ranges, numRows, stream);
+
+      // Write out the total number of bytes.
+      writeInt32(stream, numBytes);
+
+      // Write out the values.
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (static_cast<const IndexRangeWithNulls&>(range).isNull) {
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!flatVector->isNullAt(i)) {
+            stream->write(rawValues[i].data(), rawValues[i].size());
+          }
+        }
+      }
+    } else {
+      // Write out the offsets.
+      int32_t numBytes = 0;
+      for (const auto& range : ranges) {
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          numBytes += rawValues[i].size();
+          writeInt32(stream, numBytes);
+        }
+      }
+
+      // Write out the has-nulls flag.
+      stream->write(&kZero, 1);
+
+      // Write out the total number of bytes.
+      writeInt32(stream, numBytes);
+
+      // Write out the values.
+      for (auto& range : ranges) {
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          stream->write(rawValues[i].data(), rawValues[i].size());
+        }
+      }
+    }
+  }
+
+  /// Serializes a flat BOOLEAN vector, writing one byte per non-null row.
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<kind == TypeKind::BOOLEAN, bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    const auto* flatVector = vector->as<FlatVector<bool>>();
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    if (this->hasNulls(vector, ranges)) {
+      // Write out the has-nulls flag.
+      stream->write(&kOne, 1);
+
+      // Write out the nulls flags.
+      writeNulls(vector, ranges, numRows, stream);
+
+      // Write out the values.
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (static_cast<const IndexRangeWithNulls&>(range).isNull) {
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!vector->isNullAt(i)) {
+            stream->write(flatVector->valueAtFast(i) ? &kOne : &kZero, 1);
+          }
+        }
+      }
+    } else {
+      // Write out the has-nulls flag.
+      stream->write(&kZero, 1);
+
+      // Write out the values.
+      for (const auto& range : ranges) {
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          stream->write(flatVector->valueAtFast(i) ? &kOne : &kZero, 1);
+        }
+      }
+    }
+  }
+
+  /// Serializes a flat OPAQUE vector by running the type's serialize function
+  /// on each non-null value and writing the results the way strings are
+  /// written (offsets, null information, total byte count, bytes).
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<kind == TypeKind::OPAQUE, bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    const auto* flatVector = vector->as<FlatVector<std::shared_ptr<void>>>();
+    const auto* rawValues = flatVector->rawValues();
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    int32_t numBytes = 0;
+
+    // To avoid serializing the values twice, we hold the serialized data here
+    // until we reach the point in the stream where we can write it out.
+    ScratchPtr<std::string> valuesHolder(scratch_);
+    std::string* mutableValues = valuesHolder.get(numRows);
+    size_t valuesIndex = 0;
+
+    auto serializer = vector->type()->asOpaque().getSerializeFunc();
+
+    // Use the same null check as the other overloads; it also accounts for
+    // nulls introduced artificially through the ranges.
+    const bool hasNulls = this->hasNulls(vector, ranges);
+
+    // Write out the offsets and serialize the values.
+    if (hasNulls) {
+      for (const auto& range : ranges) {
+        if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
+          if (range.isNull) {
+            for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+              writeInt32(stream, numBytes);
+            }
+            continue;
+          }
+        }
+
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          if (!flatVector->isNullAt(i)) {
+            mutableValues[valuesIndex] = serializer(rawValues[i]);
+            numBytes += mutableValues[valuesIndex].size();
+            valuesIndex++;
+          }
+
+          writeInt32(stream, numBytes);
+        }
+      }
+    } else {
+      for (const auto& range : ranges) {
+        for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
+          mutableValues[valuesIndex] = serializer(rawValues[i]);
+          numBytes += mutableValues[valuesIndex].size();
+          valuesIndex++;
+
+          writeInt32(stream, numBytes);
+        }
+      }
+    }
+
+    // Write out the nulls flag and nulls.
+    writeNullsSegment(hasNulls, vector, ranges, numRows, stream);
+
+    // Write out the total number of bytes.
+    writeInt32(stream, numBytes);
+
+    // Write out the serialized values.
+    for (size_t i = 0; i < valuesIndex; ++i) {
+      stream->write(mutableValues[i].data(), mutableValues[i].size());
+    }
+  }
+
+  /// Serializes a flat UNKNOWN vector; every row is written as null.
+  template <
+      TypeKind kind,
+      typename RangeType,
+      typename std::enable_if_t<kind == TypeKind::UNKNOWN, bool> = true>
+  void serializeFlatVector(
+      const VectorPtr& vector,
+      const folly::Range<const RangeType*>& ranges,
+      BufferedOutputStream* stream) {
+    VELOX_CHECK_NOT_NULL(vector->rawNulls());
+
+    const auto numRows = rangesTotalSize(ranges);
+
+    // Write out the header.
+    writeHeader(vector->type(), stream);
+
+    // Write out the number of rows.
+    writeInt32(stream, numRows);
+
+    // Write out the has-nulls flag.
+    stream->write(&kOne, 1);
+
+    // Write out the nulls.
+    nulls_.startWrite(bits::nbytes(numRows));
+    nulls_.appendBool(bits::kNull, numRows);
+    nulls_.flush(stream);
+  }
+
+  StreamArena arena_;
   const std::unique_ptr<folly::io::Codec> codec_;
-  PrestoVectorSerde::PrestoOptions opts_;
+  const PrestoVectorSerde::PrestoOptions opts_;
+
+  // A scratch space for writing null bits; this is a frequent operation that
+  // the OutputStream interface is not well suited for.
+  //
+  // Since this is shared/reused, it is important that the usage of nulls_
+  // once started when serializing a Vector is finished before serializing any
+  // children. This can be guaranteed by using the writeNullsSegment or
+  // writeNulls functions.
+  ByteOutputStream nulls_;
+  Scratch scratch_;
 };
 } // namespace facebook::velox::serializer::presto::detail
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index f9d9b6790c50..a3e1b4882f68 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -33,6 +33,18 @@ struct IndexRange {
   vector_size_t size;
 };
 
+// A flavor of IndexRange that allows us to add artificial nulls. This is useful
+// when, for example, flattening a DictionaryVector: the DictionaryVector may
+// introduce nulls that do not exist in the values Vector, so they need to be
+// introduced artificially.
+struct IndexRangeWithNulls {
+  vector_size_t begin;
+  vector_size_t size;
+
+  // Whether we should "pretend" the values in this range are null.
+  bool isNull;
+};
+
 namespace row {
 class CompactRow;
 class UnsafeRowFast;