Skip to content

Commit

Permalink
feat: Optimize PrestoBatchVectorSerializer [2/7]: Serialize RowVectors (
Browse files Browse the repository at this point in the history
facebookincubator#12072)

Summary:

Context:
This is a series of diffs in which I reimplement PrestoBatchVectorSerializer to write directly to the output stream, 
rather than the indirect route it currently uses via VectorStreams. Reusing VectorStreams and much of the code
for PrestoIterativeVectorSerializer prevented us from capturing all of the performance benefits of writing data in
batches rather than row by row. These changes combined will speed up PrestoBatchVectorSerializer 2-3x (as
measured in Presto queries and other use cases).

In the final diff I will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize
function, which will switch it to the new optimized writing path; therefore, I will land these changes as a stack.

In this diff:
I provide the implementation for serializing RowVectors. Note: the cases in serializeColumn that are currently
commented out will be uncommented as I implement the functions they call.

Differential Revision: D68044877
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 14, 2025
1 parent 1de6e9d commit aa95c76
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 1 deletion.
192 changes: 191 additions & 1 deletion velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ void PrestoBatchVectorSerializer::serialize(
opts_);

if (numRows > 0) {
serializeColumn(vector->childAt(i), ranges, &streams[i], scratch);
velox::serializer::presto::detail::serializeColumn(
vector->childAt(i), ranges, &streams[i], scratch);
}
}

Expand Down Expand Up @@ -260,4 +261,193 @@ void PrestoBatchVectorSerializer::writeNulls(
}
nulls_.flush(stream);
}

// Serializes the RowVector 'vector', restricted to 'ranges', to 'stream'.
//
// Writes the column header first. If opts_.nullsFirst is set, the row count
// and the nulls segment are written before the children; otherwise the row
// count, the per-row offsets and the nulls segment are written after the
// children. Children are serialized only for the positions in 'ranges' at
// which the row itself is not null.
//
// @param vector The vector to serialize; it is accessed as a RowVector.
// @param ranges The ranges of rows of 'vector' to serialize.
// @param stream The stream the serialized bytes are written to.
template <typename RangeType>
void PrestoBatchVectorSerializer::serializeRowVector(
    const VectorPtr& vector,
    const folly::Range<const RangeType*>& ranges,
    BufferedOutputStream* stream) {
  const auto* rowVector = vector->as<RowVector>();
  const auto numRows = rangesTotalSize(ranges);

  // Write out the header.
  writeHeader(vector->type(), stream);

  const bool hasNulls = this->hasNulls(vector, ranges);

  // The ranges to write of the child Vectors, this is the same as the ranges
  // of this RowVector to write except positions where the row is null.
  folly::Range<const IndexRange*> childRanges;
  // PrestoPage requires us to write out for each row 0 if the row is null or
  // i if the row is the i'th non-null row. We track these values here.
  ScratchPtr<int32_t, 64> offsetsHolder(scratch_);
  int32_t* mutableOffsets = offsetsHolder.get(numRows + 1);
  // The first offset is always 0, this in addition to the offset per row.
  mutableOffsets[0] = 0;
  // The index at which we should write the next value in mutableOffsets.
  size_t offsetsIndex = 1;
  // The value of "offset" to write for the next non-null row.
  int32_t rowOffset = 1;

  // We use this to construct contiguous ranges to write for the children,
  // excluding any null rows.
  ScratchPtr<IndexRange, 64> selectedRangesHolder(scratch_);

  if (hasNulls) {
    // A non-empty input range yields at most one selected range per row, and
    // an empty input range yields at most one selected range. Sizing the
    // buffer by numRows alone would overflow it when 'ranges' contains empty
    // ranges, so account for them explicitly.
    IndexRange* mutableSelectedRanges = selectedRangesHolder.get(
        numRows + static_cast<int32_t>(ranges.size()));
    // The index in mutableSelectedRanges to write the next range.
    size_t rangeIndex = 0;

    for (const auto& range : ranges) {
      if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
        if (range.isNull) {
          // The whole range is null: every row gets offset 0 and nothing is
          // selected for the children.
          std::fill_n(&mutableOffsets[offsetsIndex], range.size, 0);
          offsetsIndex += range.size;

          continue;
        }
      }

      if (vector->mayHaveNulls() &&
          !bits::isAllSet(
              vector->rawNulls(), range.begin, range.begin + range.size)) {
        // The start of the current contiguous range.
        int rangeStart = -1;
        // The length of the current contiguous range.
        int rangeSize = 0;
        for (auto i = range.begin; i < range.begin + range.size; ++i) {
          if (!vector->isNullAt(i)) {
            mutableOffsets[offsetsIndex++] = rowOffset++;

            // If we aren't already in a contiguous range, mark the beginning.
            if (rangeStart == -1) {
              rangeStart = i;
            }
            // Continue the contiguous range.
            rangeSize++;
          } else {
            mutableOffsets[offsetsIndex++] = 0;

            // If we were in a contiguous range, write it out to the scratch
            // buffer and indicate we are no longer in one.
            if (rangeStart != -1) {
              mutableSelectedRanges[rangeIndex++] =
                  IndexRange{rangeStart, rangeSize};
              rangeStart = -1;
              rangeSize = 0;
            }
          }
        }

        // If we were in a contiguous range, write out the last one.
        if (rangeStart != -1) {
          mutableSelectedRanges[rangeIndex++] =
              IndexRange{rangeStart, rangeSize};
        }
      } else {
        // There are no nulls in this range, write out the offsets and copy
        // the range to the scratch buffer.
        std::iota(
            &mutableOffsets[offsetsIndex],
            &mutableOffsets[offsetsIndex + range.size],
            rowOffset);
        rowOffset += range.size;
        offsetsIndex += range.size;

        mutableSelectedRanges[rangeIndex++] =
            IndexRange{range.begin, range.size};
      }
    }

    // Lastly update child ranges to exclude any null rows.
    childRanges =
        folly::Range<const IndexRange*>(mutableSelectedRanges, rangeIndex);
  } else {
    // There are no null rows, so offsets is just an incrementing series and
    // we can reuse ranges for the children.
    std::iota(&mutableOffsets[1], &mutableOffsets[numRows + 1], rowOffset);

    if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
      // The children take plain IndexRanges, so strip the null flag. Exactly
      // one entry is written per input range, so the buffer is sized by the
      // number of ranges (which may exceed numRows if some ranges are empty).
      IndexRange* mutableSelectedRanges =
          selectedRangesHolder.get(static_cast<int32_t>(ranges.size()));
      // The index in mutableSelectedRanges to write the next range.
      size_t rangeIndex = 0;
      for (const auto& range : ranges) {
        mutableSelectedRanges[rangeIndex++] = {range.begin, range.size};
      }

      childRanges =
          folly::Range<const IndexRange*>(mutableSelectedRanges, ranges.size());
    } else {
      childRanges = ranges;
    }
  }

  if (opts_.nullsFirst) {
    // Write out the number of rows.
    writeInt32(stream, numRows);
    // Write out the hasNull and isNull flags.
    writeNullsSegment(hasNulls, vector, ranges, numRows, stream);
  }

  // Write out the number of children.
  writeInt32(stream, vector->type()->size());
  // Write out the children.
  for (int32_t i = 0; i < rowVector->childrenSize(); ++i) {
    serializeColumn(rowVector->childAt(i), childRanges, stream);
  }

  if (!opts_.nullsFirst) {
    // Write out the number of rows.
    writeInt32(stream, numRows);
    // Write out the offsets (numRows + 1 values, including the leading 0).
    stream->write(
        reinterpret_cast<char*>(mutableOffsets),
        (numRows + 1) * sizeof(int32_t));
    // Write out the hasNull and isNull flags.
    writeNullsSegment(hasNulls, vector, ranges, numRows, stream);
  }
}

