feat: Optimize PrestoBatchVectorSerializer [1/7]: Serialize FlatVectors (facebookincubator#12060)

Summary:
Pull Request resolved: facebookincubator#12060

Context:
This is the first in a series of diffs that reimplement PrestoBatchVectorSerializer to write directly to the
output stream, rather than going through VectorStreams as it does today. Reusing VectorStreams and much of the
code from PrestoIterativeVectorSerializer prevented us from capturing the full performance benefit of writing
data in batches rather than row by row. Combined, these changes speed up PrestoBatchVectorSerializer 2-3x (as
measured in Presto queries and other use cases).

The final diff will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize
function, switching it to the new optimized writing path. Because nothing changes until then, I will land these
changes as a stack.

In this diff:
I provide the implementations for serializing FlatVectors.

Differential Revision: D68037258
Kevin Wilfong authored and facebook-github-bot committed Jan 13, 2025
1 parent f3bf1e7 commit edf01b5
Showing 3 changed files with 526 additions and 7 deletions.
54 changes: 51 additions & 3 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
@@ -30,15 +30,14 @@ void PrestoBatchVectorSerializer::serialize(
   const auto rowType = vector->type();
   const auto numChildren = vector->childrenSize();

-  StreamArena arena(pool_);
   std::vector<VectorStream> streams;
   streams.reserve(numChildren);
   for (int i = 0; i < numChildren; i++) {
     streams.emplace_back(
         rowType->childAt(i),
         std::nullopt,
         vector->childAt(i),
-        &arena,
+        &arena_,
         numRows,
         opts_);

@@ -48,7 +47,9 @@ void PrestoBatchVectorSerializer::serialize(
   }

   flushStreams(
-      streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
+      streams, numRows, arena_, *codec_, opts_.minCompressionRatio, stream);
+
+  arena_.clear();
 }

 void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
@@ -178,4 +179,51 @@ void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
       VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding());
   }
 }
+
+void PrestoBatchVectorSerializer::writeHeader(
+    const TypePtr& type,
+    BufferedOutputStream* stream) {
+  auto encoding = typeToEncodingName(type);
+  writeInt32(stream, encoding.size());
+  stream->write(encoding.data(), encoding.size());
+}
+
+template <>
+bool PrestoBatchVectorSerializer::hasNulls(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRange*>& ranges) {
+  if (vector->nulls()) {
+    for (auto& range : ranges) {
+      if (!bits::isAllSet(
+              vector->rawNulls(), range.begin, range.begin + range.size)) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+template <>
+bool PrestoBatchVectorSerializer::hasNulls(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRangeWithNulls*>& ranges) {
+  if (vector->nulls()) {
+    for (auto& range : ranges) {
+      if (range.isNull ||
+          !bits::isAllSet(
+              vector->rawNulls(), range.begin, range.begin + range.size)) {
+        return true;
+      }
+    }
+  } else {
+    for (auto& range : ranges) {
+      if (range.isNull) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
 } // namespace facebook::velox::serializer::presto::detail
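
For readers without a Velox checkout, the two helpers added in this hunk can be approximated in a standalone sketch. Everything below is a simplified stand-in, not the Velox implementation: the `IndexRange` and `IndexRangeWithNulls` structs, the per-bit `isAllSet` loop, the raw `uint64_t*` null buffer (a set bit meaning "not null"), and the `std::string` output buffer are all assumptions made for illustration. The header is assumed to be a 4-byte length in host byte order followed by the encoding name.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for IndexRange: `size` rows starting at `begin`.
struct IndexRange {
  int32_t begin;
  int32_t size;
};

// Hypothetical stand-in for IndexRangeWithNulls: `isNull` marks a range whose
// rows are null regardless of what the vector's own null buffer says.
struct IndexRangeWithNulls {
  int32_t begin;
  int32_t size;
  bool isNull;
};

// Returns true if every bit in [begin, end) is set. A set bit means "not
// null". Simplified per-bit loop; a real implementation would test whole words.
inline bool isAllSet(const uint64_t* bits, int32_t begin, int32_t end) {
  assert(begin <= end);
  for (int32_t i = begin; i < end; ++i) {
    if ((bits[i / 64] & (1ULL << (i % 64))) == 0) {
      return false;
    }
  }
  return true;
}

// rawNulls == nullptr models a vector without a null buffer.
inline bool hasNulls(
    const uint64_t* rawNulls,
    const std::vector<IndexRange>& ranges) {
  if (rawNulls == nullptr) {
    return false;
  }
  for (const auto& range : ranges) {
    if (!isAllSet(rawNulls, range.begin, range.begin + range.size)) {
      return true;
    }
  }
  return false;
}

// Same check, but a range flagged isNull counts as null even when the vector
// has no null buffer at all.
inline bool hasNulls(
    const uint64_t* rawNulls,
    const std::vector<IndexRangeWithNulls>& ranges) {
  for (const auto& range : ranges) {
    if (range.isNull) {
      return true;
    }
    if (rawNulls != nullptr &&
        !isAllSet(rawNulls, range.begin, range.begin + range.size)) {
      return true;
    }
  }
  return false;
}

// Length-prefixed header: 4-byte length (host byte order, an assumption of
// this sketch) followed by the encoding name bytes.
inline void writeHeader(const std::string& encoding, std::string& out) {
  const auto size = static_cast<int32_t>(encoding.size());
  char buffer[sizeof(size)];
  std::memcpy(buffer, &size, sizeof(size));
  out.append(buffer, sizeof(size));
  out.append(encoding);
}
```

Only the requested ranges are scanned, so a null outside them does not force a null section to be written for the column.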