-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support compress UnsafeRow and CompactRow #11497
Conversation
✅ Deploy Preview for meta-velox canceled.
|
e67e248
to
e4e3b8f
Compare
namespace { | ||
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions( | ||
VectorSerde::Kind kind) { | ||
std::unique_ptr<VectorSerde::Options> options = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if () {
} else {
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ternary conditional operator can get the VectorSerde::Options, then set the field of options, otherwise, we must set in each branch.
velox/exec/SpillFile.cpp
Outdated
kDefaultUseLosslessTimestamp, | ||
compressionKind_, | ||
0.8, | ||
true /*nullsFirst*/}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/*nullsFirst=*/true
velox/exec/PartitionedOutput.cpp
Outdated
} else { | ||
current_->createStreamTree(rowType, rowsInCurrent_); | ||
} | ||
current_->createStreamTree(rowType, rowsInCurrent_, options_.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create options in partition output operator and pass to destination as a raw pointer and partition output operator owns the option? Thanks!
velox/exec/Exchange.h
Outdated
@@ -54,7 +54,13 @@ class Exchange : public SourceOperator { | |||
serdeKind_(exchangeNode->serdeKind()), | |||
processSplits_{operatorCtx_->driverCtx()->driverId == 0}, | |||
exchangeClient_{std::move(exchangeClient)} { | |||
options_.compressionKind = | |||
if (serdeKind_ == VectorSerde::Kind::kPresto) { | |||
options_ = std::make_unique< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make options_ a const member and set through an anonymous function?
options_(getVectorSerdeOptions(kind))
velox/exec/Exchange.h
Outdated
@@ -115,7 +121,7 @@ class Exchange : public SourceOperator { | |||
std::vector<std::unique_ptr<SerializedPage>> currentPages_; | |||
bool atEnd_{false}; | |||
std::default_random_engine rng_{std::random_device{}()}; | |||
serializer::presto::PrestoVectorSerde::PrestoOptions options_; | |||
std::unique_ptr<VectorSerde::Options> options_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const?
return *options; | ||
} | ||
|
||
std::unique_ptr<folly::IOBuf> buffersToIOBuf( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just call it toIOBuf since this is a very limited use case?
} | ||
|
||
struct CompactRowHeader { | ||
int32_t uncompressedSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the initial values for these fields? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not have initial value, it should set each field for it like PrestoHeader. https://github.com/facebookincubator/velox/blob/main/velox/serializers/PrestoSerializer.cpp#L221
b2d564a
to
c584816
Compare
Failed by existing error velox_hdfs_file_test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh nice change % minors. Thanks!
velox/serializers/RowSerializer.h
Outdated
void write(OutputStream* out) { | ||
out->write(reinterpret_cast<char*>(&uncompressedSize), sizeof(int32_t)); | ||
out->write(reinterpret_cast<char*>(&compressedSize), sizeof(int32_t)); | ||
char writeValue = compressed ? 1 : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const?
velox/serializers/RowSerializer.h
Outdated
std::string debugString() const { | ||
return fmt::format( | ||
"uncompressedSize: {}, compressedSize: {}, compressed: {}", | ||
uncompressedSize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
succinctBytes?
velox/serializers/RowSerializer.h
Outdated
} | ||
|
||
// The serialization format is | uncompressedSize | compressedSize | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// for public comments.
velox/serializers/RowSerializer.h
Outdated
// in a different buffer. | ||
if (serializedBuffer->size() < rowSize) { | ||
concatenatePartialRow(source, rowSize, *serializedBuffer); | ||
std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::unique_ptr<folly::IOBuf> uncompressedBuf;
velox/serializers/RowSerializer.h
Outdated
std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr; | ||
const auto header = detail::RowHeader::read(source); | ||
if (header.compressed) { | ||
VELOX_DCHECK( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VELOX_DCHECK_NE
velox/serializers/RowSerializer.h
Outdated
codec->uncompress(compressBuf.get(), header.uncompressedSize); | ||
} | ||
std::unique_ptr<ByteInputStream> uncompressedStream; | ||
ByteInputStream* uncompressedSource; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uncompressedSource{nullptr}
CI failed by #11898 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinchengchenghh thanks for the update!
velox/serializers/RowSerializer.h
Outdated
return sizeof(int32_t) * 2 + sizeof(char); | ||
} | ||
|
||
std::string debugString() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/debugString/toString/
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For row-wise shuffle, this helps to reduce 1/3 for shuffle volume from one stage.
@xiaoxmeng merged this pull request in 0ee82f3. |
Convert the buffers to
folly::IOBuf
, then compress it, and record some stats, skip compression when compressedSize/uncompressedSize exceeds minCompressionRatio with default value 0.8.Serialization format is:
| uncompressedSize | compressedSize | compressed | serializedData for Iterator[Row] |
Test the RowVector with all types and size is 500, the test output is as following: