Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support compress UnsafeRow and CompactRow #11497

Closed

Conversation

jinchengchenghh
Copy link
Contributor

@jinchengchenghh jinchengchenghh commented Nov 11, 2024

Convert the buffers to folly::IOBuf, then compress it, and record some stats, skip compression when compressedSize/uncompressedSize exceeds minCompressionRatio with default value 0.8.
Serialization format is:
| uncompressedSize | compressedSize | compressed | serializedData for Iterator[Row] |
Test the RowVector with all types and size is 500, the test output is as following:

row kind compression kind uncompressedSize compressedSize compression ratio
UnsafeRow zlib 519344 227749 44%
UnsafeRow snappy 307936 115012 37%
UnsafeRow zstd 689544 273433 40%
UnsafeRow lz4 622688 205956 33%
UnsafeRow gzip 759608 213922 28%
CompactRow zlib 263474 129241 49%
CompactRow snappy 388313 78297 20%
CompactRow zstd 224144 92744 41%
CompactRow lz4 110043 61615 56%
CompactRow gzip 224631 93989 42%

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Nov 11, 2024
Copy link

netlify bot commented Nov 11, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 8fd8f14
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/67627e42c9e07200098285ee

@jinchengchenghh jinchengchenghh changed the title Support compress UnsafeRow and CompactRow feat: Support compress UnsafeRow and CompactRow Nov 15, 2024
namespace {
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
VectorSerde::Kind kind) {
std::unique_ptr<VectorSerde::Options> options =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if () {
} else {
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ternary conditional operator can get the VectorSerde::Options, then set the field of options, otherwise, we must set in each branch.

kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
true /*nullsFirst*/};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/*nullsFirst=*/true

} else {
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->createStreamTree(rowType, rowsInCurrent_, options_.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create options in partition output operator and pass to destination as a raw pointer and partition output operator owns the option? Thanks!

@@ -54,7 +54,13 @@ class Exchange : public SourceOperator {
serdeKind_(exchangeNode->serdeKind()),
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
if (serdeKind_ == VectorSerde::Kind::kPresto) {
options_ = std::make_unique<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make options_ a const member and set through an anonymous function?

options_(getVectorSerdeOptions(kind))

@@ -115,7 +121,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
std::unique_ptr<VectorSerde::Options> options_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

return *options;
}

std::unique_ptr<folly::IOBuf> buffersToIOBuf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just call it toIOBuf since this is a very limited use case?

}

struct CompactRowHeader {
int32_t uncompressedSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the initial values for these fields? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not have initial value, it should set each field for it like PrestoHeader. https://github.com/facebookincubator/velox/blob/main/velox/serializers/PrestoSerializer.cpp#L221

@jinchengchenghh jinchengchenghh force-pushed the compress branch 3 times, most recently from b2d564a to c584816 Compare December 16, 2024 04:21
@jinchengchenghh
Copy link
Contributor Author

Failed by existing error velox_hdfs_file_test

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh nice change % minors. Thanks!

velox/serializers/RowSerializer.h Show resolved Hide resolved
void write(OutputStream* out) {
out->write(reinterpret_cast<char*>(&uncompressedSize), sizeof(int32_t));
out->write(reinterpret_cast<char*>(&compressedSize), sizeof(int32_t));
char writeValue = compressed ? 1 : 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

std::string debugString() const {
return fmt::format(
"uncompressedSize: {}, compressedSize: {}, compressed: {}",
uncompressedSize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

succinctBytes?

velox/serializers/RowSerializer.h Show resolved Hide resolved
}

// The serialization format is | uncompressedSize | compressedSize |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// for public comments.

// in a different buffer.
if (serializedBuffer->size() < rowSize) {
concatenatePartialRow(source, rowSize, *serializedBuffer);
std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::unique_ptr<folly::IOBuf> uncompressedBuf;

std::unique_ptr<folly::IOBuf> uncompressedBuf = nullptr;
const auto header = detail::RowHeader::read(source);
if (header.compressed) {
VELOX_DCHECK(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VELOX_DCHECK_NE

velox/serializers/RowSerializer.h Show resolved Hide resolved
codec->uncompress(compressBuf.get(), header.uncompressedSize);
}
std::unique_ptr<ByteInputStream> uncompressedStream;
ByteInputStream* uncompressedSource;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncompressedSource{nullptr}

velox/serializers/RowSerializer.h Show resolved Hide resolved
@jinchengchenghh
Copy link
Contributor Author

CI failed by #11898

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh thanks for the update!

velox/serializers/RowSerializer.h Show resolved Hide resolved
return sizeof(int32_t) * 2 + sizeof(char);
}

std::string debugString() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/debugString/toString/

@facebook-github-bot
Copy link
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@jinchengchenghh
Copy link
Contributor Author

@facebook-github-bot
Copy link
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For row-wise shuffle, this helps to reduce 1/3 for shuffle volume from one stage.

@facebook-github-bot
Copy link
Contributor

@xiaoxmeng merged this pull request in 0ee82f3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants