diff --git a/velox/serializers/PrestoBatchVectorSerializer.cpp b/velox/serializers/PrestoBatchVectorSerializer.cpp index bcdf27092c87..aecc41b3f005 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.cpp +++ b/velox/serializers/PrestoBatchVectorSerializer.cpp @@ -42,7 +42,8 @@ void PrestoBatchVectorSerializer::serialize( opts_); if (numRows > 0) { - serializeColumn(vector->childAt(i), ranges, &streams[i], scratch); + velox::serializer::presto::detail::serializeColumn( + vector->childAt(i), ranges, &streams[i], scratch); } } @@ -260,4 +261,193 @@ void PrestoBatchVectorSerializer::writeNulls( } nulls_.flush(stream); } + +template +void PrestoBatchVectorSerializer::serializeRowVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* rowVector = vector->as(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + const bool hasNulls = this->hasNulls(vector, ranges); + + // The ranges to write of the child Vectors, this is the same as the ranges + // of this RowVector to write except positions where the row is null. + folly::Range childRanges; + // PrestoPage requires us to write out for each row 0 if the row is null or + // i if the row is the i'th non-null row. We track these values here. + ScratchPtr offsetsHolder(scratch_); + int32_t* mutableOffsets = offsetsHolder.get(numRows + 1); + // The first offset is always 0, this is in addition to the offset per row. + mutableOffsets[0] = 0; + // The index at which we should write the next value in mutableOffsets. + size_t offsetsIndex = 1; + // The value of "offset" to write for the next non-null row. + int32_t rowOffset = 1; + + // We use this to construct contiguous ranges to write for the children, + // excluding any null rows. 
+ ScratchPtr selectedRangesHolder(scratch_); + + if (hasNulls) { + IndexRange* mutableSelectedRanges = selectedRangesHolder.get(numRows); + // The index in mutableSelectedRanges to write the next range. + size_t rangeIndex = 0; + + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (range.isNull) { + std::fill_n(&mutableOffsets[offsetsIndex], range.size, 0); + offsetsIndex += range.size; + + continue; + } + } + + if (vector->mayHaveNulls() && + !bits::isAllSet( + vector->rawNulls(), range.begin, range.begin + range.size)) { + // The start of the current contiguous range. + int rangeStart = -1; + // The length of the current contiguous range. + int rangeSize = 0; + for (auto i = range.begin; i < range.begin + range.size; ++i) { + if (!vector->isNullAt(i)) { + mutableOffsets[offsetsIndex++] = rowOffset++; + + // If we aren't already in a contiguous range, mark the beginning. + if (rangeStart == -1) { + rangeStart = i; + } + // Continue the contiguous range. + rangeSize++; + } else { + mutableOffsets[offsetsIndex++] = 0; + + // If we were in a contiguous range, write it out to the scratch + // buffer and indicate we are no longer in one. + if (rangeStart != -1) { + mutableSelectedRanges[rangeIndex++] = + IndexRange{rangeStart, rangeSize}; + rangeStart = -1; + rangeSize = 0; + } + } + } + + // If we were in a contiguous range, write out the last one. + if (rangeStart != -1) { + mutableSelectedRanges[rangeIndex++] = + IndexRange{rangeStart, rangeSize}; + } + } else { + // There are no nulls in this range, write out the offsets and copy + // the range to the scratch buffer. + std::iota( + &mutableOffsets[offsetsIndex], + &mutableOffsets[offsetsIndex + range.size], + rowOffset); + rowOffset += range.size; + offsetsIndex += range.size; + + mutableSelectedRanges[rangeIndex++] = + IndexRange{range.begin, range.size}; + } + } + + // Lastly update child ranges to exclude any null rows. 
+ childRanges = + folly::Range(mutableSelectedRanges, rangeIndex); + } else { + // There are no null rows, so offsets is just an incrementing series and + // we can reuse ranges for the children. + std::iota(&mutableOffsets[1], &mutableOffsets[numRows + 1], rowOffset); + + if constexpr (std::is_same_v) { + IndexRange* mutableSelectedRanges = selectedRangesHolder.get(numRows); + // The index in mutableSelectedRanges to write the next range. + size_t rangeIndex = 0; + for (const auto& range : ranges) { + mutableSelectedRanges[rangeIndex++] = {range.begin, range.size}; + } + + childRanges = + folly::Range(mutableSelectedRanges, ranges.size()); + } else { + childRanges = ranges; + } + } + + if (opts_.nullsFirst) { + // Write out the number of rows. + writeInt32(stream, numRows); + // Write out the hasNull and isNull flags. + writeNullsSegment(hasNulls, vector, ranges, numRows, stream); + } + + // Write out the number of children. + writeInt32(stream, vector->type()->size()); + // Write out the children. + for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { + serializeColumn(rowVector->childAt(i), childRanges, stream); + } + + if (!opts_.nullsFirst) { + // Write out the number of rows. + writeInt32(stream, numRows); + // Write out the offsets. + stream->write( + reinterpret_cast(mutableOffsets), + (numRows + 1) * sizeof(int32_t)); + // Write out the hasNull and isNull flags. 
+ writeNullsSegment(hasNulls, vector, ranges, numRows, stream); + } +} + +template +void PrestoBatchVectorSerializer::serializeColumn( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeFlatVector, vector->typeKind(), vector, ranges, stream); + break; + case VectorEncoding::Simple::CONSTANT: + // VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + // serializeConstantVector, + // vector->typeKind(), + // vector, + // ranges, + // stream); + break; + case VectorEncoding::Simple::DICTIONARY: + // VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + // serializeDictionaryVector, + // vector->typeKind(), + // vector, + // ranges, + // stream); + break; + case VectorEncoding::Simple::ROW: + serializeRowVector(vector, ranges, stream); + break; + case VectorEncoding::Simple::ARRAY: + // serializeArrayVector(vector, ranges, stream); + break; + case VectorEncoding::Simple::MAP: + // serializeMapVector(vector, ranges, stream); + break; + case VectorEncoding::Simple::LAZY: + serializeColumn(BaseVector::loadedVectorShared(vector), ranges, stream); + break; + default: + VELOX_UNSUPPORTED(); + } +} } // namespace facebook::velox::serializer::presto::detail diff --git a/velox/serializers/PrestoBatchVectorSerializer.h b/velox/serializers/PrestoBatchVectorSerializer.h index c1d10940e581..ee4700370eef 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.h +++ b/velox/serializers/PrestoBatchVectorSerializer.h @@ -92,6 +92,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { } } + template + void serializeColumn( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + template < TypeKind kind, typename RangeType, @@ -498,6 +504,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { nulls_.flush(stream); } + template + void serializeRowVector( + const VectorPtr& vector, + const 
folly::Range& ranges, + BufferedOutputStream* stream); + StreamArena arena_; const std::unique_ptr codec_; const PrestoVectorSerde::PrestoOptions opts_;