Skip to content

Commit

Permalink
compress unsaferow and compact row
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 12, 2024
1 parent b44ffc9 commit e4e3b8f
Show file tree
Hide file tree
Showing 17 changed files with 589 additions and 125 deletions.
4 changes: 2 additions & 2 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
resultOffset = result_->size();
}
}
Expand All @@ -154,7 +154,7 @@ RowVectorPtr Exchange::getOutput() {
outputType_,
&result_,
resultOffset,
&options_);
options_.get());
// We expect the row-wise deserialization to consume all the input into one
// output vector.
VELOX_CHECK(inputStream->atEnd());
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ class Exchange : public SourceOperator {
serdeKind_(exchangeNode->serdeKind()),
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {
options_.compressionKind =
if (serdeKind_ == VectorSerde::Kind::kPresto) {
options_ = std::make_unique<
serializer::presto::PrestoVectorSerde::PrestoOptions>();
} else {
options_ = std::make_unique<VectorSerde::Options>();
}
options_->compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
}

Expand Down Expand Up @@ -115,7 +121,7 @@ class Exchange : public SourceOperator {
std::vector<std::unique_ptr<SerializedPage>> currentPages_;
bool atEnd_{false};
std::default_random_engine rng_{std::random_device{}()};
serializer::presto::PrestoVectorSerde::PrestoOptions options_;
std::unique_ptr<VectorSerde::Options> options_;
};

} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class OperatorTraceInputReader {
const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{
true,
common::CompressionKind_ZSTD, // TODO: Use trace config.
0.8,
/*_nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
const RowTypePtr dataType_;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/OperatorTraceWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OperatorTraceInputWriter {
const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
true,
common::CompressionKind::CompressionKind_ZSTD,
0.8,
/*nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
Expand Down
41 changes: 32 additions & 9 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,38 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
namespace {
// Builds the serializer options for the given serde 'kind'. The Presto serde
// gets its own PrestoOptions subclass; every other kind gets the base
// VectorSerde::Options. In both cases the compression kind is read from the
// OutputBufferManager instance and the minimum compression ratio from
// PartitionedOutput.
std::unique_ptr<VectorSerde::Options> getVectorSerdeOptions(
    VectorSerde::Kind kind) {
  std::unique_ptr<VectorSerde::Options> serdeOptions;
  if (kind == VectorSerde::Kind::kPresto) {
    serdeOptions = std::make_unique<
        serializer::presto::PrestoVectorSerde::PrestoOptions>();
  } else {
    serdeOptions = std::make_unique<VectorSerde::Options>();
  }

  // Both settings live on the base Options type, so they apply to every kind.
  serdeOptions->compressionKind =
      OutputBufferManager::getInstance().lock()->compressionKind();
  serdeOptions->minCompressionRatio = PartitionedOutput::minCompressionRatio();
  return serdeOptions;
}
} // namespace

namespace detail {
// Creates the output destination numbered 'destination' for task 'taskId'.
// The serializer options are built once here from the serde's kind and kept
// in 'options_', so they can be passed to createStreamTree() whenever a new
// stream group is started instead of being rebuilt each time.
//
// 'serde' is dereferenced during construction and therefore must not be null.
// 'recordEnqueued' is moved into the destination.
Destination::Destination(
const std::string& taskId,
int destination,
VectorSerde* serde,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
// Reads the 'serde_' member rather than the 'serde' parameter, so this relies
// on 'serde_' being declared (and thus initialized) before 'options_'.
options_(getVectorSerdeOptions(serde_->kind())),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
// NOTE(review): presumably sets the flush target size percentage; the
// definition is outside this view -- confirm.
setTargetSizePct();
}

BlockingReason Destination::advance(
uint64_t maxBytes,
const std::vector<vector_size_t>& sizes,
Expand Down Expand Up @@ -57,15 +88,7 @@ BlockingReason Destination::advance(
if (current_ == nullptr) {
current_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
const auto rowType = asRowType(output->type());
if (serde_->kind() == VectorSerde::Kind::kPresto) {
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.compressionKind =
OutputBufferManager::getInstance().lock()->compressionKind();
options.minCompressionRatio = PartitionedOutput::minCompressionRatio();
current_->createStreamTree(rowType, rowsInCurrent_, &options);
} else {
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->createStreamTree(rowType, rowsInCurrent_, options_.get());
}

const auto rows = folly::Range(&rows_[firstRow], rowIdx_ - firstRow);
Expand Down
11 changes: 2 additions & 9 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,7 @@ class Destination {
VectorSerde* serde,
memory::MemoryPool* pool,
bool eagerFlush,
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued)
: taskId_(taskId),
destination_(destination),
serde_(serde),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
setTargetSizePct();
}
std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued);

/// Resets the destination before starting a new batch.
void beginBatch() {
Expand Down Expand Up @@ -112,6 +104,7 @@ class Destination {
const std::string taskId_;
const int destination_;
VectorSerde* const serde_;
const std::unique_ptr<VectorSerde::Options> options_;
memory::MemoryPool* const pool_;
const bool eagerFlush_;
const std::function<void(uint64_t bytes, uint64_t rows)> recordEnqueued_;
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ uint64_t SpillWriter::write(
NanosecondTimer timer(&timeNs);
if (batch_ == nullptr) {
serializer::presto::PrestoVectorSerde::PrestoOptions options = {
kDefaultUseLosslessTimestamp, compressionKind_, true /*nullsFirst*/};
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
true /*nullsFirst*/};
batch_ = std::make_unique<VectorStreamGroup>(pool_, serde_);
batch_->createStreamTree(
std::static_pointer_cast<const RowType>(rows->type()),
Expand Down Expand Up @@ -300,6 +303,7 @@ SpillReadFile::SpillReadFile(
readOptions_{
kDefaultUseLosslessTimestamp,
compressionKind_,
0.8,
/*nullsFirst=*/true},
pool_(pool),
serde_(getNamedVectorSerde(VectorSerde::Kind::kPresto)),
Expand Down
10 changes: 3 additions & 7 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2408,22 +2408,18 @@ TEST_P(MultiFragmentTest, mergeSmallBatchesInExchange) {
test(100'000, 1);
} else if (GetParam() == VectorSerde::Kind::kCompactRow) {
test(1, 1'000);
test(1'000, 28);
test(10'000, 3);
test(1'000, 38);
test(10'000, 4);
test(100'000, 1);
} else {
test(1, 1'000);
test(1'000, 63);
test(1'000, 72);
test(10'000, 7);
test(100'000, 1);
}
}

TEST_P(MultiFragmentTest, compression) {
// NOTE: only presto format supports compression for now
if (GetParam() != VectorSerde::Kind::kPresto) {
return;
}
bufferManager_->testingSetCompression(
common::CompressionKind::CompressionKind_LZ4);
auto guard = folly::makeGuard([&]() {
Expand Down
Loading

0 comments on commit e4e3b8f

Please sign in to comment.