diff --git a/velox/serializers/PrestoBatchVectorSerializer.cpp b/velox/serializers/PrestoBatchVectorSerializer.cpp index 1b693cb29c06..c96b3aba4cda 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.cpp +++ b/velox/serializers/PrestoBatchVectorSerializer.cpp @@ -21,6 +21,76 @@ #include "velox/serializers/VectorStream.h" namespace facebook::velox::serializer::presto::detail { +namespace { +// Populates mutableOffsets with the starting offset of each collection and the +// total size of all collections. +// +// Populates mutableSelectedRanges with the ranges to write from the children. +// +// Populates rangeIndex with the number of ranges to write from the children. +template +void computeCollectionRangesAndOffsets( + const VectorType* vector, + const folly::Range& ranges, + bool hasNulls, + int32_t* mutableOffsets, + IndexRange* mutableSelectedRanges, + size_t& rangeIndex) { + auto* rawSizes = vector->rawSizes(); + auto* rawOffsets = vector->rawOffsets(); + + // The first offset is always 0. + mutableOffsets[0] = 0; + // The index of the next offset to write in mutableOffsets. + size_t offsetsIndex = 1; + // The length all the collections in ranges seen so far. This is the offset to + // write for the next collection. + int32_t totalLength = 0; + if (hasNulls) { + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (range.isNull) { + std::fill_n(&mutableOffsets[offsetsIndex], range.size, totalLength); + offsetsIndex += range.size; + + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (vector->isNullAt(i)) { + mutableOffsets[offsetsIndex++] = totalLength; + } else { + auto length = rawSizes[i]; + totalLength += length; + mutableOffsets[offsetsIndex++] = totalLength; + + // We only have to write anything from the children if the collection + // is non-empty. + if (length > 0) { + mutableSelectedRanges[rangeIndex++] = {rawOffsets[i], rawSizes[i]}; + } + } + } + } + } else { + for (const auto& range : ranges) { + for (auto i = range.begin; i < range.begin + range.size; ++i) { + auto length = rawSizes[i]; + totalLength += length; + mutableOffsets[offsetsIndex++] = totalLength; + + // We only have to write anything from the children if the collection is + // non-empty. + if (length > 0) { + mutableSelectedRanges[rangeIndex++] = {rawOffsets[i], length}; + } + } + } + } +} +} // namespace + void PrestoBatchVectorSerializer::serialize( const RowVectorPtr& vector, const folly::Range& ranges, @@ -369,4 +439,63 @@ void PrestoBatchVectorSerializer::serializeRowVector( writeNullsSegment(hasNulls, vector, ranges, numRows, stream); } } + +template <> +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + +template <> +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + +template +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* arrayVector = vector->as(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + bool hasNulls = this->hasNulls(vector, ranges); + + // This is used to hold the ranges of the elements Vector to write out. + ScratchPtr selectedRangesHolder(scratch_); + IndexRange* mutableSelectedRanges = selectedRangesHolder.get(numRows); + // This is used to hold the offsets of the arrays which we write out towards + // the end. + ScratchPtr offsetsHolder(scratch_); + int32_t* mutableOffsets = offsetsHolder.get(numRows + 1); + // The number of ranges to write out from the elements Vector. This is equal + // to the number of non-empty, non-null arrays in ranges. + size_t rangesSize = 0; + computeCollectionRangesAndOffsets( + arrayVector, + ranges, + hasNulls, + mutableOffsets, + mutableSelectedRanges, + rangesSize); + + // Write out the elements. + serializeColumn( + arrayVector->elements(), + folly::Range(mutableSelectedRanges, rangesSize), + stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + // Write out the offsets. + stream->write( + reinterpret_cast(mutableOffsets), (numRows + 1) * sizeof(int32_t)); + + // Write out the hasNull and isNUll flags. + writeNullsSegment(hasNulls, vector, ranges, numRows, stream); +} } // namespace facebook::velox::serializer::presto::detail diff --git a/velox/serializers/PrestoBatchVectorSerializer.h b/velox/serializers/PrestoBatchVectorSerializer.h index 488b4b1bd8a2..39a65cb8f826 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.h +++ b/velox/serializers/PrestoBatchVectorSerializer.h @@ -122,7 +122,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { serializeRowVector(vector, ranges, stream); break; case VectorEncoding::Simple::ARRAY: - // serializeArrayVector(vector, ranges, stream); + serializeArrayVector(vector, ranges, stream); break; case VectorEncoding::Simple::MAP: // serializeMapVector(vector, ranges, stream); @@ -547,6 +547,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { const folly::Range& ranges, BufferedOutputStream* stream); + template + void serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + StreamArena arena_; const std::unique_ptr codec_; const PrestoVectorSerde::PrestoOptions opts_;