// Serializes 'vector', restricted to 'ranges', to 'stream' by dispatching to
// the serialization routine that matches the vector's encoding.
//
// FLAT and ROW vectors are serialized directly; LAZY vectors are loaded and
// then serialized via a recursive call. Encodings whose serialization routine
// is not wired in yet fail with VELOX_UNSUPPORTED rather than silently writing
// nothing, which would leave the output stream malformed.
//
// @param vector The vector to serialize.
// @param ranges The ranges of rows of 'vector' to serialize.
// @param stream The stream the serialized bytes are written to.
template <typename RangeType>
void PrestoBatchVectorSerializer::serializeColumn(
    const VectorPtr& vector,
    const folly::Range<const RangeType*>& ranges,
    BufferedOutputStream* stream) {
  switch (vector->encoding()) {
    case VectorEncoding::Simple::FLAT:
      VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
          serializeFlatVector, vector->typeKind(), vector, ranges, stream);
      break;
    case VectorEncoding::Simple::CONSTANT:
      // TODO: Replace with the dispatch below once serializeConstantVector is
      // implemented.
      // VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
      //     serializeConstantVector,
      //     vector->typeKind(),
      //     vector,
      //     ranges,
      //     stream);
      VELOX_UNSUPPORTED(
          "Serialization of CONSTANT vectors is not implemented yet");
      break;
    case VectorEncoding::Simple::DICTIONARY:
      // TODO: Replace with the dispatch below once serializeDictionaryVector
      // is implemented.
      // VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
      //     serializeDictionaryVector,
      //     vector->typeKind(),
      //     vector,
      //     ranges,
      //     stream);
      VELOX_UNSUPPORTED(
          "Serialization of DICTIONARY vectors is not implemented yet");
      break;
    case VectorEncoding::Simple::ROW:
      serializeRowVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::ARRAY:
      // TODO: Replace with the call below once serializeArrayVector is
      // implemented.
      // serializeArrayVector(vector, ranges, stream);
      VELOX_UNSUPPORTED("Serialization of ARRAY vectors is not implemented yet");
      break;
    case VectorEncoding::Simple::MAP:
      // TODO: Replace with the call below once serializeMapVector is
      // implemented.
      // serializeMapVector(vector, ranges, stream);
      VELOX_UNSUPPORTED("Serialization of MAP vectors is not implemented yet");
      break;
    case VectorEncoding::Simple::LAZY:
      // Load the lazy vector and serialize whatever encoding it wraps.
      serializeColumn(BaseVector::loadedVectorShared(vector), ranges, stream);
      break;
    default:
      VELOX_UNSUPPORTED();
  }
}
} // namespace facebook::velox::serializer::presto::detail
12 changes: 12 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
}
}

// Serializes 'vector', restricted to the rows in 'ranges', to 'stream'. The
// definition (in PrestoBatchVectorSerializer.cpp) dispatches on the vector's
// encoding to the matching serialization routine.
template <typename RangeType>
void serializeColumn(
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges,
BufferedOutputStream* stream);

template <
TypeKind kind,
typename RangeType,
Expand Down Expand Up @@ -498,6 +504,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
nulls_.flush(stream);
}

// Serializes the RowVector 'vector', restricted to the rows in 'ranges', to
// 'stream': the header, the children (only at positions where the row is not
// null), and the row count and nulls segment. Whether the row count and nulls
// are written before or after the children depends on opts_.nullsFirst; the
// per-row offsets are written only when opts_.nullsFirst is false.
template <typename RangeType>
void serializeRowVector(
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges,
BufferedOutputStream* stream);

StreamArena arena_;
const std::unique_ptr<folly::io::Codec> codec_;
const PrestoVectorSerde::PrestoOptions opts_;
Expand Down

0 comments on commit aa95c76

Please sign in to comment.