Skip to content

Commit

Permalink
feat: Optimize PrestoBatchVectorSerializer [3/7]: Serialize ArrayVect…
Browse files Browse the repository at this point in the history
…ors (facebookincubator#12062)

Summary:
Pull Request resolved: facebookincubator#12062

Context:
This is a series of diffs in which I reimplement PrestoBatchVectorSerializer to write directly to the output stream,
rather than the indirect route it currently uses via VectorStreams. Reusing VectorStreams and much of the code
for PrestoIterativeVectorSerializer prevented us from capturing all of the performance benefits of writing data in
batches rather than row by row. These changes combined will speed up PrestoBatchVectorSerializer 2-3x (as
measured in Presto queries and other use cases).

In the final diff I will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize
function, which will switch it to the new optimized writing path; therefore, I will land these changes as a stack.

In this diff:
I provide the implementations for serializing ArrayVectors.

Differential Revision: D68047629
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 13, 2025
1 parent 8d8d742 commit bdc8bc0
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
129 changes: 129 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,76 @@
#include "velox/serializers/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
namespace {
// Computes, for every row covered by 'ranges', the data needed to serialize a
// collection vector (any vector type exposing rawSizes(), rawOffsets() and
// isNullAt()).
//
// Outputs:
//  - mutableOffsets: entry 0 is set to 0; one further entry is written per row
//    in 'ranges', holding the cumulative length of all collections up to and
//    including that row. Null rows contribute a length of 0. The caller must
//    provide room for (total number of rows + 1) entries.
//  - mutableSelectedRanges: one {offset, size} entry per non-null, non-empty
//    collection, appended starting at index 'rangeIndex'. These are the ranges
//    to write from the children.
//  - rangeIndex: advanced by the number of entries appended to
//    mutableSelectedRanges.
//
// When 'hasNulls' is false, no null checks are performed, neither on
// individual rows nor on whole ranges.
template <typename VectorType, typename RangeType>
void computeCollectionRangesAndOffsets(
    const VectorType* vector,
    const folly::Range<const RangeType*>& ranges,
    bool hasNulls,
    int32_t* mutableOffsets,
    IndexRange* mutableSelectedRanges,
    size_t& rangeIndex) {
  const auto* sizes = vector->rawSizes();
  const auto* offsets = vector->rawOffsets();

  // Sum of the lengths of all the collections visited so far. This is the
  // offset recorded after each row.
  int32_t runningLength = 0;

  // Write cursor into mutableOffsets. The first offset is always 0.
  int32_t* nextOffset = mutableOffsets;
  *nextOffset++ = 0;

  // Records a row that is known to be non-null: extends the running length,
  // emits the row's offset and, if the collection is non-empty, selects its
  // range of children. Empty collections have nothing to write from the
  // children, so they are not selected.
  auto appendNonNullRow = [&](auto row) {
    const auto length = sizes[row];
    runningLength += length;
    *nextOffset++ = runningLength;

    if (length > 0) {
      mutableSelectedRanges[rangeIndex++] = {offsets[row], length};
    }
  };

  if (!hasNulls) {
    // Fast path: every row is non-null, so no per-row null check is needed.
    for (const auto& range : ranges) {
      const auto end = range.begin + range.size;
      for (auto row = range.begin; row < end; ++row) {
        appendNonNullRow(row);
      }
    }
    return;
  }

  for (const auto& range : ranges) {
    if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
      // A range flagged as null contributes 'size' rows of zero length, i.e.
      // the current running length repeated once per row.
      if (range.isNull) {
        std::fill_n(nextOffset, range.size, runningLength);
        nextOffset += range.size;
        continue;
      }
    }

    const int32_t end = range.begin + range.size;
    for (int32_t row = range.begin; row < end; ++row) {
      if (vector->isNullAt(row)) {
        // Null rows do not advance the running length.
        *nextOffset++ = runningLength;
      } else {
        appendNonNullRow(row);
      }
    }
  }
}
} // namespace

void PrestoBatchVectorSerializer::serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Expand Down Expand Up @@ -369,4 +439,63 @@ void PrestoBatchVectorSerializer::serializeRowVector(
writeNullsSegment(hasNulls, vector, ranges, numRows, stream);
}
}

// Explicit specialization declaration of serializeArrayVector for ranges of
// IndexRange. This statement only declares the specialization; it carries no
// body.
// NOTE(review): 'template <>' declares an explicit specialization rather than
// requesting an explicit instantiation ('template void ...') of the generic
// definition that follows - confirm this is the intended form.
template <>
void PrestoBatchVectorSerializer::serializeArrayVector(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
BufferedOutputStream* stream);

// Same as above, for ranges of IndexRangeWithNulls.
template <>
void PrestoBatchVectorSerializer::serializeArrayVector(
const VectorPtr& vector,
const folly::Range<const IndexRangeWithNulls*>& ranges,
BufferedOutputStream* stream);

// Serializes the rows of the ArrayVector 'vector' selected by 'ranges' into
// 'stream'.
//
// The pieces are written in this order: the type header, the selected
// elements, the number of rows, (number of rows + 1) int32 offsets, and
// finally the nulls segment.
template <typename RangeType>
void PrestoBatchVectorSerializer::serializeArrayVector(
    const VectorPtr& vector,
    const folly::Range<const RangeType*>& ranges,
    BufferedOutputStream* stream) {
  const auto* arrays = vector->as<ArrayVector>();
  const auto rowCount = rangesTotalSize(ranges);

  // The header comes first.
  writeHeader(vector->type(), stream);

  bool anyNulls = this->hasNulls(vector, ranges);

  // Scratch space for the ranges of the elements vector that need to be
  // written. There is at most one such range per row.
  ScratchPtr<IndexRange, 64> elementRangesHolder(scratch_);
  IndexRange* elementRanges = elementRangesHolder.get(rowCount);

  // Scratch space for the array offsets, which are written after the
  // elements. There is one offset per row plus the leading 0.
  ScratchPtr<int32_t, 64> arrayOffsetsHolder(scratch_);
  int32_t* arrayOffsets = arrayOffsetsHolder.get(rowCount + 1);

  // Number of entries filled in elementRanges, i.e. the number of non-null,
  // non-empty arrays in 'ranges'.
  size_t numElementRanges = 0;
  computeCollectionRangesAndOffsets(
      arrays,
      ranges,
      anyNulls,
      arrayOffsets,
      elementRanges,
      numElementRanges);

  // Elements.
  const folly::Range<const IndexRange*> selectedElements(
      elementRanges, numElementRanges);
  serializeColumn(arrays->elements(), selectedElements, stream);

  // Row count.
  writeInt32(stream, rowCount);

  // Offsets, copied out as raw bytes.
  const auto offsetsByteSize = (rowCount + 1) * sizeof(int32_t);
  stream->write(reinterpret_cast<char*>(arrayOffsets), offsetsByteSize);

  // The hasNull and isNull flags.
  writeNullsSegment(anyNulls, vector, ranges, rowCount, stream);
}
} // namespace facebook::velox::serializer::presto::detail
8 changes: 7 additions & 1 deletion velox/serializers/PrestoBatchVectorSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
serializeRowVector(vector, ranges, stream);
break;
case VectorEncoding::Simple::ARRAY:
// serializeArrayVector(vector, ranges, stream);
serializeArrayVector(vector, ranges, stream);
break;
case VectorEncoding::Simple::MAP:
// serializeMapVector(vector, ranges, stream);
Expand Down Expand Up @@ -547,6 +547,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
const folly::Range<const RangeType*>& ranges,
BufferedOutputStream* stream);

// Serializes the rows of the ArrayVector 'vector' selected by 'ranges' to
// 'stream'. Defined in PrestoBatchVectorSerializer.cpp.
template <typename RangeType>
void serializeArrayVector(
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges,
BufferedOutputStream* stream);

StreamArena arena_;
const std::unique_ptr<folly::io::Codec> codec_;
const PrestoVectorSerde::PrestoOptions opts_;
Expand Down

0 comments on commit bdc8bc0

Please sign in to comment.