diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 9f15ea440c91..2c405f5bc5bc 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -209,10 +209,11 @@ void GroupingSet::noMoreInput() { addRemainingInput(); } + VELOX_CHECK_NULL(outputSpiller_); // Spill the remaining in-memory state to disk if spilling has been triggered // on this grouping set. This is to simplify query OOM prevention when // producing output as we don't support to spill during that stage as for now. - if (hasSpilled()) { + if (inputSpiller_ != nullptr) { spill(); } @@ -220,7 +221,11 @@ void GroupingSet::noMoreInput() { } bool GroupingSet::hasSpilled() const { - return spiller_ != nullptr; + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + return true; + } + return outputSpiller_ != nullptr; } bool GroupingSet::hasOutput() { @@ -980,6 +985,18 @@ RowTypePtr GroupingSet::makeSpillType() const { return ROW(std::move(names), std::move(types)); } +std::optional GroupingSet::spilledStats() const { + if (!hasSpilled()) { + return std::nullopt; + } + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + return inputSpiller_->stats(); + } + VELOX_CHECK_NOT_NULL(outputSpiller_); + return outputSpiller_->stats(); +} + void GroupingSet::spill() { // NOTE: if the disk spilling is triggered by the memory arbitrator, then it // is possible that the grouping set hasn't processed any input data yet. @@ -989,11 +1006,11 @@ void GroupingSet::spill() { } auto* rows = table_->rows(); - if (!hasSpilled()) { + VELOX_CHECK_NULL(outputSpiller_); + if (inputSpiller_ == nullptr) { VELOX_DCHECK(pool_.trackUsage()); VELOX_CHECK(numDistinctSpillFilesPerPartition_.empty()); - spiller_ = std::make_unique( - Spiller::Type::kAggregateInput, + inputSpiller_ = std::make_unique( rows, makeSpillType(), HashBitRange( @@ -1006,22 +1023,24 @@ void GroupingSet::spill() { spillConfig_, spillStats_); VELOX_CHECK_EQ( - spiller_->state().maxPartitions(), 1 << spillConfig_->numPartitionBits); + inputSpiller_->state().maxPartitions(), 1 << spillConfig_->numPartitionBits); } // Spilling may execute on multiple partitions in parallel, and // HashStringAllocator is not thread safe. If any aggregations // allocate/deallocate memory during spilling it can lead to concurrency bugs. // Freeze the HashStringAllocator to make it effectively immutable and // guarantee we don't accidentally enter an unsafe situation. - rows->stringAllocator().freezeAndExecute([&]() { spiller_->spill(); }); + rows->stringAllocator().freezeAndExecute([&]() { + inputSpiller_->spill(); + }); if (isDistinct() && numDistinctSpillFilesPerPartition_.empty()) { size_t totalNumDistinctSpilledFiles{0}; numDistinctSpillFilesPerPartition_.resize( - spiller_->state().maxPartitions(), 0); - for (int partition = 0; partition < spiller_->state().maxPartitions(); + inputSpiller_->state().maxPartitions(), 0); + for (int partition = 0; partition < inputSpiller_->state().maxPartitions(); ++partition) { numDistinctSpillFilesPerPartition_[partition] = - spiller_->state().numFinishedFiles(partition); + inputSpiller_->state().numFinishedFiles(partition); totalNumDistinctSpilledFiles += numDistinctSpillFilesPerPartition_[partition]; } @@ -1042,20 +1061,17 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { auto* rows = table_->rows(); VELOX_CHECK(pool_.trackUsage()); - spiller_ = std::make_unique( - Spiller::Type::kAggregateOutput, - rows, - makeSpillType(), - spillConfig_, - spillStats_); + outputSpiller_ = std::make_unique( + rows, makeSpillType(), spillConfig_, spillStats_); // Spilling may execute on multiple partitions in parallel, and // HashStringAllocator is not thread safe. If any aggregations // allocate/deallocate memory during spilling it can lead to concurrency bugs. // Freeze the HashStringAllocator to make it effectively immutable and // guarantee we don't accidentally enter an unsafe situation. - rows->stringAllocator().freezeAndExecute( - [&]() { spiller_->spill(rowIterator); }); + rows->stringAllocator().freezeAndExecute([&]() { + outputSpiller_->spill(rowIterator); + }); table_->clear(/*freeTable=*/true); } @@ -1091,7 +1107,13 @@ bool GroupingSet::getOutputWithSpill( table_->clear(/*freeTable=*/true); VELOX_CHECK_NULL(merge_); - spiller_->finishSpill(spillPartitionSet_); + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + inputSpiller_->finishSpill(spillPartitionSet_); + } else { + VELOX_CHECK_NOT_NULL(outputSpiller_); + outputSpiller_->finishSpill(spillPartitionSet_); + } removeEmptyPartitions(spillPartitionSet_); if (!prepareNextSpillPartitionOutput()) { @@ -1176,9 +1198,11 @@ bool GroupingSet::mergeNextWithoutAggregates( const RowVectorPtr& result) { VELOX_CHECK_NOT_NULL(merge_); VELOX_CHECK(isDistinct()); + VELOX_CHECK_NULL(outputSpiller_); + VELOX_CHECK_NOT_NULL(inputSpiller_); VELOX_CHECK_EQ( numDistinctSpillFilesPerPartition_.size(), - spiller_->state().maxPartitions()); + inputSpiller_->state().maxPartitions()); // We are looping over sorted rows produced by tree-of-losers. We logically // split the stream into runs of duplicate rows. As we process each run we @@ -1414,4 +1438,56 @@ std::optional GroupingSet::estimateOutputRowSize() const { } return table_->rows()->estimateRowSize(); } + +AggregationInputSpiller::AggregationInputSpiller( + RowContainer* container, + RowTypePtr rowType, + const HashBitRange& hashBitRange, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + hashBitRange, + numSortingKeys, + sortCompareFlags, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +AggregationOutputSpiller::AggregationOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + 0, + {}, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +void AggregationInputSpiller::spill() { + SpillerBase::spill(nullptr); +} + +void AggregationOutputSpiller::spill(const RowContainerIterator& startRowIter) { + SpillerBase::spill(&startRowIter); +} + +void AggregationOutputSpiller::runSpill(bool lastRun) { + SpillerBase::runSpill(lastRun); + if (lastRun) { + for (auto partition = 0; partition < spillRuns_.size(); ++partition) { + state_.finishFile(partition); + } + } +} } // namespace facebook::velox::exec diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index 9edb3cfac34d..abb66a9287d0 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -25,6 +25,8 @@ #include "velox/exec/VectorHasher.h" namespace facebook::velox::exec { +class AggregationInputSpiller; +class AggregationOutputSpiller; class GroupingSet { public: @@ -110,16 +112,12 @@ class GroupingSet { void spill(); /// Spills all the rows in container starting from the offset specified by - /// 'rowIterator'. + /// 'rowIterator'. This should be only called during output processing and + /// when no spill has occurred previously. void spill(const RowContainerIterator& rowIterator); /// Returns the spiller stats including total bytes and rows spilled so far. - std::optional spilledStats() const { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } + std::optional spilledStats() const; /// Returns true if spilling has triggered on this grouping set. bool hasSpilled() const; @@ -134,8 +132,8 @@ class GroupingSet { return table_ ? table_->rows()->numRows() : 0; } - // Frees hash tables and other state when giving up partial aggregation as - // non-productive. Must be called before toIntermediate() is used. + /// Frees hash tables and other state when giving up partial aggregation as + /// non-productive. Must be called before toIntermediate() is used. void abandonPartialAggregation(); /// Translates the raw input in input to accumulators initialized from a @@ -342,7 +340,9 @@ class GroupingSet { // 'remainingInput_'. bool remainingMayPushdown_; - std::unique_ptr spiller_; + std::unique_ptr inputSpiller_; + + std::unique_ptr outputSpiller_; // The current spill partition in producing spill output. If it is -1, then we // haven't started yet. @@ -391,4 +391,52 @@ class GroupingSet { folly::Synchronized* const spillStats_; }; +class AggregationInputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "AggregationInputSpiller"; + + AggregationInputSpiller( + RowContainer* container, + RowTypePtr rowType, + const HashBitRange& hashBitRange, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(); + + private: + std::string type() const override { + return std::string(kType); + } + + bool needSort() const override { + return true; + } +}; + +class AggregationOutputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "AggregationOutputSpiller"; + + AggregationOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(const RowContainerIterator& startRowIter); + + private: + std::string type() const override { + return std::string(kType); + } + + void runSpill(bool lastRun) override; + + bool needSort() const override { + return false; + } +}; } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index f6220a5acf99..63eac56225f3 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -220,8 +220,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { exceededMaxSpillLevelLimit_ = false; } - spiller_ = std::make_unique( - Spiller::Type::kHashJoinBuild, + spiller_ = std::make_unique( joinType_, table_->rows(), spillType_, @@ -418,7 +417,8 @@ void HashBuild::addInput(RowVectorPtr input) { void HashBuild::ensureInputFits(RowVectorPtr& input) { // NOTE: we don't need memory reservation if all the partitions are spilling // as we spill all the input rows to disk directly. - if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled()) { + if (!canSpill() || spiller_ == nullptr || + spiller_->state().isAllPartitionSpilled()) { return; } @@ -488,7 +488,7 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) { // itself, we will no longer need the reserved memory for building hash // table as the table is spilled, and the input will be directly spilled, // too. - if (spiller_->isAllSpilled()) { + if (spiller_->state().isAllPartitionSpilled()) { pool()->release(); } return; @@ -503,13 +503,14 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) { void HashBuild::spillInput(const RowVectorPtr& input) { VELOX_CHECK_EQ(input->size(), activeRows_.size()); - if (!canSpill() || spiller_ == nullptr || !spiller_->isAnySpilled() || + if (!canSpill() || spiller_ == nullptr || + !spiller_->state().isAnyPartitionSpilled() || !activeRows_.hasSelections()) { return; } const auto numInput = input->size(); - prepareInputIndicesBuffers(numInput, spiller_->spilledPartitionSet()); + prepareInputIndicesBuffers(numInput, spiller_->state().spilledPartitionSet()); computeSpillPartitions(input); vector_size_t numSpillInputs = 0; @@ -518,7 +519,7 @@ void HashBuild::spillInput(const RowVectorPtr& input) { if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) { continue; } - if (!spiller_->isSpilled(partition)) { + if (!spiller_->state().isPartitionSpilled(partition)) { continue; } activeRows_.setValid(row, false); @@ -537,7 +538,7 @@ void HashBuild::spillInput(const RowVectorPtr& input) { if (numInputs == 0) { continue; } - VELOX_CHECK(spiller_->isSpilled(partition)); + VELOX_CHECK(spiller_->state().isPartitionSpilled(partition)); spillPartition( partition, numInputs, spillInputIndicesBuffers_[partition], input); } @@ -705,7 +706,7 @@ bool HashBuild::finishHashBuild() { otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; for (auto* build : otherBuilds) { - std::unique_ptr spiller; + std::unique_ptr spiller; { std::lock_guard l(build->mutex_); VELOX_CHECK( @@ -785,8 +786,8 @@ bool HashBuild::finishHashBuild() { void HashBuild::ensureTableFits(uint64_t numRows) { // NOTE: we don't need memory reservation if all the partitions have been // spilled as nothing need to be built. - if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled() || - numRows == 0) { + if (!canSpill() || spiller_ == nullptr || + spiller_->state().isAllPartitionSpilled() || numRows == 0) { return; } @@ -811,7 +812,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) { // If reservation triggers the spilling of 'HashBuild' operator itself, we // will no longer need the reserved memory for building hash table as the // table is spilled. - if (spiller_->isAllSpilled()) { + if (spiller_->state().isAllPartitionSpilled()) { pool()->release(); } return; @@ -920,7 +921,7 @@ void HashBuild::addRuntimeStats() { } // Add max spilling level stats if spilling has been triggered. - if (spiller_ != nullptr && spiller_->isAnySpilled()) { + if (spiller_ != nullptr && spiller_->state().isAnyPartitionSpilled()) { lockedStats->addRuntimeStat( "maxSpillLevel", RuntimeCounter( @@ -1094,7 +1095,7 @@ void HashBuild::reclaim( } } - std::vector spillers; + std::vector spillers; for (auto* op : operators) { HashBuild* buildOp = static_cast(op); spillers.push_back(buildOp->spiller_.get()); @@ -1136,4 +1137,70 @@ void HashBuild::close() { table_.reset(); } } + +HashBuildSpiller::HashBuildSpiller( + core::JoinType joinType, + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + bits, + 0, + {}, + spillConfig->maxFileSize, + spillConfig->maxSpillRunRows, + spillConfig, + spillStats), + spillProbeFlag_(needRightSideJoin(joinType)) { + VELOX_CHECK(container_->accumulators().empty()); +} + +void HashBuildSpiller::spill() { + SpillerBase::spill(nullptr); +} + +void HashBuildSpiller::spill( + uint32_t partition, + const RowVectorPtr& spillVector) { + VELOX_CHECK(!finalized_); + if (FOLLY_UNLIKELY(!state_.isPartitionSpilled(partition))) { + VELOX_FAIL( + "Can't spill vector to a non-spilling partition: {}, {}", + partition, + toString()); + } + VELOX_DCHECK(spillRuns_[partition].rows.empty()); + + if (FOLLY_UNLIKELY(spillVector == nullptr)) { + return; + } + + state_.appendToPartition(partition, spillVector); +} + +void HashBuildSpiller::extractSpill( + folly::Range rows, + facebook::velox::RowVectorPtr& resultPtr) { + if (resultPtr == nullptr) { + resultPtr = BaseVector::create( + rowType_, rows.size(), memory::spillMemoryPool()); + } else { + resultPtr->prepareForReuse(); + resultPtr->resize(rows.size()); + } + + auto* result = resultPtr.get(); + const auto& types = container_->columnTypes(); + for (auto i = 0; i < types.size(); ++i) { + container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); + } + if (spillProbeFlag_) { + container_->extractProbedFlags( + rows.data(), rows.size(), false, false, result->childAt(types.size())); + } +} } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 0b12554afc8d..5055186ca375 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -25,6 +25,7 @@ #include "velox/expression/Expr.h" namespace facebook::velox::exec { +class HashBuildSpiller; /// Builds a hash table for use in HashProbe. This is the final /// Operator in a build side Driver. The build side pipeline has @@ -276,7 +277,7 @@ class HashBuild final : public Operator { // This can be nullptr if either spilling is not allowed or it has been // transferred to the last hash build operator while in kWaitForBuild state or // it has been cleared to set up a new one for recursive spilling. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. std::unique_ptr> spillInputReader_; @@ -308,6 +309,40 @@ inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) { os << HashBuild::stateName(state); return os; } + +class HashBuildSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "HashBuildSpiller"; + + HashBuildSpiller( + core::JoinType joinType, + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + /// Invoked to spill all the rows stored in the row container of the hash + /// build. + void spill(); + + /// Invoked to spill a given partition from the input vector 'spillVector'. + void spill(uint32_t partition, const RowVectorPtr& spillVector); + + private: + void extractSpill(folly::Range rows, RowVectorPtr& resultPtr) + override; + + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); + } + + const bool spillProbeFlag_; +}; } // namespace facebook::velox::exec template <> diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 4c8b4e54c5d6..0171d251a74c 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -16,6 +16,7 @@ #include "velox/exec/HashJoinBridge.h" #include "velox/common/memory/MemoryArbitrator.h" +#include "velox/exec/HashBuild.h" namespace facebook::velox::exec { namespace { @@ -91,15 +92,14 @@ namespace { // 'table' to parallelize the table spilling. The function spills all the rows // from the row container and returns the spiller for the caller to collect the // spilled partitions and stats. -std::unique_ptr createSpiller( +std::unique_ptr createSpiller( RowContainer* subTableRows, core::JoinType joinType, const RowTypePtr& tableType, const HashBitRange& hashBitRange, const common::SpillConfig* spillConfig, folly::Synchronized* stats) { - return std::make_unique( - Spiller::Type::kHashJoinBuild, + return std::make_unique( joinType, subTableRows, hashJoinTableSpillType(tableType, joinType), @@ -110,7 +110,7 @@ std::unique_ptr createSpiller( } // namespace std::vector> spillHashJoinTable( - const std::vector& spillers, + const std::vector& spillers, const common::SpillConfig* spillConfig) { VELOX_CHECK_NOT_NULL(spillConfig); auto spillExecutor = spillConfig->executor; @@ -172,8 +172,8 @@ SpillPartitionSet spillHashJoinTable( return {}; } - std::vector> spillersHolder; - std::vector spillers; + std::vector> spillersHolder; + std::vector spillers; const auto rowContainers = table->allRows(); const auto tableType = hashJoinTableType(joinNode); for (auto* rowContainer : rowContainers) { diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index d59c564cc7af..16a79de3d4fd 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -21,6 +21,7 @@ #include "velox/exec/Spill.h" namespace facebook::velox::exec { +class HashBuildSpiller; namespace test { class HashJoinBridgeTestHelper; @@ -209,19 +210,19 @@ RowTypePtr hashJoinTableType( const std::shared_ptr& joinNode); struct HashJoinTableSpillResult { - Spiller* spiller{nullptr}; + HashBuildSpiller* spiller{nullptr}; const std::exception_ptr error{nullptr}; explicit HashJoinTableSpillResult(std::exception_ptr _error) : error(_error) {} - explicit HashJoinTableSpillResult(Spiller* _spiller) : spiller(_spiller) {} + explicit HashJoinTableSpillResult(HashBuildSpiller* _spiller) : spiller(_spiller) {} }; /// Invoked to spill the hash table from a set of spillers. If 'spillExecutor' /// is provided, then we do parallel spill. This is used by hash build to spill /// a partially built hash join table. std::vector> spillHashJoinTable( - const std::vector& spillers, + const std::vector& spillers, const common::SpillConfig* spillConfig); /// Invoked to spill 'table' and returns spilled partitions. This is used by diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 8fe132dc0dbb..6e6bebb13a3a 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -250,8 +250,7 @@ void HashProbe::maybeSetupInputSpiller( // If 'spillInputPartitionIds_' is not empty, then we set up a spiller to // spill the incoming probe inputs. - inputSpiller_ = std::make_unique( - Spiller::Type::kHashJoinProbe, + inputSpiller_ = std::make_unique( probeType_, HashBitRange( spillInputPartitionIds_.begin()->partitionBitOffset(), @@ -536,7 +535,7 @@ void HashProbe::spillInput(RowVectorPtr& input) { for (auto row = 0; row < numInput; ++row) { const auto partition = singlePartition.has_value() ? singlePartition.value() : spillPartitions_[row]; - if (!inputSpiller_->isSpilled(partition)) { + if (!inputSpiller_->state().isPartitionSpilled(partition)) { rawNonSpillInputIndicesBuffer_[numNonSpillingInput++] = row; continue; } @@ -556,7 +555,7 @@ void HashProbe::spillInput(RowVectorPtr& input) { if (numSpillInputs == 0) { continue; } - VELOX_CHECK(inputSpiller_->isSpilled(partition)); + VELOX_CHECK(inputSpiller_->state().isPartitionSpilled(partition)); inputSpiller_->spill( partition, wrap(numSpillInputs, spillInputIndicesBuffers_[partition], input)); @@ -1630,7 +1629,7 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_NOT_NULL(inputSpiller_); VELOX_CHECK_EQ( spillInputPartitionIds_.size(), - inputSpiller_->spilledPartitionSet().size()); + inputSpiller_->state().spilledPartitionSet().size()); inputSpiller_->finishSpill(inputSpillPartitionSet_); VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0); } @@ -1889,8 +1888,7 @@ void HashProbe::spillOutput() { return; } // We spill all the outputs produced from 'input_' into a single partition. - auto outputSpiller = std::make_unique( - Spiller::Type::kHashJoinProbe, + auto outputSpiller = std::make_unique( outputType_, HashBitRange{}, spillConfig(), @@ -1918,7 +1916,7 @@ void HashProbe::spillOutput() { isRightSemiFilterJoin(joinType_) || isRightSemiProjectJoin(joinType_)); VELOX_CHECK((output == nullptr) && (input_ != nullptr)); } - VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1); + VELOX_CHECK_LE(outputSpiller->state().spilledPartitionSet().size(), 1); VELOX_CHECK(spillOutputPartitionSet_.empty()); outputSpiller->finishSpill(spillOutputPartitionSet_); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 81548b8f80cb..eef2b802a49c 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -649,7 +649,7 @@ class HashProbe : public Operator { // 'inputSpiller_' is created if some part of build-side rows have been // spilled. It is used to spill probe-side rows if the corresponding // build-side rows have been spilled. - std::unique_ptr inputSpiller_; + std::unique_ptr inputSpiller_; // If not empty, the probe inputs with partition id set in // 'spillInputPartitionIds_' needs to spill. It is set along with 'spiller_' diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index fe8e2e61e9b2..1765b553772d 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -22,7 +22,6 @@ #include "velox/exec/Spiller.h" namespace facebook::velox::exec { - /// OrderBy operator implementation: OrderBy stores all its inputs in a /// RowContainer as the inputs are added. Until all inputs are available, /// it blocks the pipeline. Once all inputs are available, it sorts pointers diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 176d518b8ce1..54f053e48e19 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -394,15 +394,13 @@ void RowNumber::reclaim( } SpillPartitionNumSet RowNumber::spillHashTable() { - // TODO Replace joinPartitionBits and Spiller::Type::kHashJoinBuild. VELOX_CHECK_NOT_NULL(table_); auto columnTypes = table_->rows()->columnTypes(); auto tableType = ROW(std::move(columnTypes)); const auto& spillConfig = spillConfig_.value(); - auto hashTableSpiller = std::make_unique( - Spiller::Type::kRowNumber, + auto hashTableSpiller = std::make_unique( table_->rows(), tableType, spillPartitionBits_, @@ -423,9 +421,7 @@ void RowNumber::setupInputSpiller( const auto& spillConfig = spillConfig_.value(); - // TODO Replace Spiller::Type::kHashJoinProbe. - inputSpiller_ = std::make_unique( - Spiller::Type::kHashJoinProbe, + inputSpiller_ = std::make_unique( inputType_, spillPartitionBits_, &spillConfig, @@ -535,4 +531,24 @@ void RowNumber::setSpillPartitionBits( startPartitionBitOffset + spillConfig_->numPartitionBits); } +RowNumberHashTableSpiller::RowNumberHashTableSpiller( + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + bits, + 0, + {}, + spillConfig->maxFileSize, + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + +void RowNumberHashTableSpiller::spill() { + SpillerBase::spill(nullptr); +} } // namespace facebook::velox::exec diff --git a/velox/exec/RowNumber.h b/velox/exec/RowNumber.h index fa87d5391e8e..ee27d3d3cfbc 100644 --- a/velox/exec/RowNumber.h +++ b/velox/exec/RowNumber.h @@ -20,6 +20,8 @@ #include "velox/exec/Operator.h" namespace facebook::velox::exec { +class RowNumberHashTableSpiller; +class RowNumberInputSpiller; class RowNumber : public Operator { public: @@ -127,7 +129,7 @@ class RowNumber : public Operator { SpillPartitionSet spillHashTablePartitionSet_; // Spiller for input received after spilling has been triggered. - std::unique_ptr inputSpiller_; + std::unique_ptr inputSpiller_; // Used to restore previously spilled input. std::unique_ptr> spillInputReader_; @@ -143,4 +145,27 @@ class RowNumber : public Operator { bool exceededMaxSpillLevelLimit_{false}; }; + +class RowNumberHashTableSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "RowNumberHashTableSpiller"; + + RowNumberHashTableSpiller( + RowContainer* container, + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(); + + private: + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); + } +}; } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index fe12fbecf1c1..c8fa360ed151 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -16,6 +16,7 @@ #include "SortBuffer.h" #include "velox/exec/MemoryReclaimer.h" +#include "velox/exec/Spiller.h" namespace facebook::velox::exec { @@ -110,6 +111,8 @@ void SortBuffer::noMoreInput() { velox::common::testutil::TestValue::adjust( "facebook::velox::exec::SortBuffer::noMoreInput", this); VELOX_CHECK(!noMoreInput_); + VELOX_CHECK_NULL(outputSpiller_); + // It may trigger spill, make sure it's triggered before noMoreInput_ is set. ensureSortFits(); @@ -120,7 +123,7 @@ void SortBuffer::noMoreInput() { return; } - if (spiller_ == nullptr) { + if (inputSpiller_ == nullptr) { VELOX_CHECK_EQ(numInputRows_, data_->numRows()); updateEstimatedOutputRowSize(); // Sort the pointers to the rows in RowContainer (data_) instead of sorting @@ -160,7 +163,7 @@ RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) { std::min(numInputRows_ - numOutputRows_, maxOutputRows); ensureOutputFits(batchSize); prepareOutput(batchSize); - if (spiller_ != nullptr) { + if (hasSpilled()) { getOutputWithSpill(); } else { getOutputWithoutSpill(); @@ -168,6 +171,14 @@ RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) { return output_; } +bool SortBuffer::hasSpilled() const { + if (inputSpiller_ != nullptr) { + VELOX_CHECK_NULL(outputSpiller_); + return true; + } + return outputSpiller_ != nullptr; +} + void SortBuffer::spill() { VELOX_CHECK_NOT_NULL( spillConfig_, "spill config is null when SortBuffer spill is called"); @@ -263,7 +274,7 @@ void SortBuffer::ensureOutputFits(vector_size_t batchSize) { return; } - if (!estimatedOutputRowSize_.has_value() || spiller_ != nullptr) { + if (!estimatedOutputRowSize_.has_value() || hasSpilled()) { return; } @@ -294,7 +305,7 @@ void SortBuffer::ensureSortFits() { return; } - if (numInputRows_ == 0 || spiller_ != nullptr) { + if (numInputRows_ == 0 || inputSpiller_ != nullptr) { return; } @@ -333,10 +344,9 @@ void SortBuffer::updateEstimatedOutputRowSize() { } void SortBuffer::spillInput() { - if (spiller_ == nullptr) { + if (inputSpiller_ == nullptr) { VELOX_CHECK(!noMoreInput_); - spiller_ = std::make_unique( - Spiller::Type::kOrderByInput, + inputSpiller_ = std::make_unique( data_.get(), spillerStoreType_, data_->keyTypes().size(), @@ -344,12 +354,12 @@ void SortBuffer::spillInput() { spillConfig_, spillStats_); } - spiller_->spill(); + inputSpiller_->spill(); data_->clear(); } void SortBuffer::spillOutput() { - if (spiller_ != nullptr) { + if (hasSpilled()) { // Already spilled. return; } @@ -358,17 +368,13 @@ void SortBuffer::spillOutput() { return; } - spiller_ = std::make_unique( - Spiller::Type::kOrderByOutput, - data_.get(), - spillerStoreType_, - spillConfig_, - spillStats_); - auto spillRows = Spiller::SpillRows( + outputSpiller_ = std::make_unique( + data_.get(), spillerStoreType_, spillConfig_, spillStats_); + auto spillRows = SpillerBase::SpillRows( sortedRows_.begin() + numOutputRows_, sortedRows_.end(), *memory::spillMemoryPool()); - spiller_->spill(spillRows); + outputSpiller_->spill(spillRows); data_->clear(); sortedRows_.clear(); sortedRows_.shrink_to_fit(); @@ -391,7 +397,7 @@ void SortBuffer::prepareOutput(vector_size_t batchSize) { child->resize(batchSize); } - if (spiller_ != nullptr) { + if (hasSpilled()) { spillSources_.resize(batchSize); spillSourceRows_.resize(batchSize); prepareOutputWithSpill(); @@ -461,12 +467,18 @@ void SortBuffer::getOutputWithSpill() { void SortBuffer::finishSpill() { VELOX_CHECK_NULL(spillMerger_); VELOX_CHECK(spillPartitionSet_.empty()); - spiller_->finishSpill(spillPartitionSet_); + VELOX_CHECK_EQ( + !!(outputSpiller_ != nullptr) + !!(inputSpiller_ != nullptr), + 1, + "inputSpiller_ {}, outputSpiller_ {}", + inputSpiller_ == nullptr ? "set" : "null", + outputSpiller_ == nullptr ? "set" : "null"); + outputSpiller_->finishSpill(spillPartitionSet_); VELOX_CHECK_EQ(spillPartitionSet_.size(), 1); } void SortBuffer::prepareOutputWithSpill() { - VELOX_CHECK_NOT_NULL(spiller_); + VELOX_CHECK(hasSpilled()); if (spillMerger_ != nullptr) { VELOX_CHECK(spillPartitionSet_.empty()); return; @@ -477,5 +489,4 @@ void SortBuffer::prepareOutputWithSpill() { spillConfig_->readBufferSize, pool(), spillStats_); spillPartitionSet_.clear(); } - } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index eafe29f9e16c..9804e2f85890 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -21,10 +21,11 @@ #include "velox/exec/OperatorUtils.h" #include "velox/exec/PrefixSort.h" #include "velox/exec/RowContainer.h" -#include "velox/exec/Spill.h" #include "velox/vector/BaseVector.h" namespace facebook::velox::exec { +class SortInputSpiller; +class SortOutputSpiller; /// A utility class to accumulate data inside and output the sorted result. /// Spilling would be triggered if spilling is enabled and memory usage exceeds @@ -71,38 +72,59 @@ class SortBuffer { private: // Ensures there is sufficient memory reserved to process 'input'. void ensureInputFits(const VectorPtr& input); + // Reserves memory for output processing. If reservation cannot be increased, // spills enough to make output fit. void ensureOutputFits(vector_size_t outputBatchSize); - // Reserves memory for sort. If reservation cannot be increased, - // spills enough to make output fit. + + // Reserves memory for sort. If reservation cannot be increased, spills enough + // to make output fit. void ensureSortFits(); + void updateEstimatedOutputRowSize(); + // Invoked to initialize or reset the reusable output buffer to get output. void prepareOutput(vector_size_t outputBatchSize); + // Invoked to initialize reader to read the spilled data from storage for // output processing. void prepareOutputWithSpill(); + void getOutputWithoutSpill(); + void getOutputWithSpill(); + // Spill during input stage. void spillInput(); + // Spill during output stage. void spillOutput(); + // Finish spill, and we shouldn't get any rows from non-spilled partition as // there is only one hash partition for SortBuffer. void finishSpill(); + // Returns true if the sort buffer has spilled, regardless of during input or + // output processing. If spilled() is true, it means the sort buffer is in + // minimal memory mode and could not be spilled further. + bool hasSpilled() const; + const RowTypePtr input_; + const std::vector sortCompareFlags_; + velox::memory::MemoryPool* const pool_; + // The flag is passed from the associated operator such as OrderBy or // TableWriter to indicate if this sort buffer object is under non-reclaimable // execution section or not. tsan_atomic* const nonReclaimableSection_; + // Configuration settings for prefix-sort. const common::PrefixSortConfig prefixSortConfig_; + const common::SpillConfig* const spillConfig_; + folly::Synchronized* const spillStats_; // The column projection map between 'input_' and 'spillerStoreType_' as sort @@ -112,30 +134,41 @@ class SortBuffer { // Indicates no more input. Once it is set, addInput() can't be called on this // sort buffer object. bool noMoreInput_ = false; + // The number of received input rows. uint64_t numInputRows_ = 0; + // Used to store the input data in row format. std::unique_ptr data_; + std::vector> sortedRows_; // The data type of the rows stored in 'data_' and spilled on disk. The // sort key columns are stored first then the non-sorted data columns. RowTypePtr spillerStoreType_; - std::unique_ptr spiller_; + + std::unique_ptr inputSpiller_; + + std::unique_ptr outputSpiller_; + SpillPartitionSet spillPartitionSet_; + // Used to merge the sorted runs from in-memory rows and spilled rows on disk. std::unique_ptr> spillMerger_; + // Records the source rows to copy to 'output_' in order. std::vector spillSources_; + std::vector spillSourceRows_; // Reusable output vector. RowVectorPtr output_; + // Estimated size of a single output row by using the max // 'data_->estimateRowSize()' across all accumulated data set. std::optional estimatedOutputRowSize_{}; + // The number of rows that has been returned. uint64_t numOutputRows_{0}; }; - } // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index be08fb7109b6..927afefe9f3e 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -180,9 +180,7 @@ void SortWindowBuild::ensureSortFits() { void SortWindowBuild::setupSpiller() { VELOX_CHECK_NULL(spiller_); - spiller_ = std::make_unique( - // TODO Replace Spiller::Type::kOrderBy. - Spiller::Type::kOrderByInput, + spiller_ = std::make_unique( data_.get(), inputType_, compareFlags_.size(), @@ -201,6 +199,13 @@ void SortWindowBuild::spill() { data_->pool()->release(); } +std::optional SortWindowBuild::spilledStats() const { + if (spiller_ == nullptr) { + return std::nullopt; + } + return spiller_->stats(); +} + // Use double front and back search algorithm to find next partition start row. // It is more efficient than linear or binary search. // This algorithm is described at @@ -371,5 +376,4 @@ bool SortWindowBuild::hasNextPartition() { return partitionStartRows_.size() > 0 && currentPartition_ < static_cast(partitionStartRows_.size() - 2); } - } // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 45e7b3390cb1..72875094007a 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -21,7 +21,6 @@ #include "velox/exec/WindowBuild.h" namespace facebook::velox::exec { - // Sorts input data of the Window by {partition keys, sort keys} // to identify window partitions. This sort fully orders // rows as needed for window function computation. @@ -48,12 +47,7 @@ class SortWindowBuild : public WindowBuild { void spill() override; - std::optional spilledStats() const override { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } + std::optional spilledStats() const override; void noMoreInput() override; @@ -123,10 +117,9 @@ class SortWindowBuild : public WindowBuild { vector_size_t currentPartition_ = -1; // Spiller for contents of the 'data_'. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; }; - } // namespace facebook::velox::exec diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index a2266aa7315a..00dc3683f636 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -32,7 +32,7 @@ #include "velox/vector/VectorStream.h" namespace facebook::velox::exec { -// A source of sorted spilled RowVectors coming either from a file or memory. +/// A source of sorted spilled RowVectors coming either from a file or memory. class SpillMergeStream : public MergeStream { public: SpillMergeStream() = default; @@ -135,7 +135,7 @@ class SpillMergeStream : public MergeStream { SelectivityVector rows_; }; -// A source of spilled RowVectors coming from a file. +/// A source of spilled RowVectors coming from a file. class FileSpillMergeStream : public SpillMergeStream { public: static std::unique_ptr create( @@ -448,7 +448,7 @@ class SpillState { // the max spill bytes limit. common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb_; - /// Prefix for spill files. + // Prefix for spill files. const std::string fileNamePrefix_; const int32_t maxPartitions_; const int32_t numSortKeys_; diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 54ac93b5d3ae..df0d122c2329 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -27,470 +27,135 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { -namespace { -#define CHECK_NOT_FINALIZED() \ - VELOX_CHECK(!finalized_, "Spiller has been finalized") -#define CHECK_FINALIZED() \ - VELOX_CHECK(finalized_, "Spiller hasn't been finalized yet"); -} // namespace - -Spiller::Spiller( - Type type, +SpillerBase::SpillerBase( RowContainer* container, RowTypePtr rowType, + HashBitRange bits, int32_t numSortingKeys, const std::vector& sortCompareFlags, + uint64_t targetFileSize, + uint64_t maxSpillRunRows, const common::SpillConfig* spillConfig, folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - HashBitRange{}, - numSortingKeys, - sortCompareFlags, - false, + : container_(container), + executor_(spillConfig->executor), + bits_(bits), + rowType_(rowType), + maxSpillRunRows_(maxSpillRunRows), + spillStats_(spillStats), + state_( spillConfig->getSpillDirPathCb, spillConfig->updateAndCheckSpillLimitCb, spillConfig->fileNamePrefix, - std::numeric_limits::max(), - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ( - type_, - Type::kOrderByInput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.maxPartitions(), 1); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const HashBitRange& hashBitRange, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - hashBitRange, + bits.numPartitions(), numSortingKeys, sortCompareFlags, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - std::numeric_limits::max(), - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK( - type_ == Type::kOrderByInput || type_ == Type::kAggregateInput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.targetFileSize(), std::numeric_limits::max()); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - HashBitRange{}, - 0, - {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - std::numeric_limits::max(), + targetFileSize, spillConfig->writeBufferSize, spillConfig->compressionKind, spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK( - type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, - "Unexpected spiller type: {}", - typeName(type_)); - VELOX_CHECK_EQ(state_.maxPartitions(), 1); - VELOX_CHECK_EQ(state_.targetFileSize(), std::numeric_limits::max()); + memory::spillMemoryPool(), + spillStats, + spillConfig->fileCreateConfig) { + spillRuns_.reserve(state_.maxPartitions()); + for (int i = 0; i < state_.maxPartitions(); ++i) { + spillRuns_.emplace_back(*memory::spillMemoryPool()); + } } -Spiller::Spiller( - Type type, +NoRowContainerSpiller::NoRowContainerSpiller( RowTypePtr rowType, HashBitRange bits, const common::SpillConfig* spillConfig, folly::Synchronized* spillStats) - : Spiller( - type, + : SpillerBase( nullptr, std::move(rowType), bits, 0, {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, 0, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ( - type_, - Type::kHashJoinProbe, - "Unexpected spiller type: {}", - typeName(type_)); -} - -Spiller::Spiller( - Type type, - core::JoinType joinType, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - bits, - 0, - {}, - needRightSideJoin(joinType), - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ(type_, Type::kHashJoinBuild); - VELOX_CHECK(isHashJoinTableSpillType(rowType_, joinType)); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats) - : Spiller( - type, - container, - std::move(rowType), - bits, - 0, - {}, - false, - spillConfig->getSpillDirPathCb, - spillConfig->updateAndCheckSpillLimitCb, - spillConfig->fileNamePrefix, - spillConfig->maxFileSize, - spillConfig->writeBufferSize, - spillConfig->compressionKind, - spillConfig->prefixSortConfig, - spillConfig->executor, - spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig, - spillStats) { - VELOX_CHECK_EQ(type_, Type::kRowNumber); -} - -Spiller::Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - bool recordProbedFlag, - const common::GetSpillDirectoryPathCB& getSpillDirPathCb, - const common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, - const std::string& fileNamePrefix, - uint64_t targetFileSize, - uint64_t writeBufferSize, - common::CompressionKind compressionKind, - const std::optional& prefixSortConfig, - folly::Executor* executor, - uint64_t maxSpillRunRows, - const std::string& fileCreateConfig, - folly::Synchronized* spillStats) - : type_(type), - container_(container), - executor_(executor), - bits_(bits), - rowType_(std::move(rowType)), - spillProbedFlag_(recordProbedFlag), - maxSpillRunRows_(maxSpillRunRows), - spillStats_(spillStats), - state_( - getSpillDirPathCb, - updateAndCheckSpillLimitCb, - fileNamePrefix, - bits.numPartitions(), - numSortingKeys, - sortCompareFlags, - targetFileSize, - writeBufferSize, - compressionKind, - prefixSortConfig, - memory::spillMemoryPool(), - spillStats, - fileCreateConfig) { - TestValue::adjust("facebook::velox::exec::Spiller", this); - - VELOX_CHECK(!spillProbedFlag_ || type_ == Type::kHashJoinBuild); - VELOX_CHECK_EQ(container_ == nullptr, type_ == Type::kHashJoinProbe); - spillRuns_.reserve(state_.maxPartitions()); - for (int i = 0; i < state_.maxPartitions(); ++i) { - spillRuns_.emplace_back(*memory::spillMemoryPool()); - } -} - -void Spiller::extractSpill(folly::Range rows, RowVectorPtr& resultPtr) { - if (resultPtr == nullptr) { - resultPtr = BaseVector::create( - rowType_, rows.size(), memory::spillMemoryPool()); - } else { - resultPtr->prepareForReuse(); - resultPtr->resize(rows.size()); - } - - auto* result = resultPtr.get(); - const auto& types = container_->columnTypes(); - for (auto i = 0; i < types.size(); ++i) { - container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); - } - const auto& accumulators = container_->accumulators(); - column_index_t accumulatorColumnOffset = types.size(); - if (spillProbedFlag_) { - container_->extractProbedFlags( - rows.data(), rows.size(), false, false, result->childAt(types.size())); - ++accumulatorColumnOffset; - } - for (auto i = 0; i < accumulators.size(); ++i) { - accumulators[i].extractForSpill( - rows, result->childAt(i + accumulatorColumnOffset)); - } -} - -int64_t Spiller::extractSpillVector( - SpillRows& rows, - int32_t maxRows, - int64_t maxBytes, - RowVectorPtr& spillVector, - size_t& nextBatchIndex) { - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - uint64_t extractNs{0}; - auto limit = std::min(rows.size() - nextBatchIndex, maxRows); - VELOX_CHECK(!rows.empty()); - int32_t numRows = 0; - int64_t bytes = 0; - { - NanosecondTimer timer(&extractNs); - for (; numRows < limit; ++numRows) { - bytes += container_->rowSize(rows[nextBatchIndex + numRows]); - if (bytes > maxBytes) { - // Increment because the row that went over the limit is part - // of the result. We must spill at least one row. - ++numRows; - break; - } - } - extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector); - nextBatchIndex += numRows; - } - updateSpillExtractVectorTime(extractNs); - return bytes; -} + spillConfig, + spillStats) {} -namespace { -// A stream of ordered rows being read from the in memory -// container. This is the part of a spillable range that is not yet -// spilled when starting to produce output. This is only used for -// sorted spills since for hash join spilling we just use the data in -// the RowContainer as is. -class RowContainerSpillMergeStream : public SpillMergeStream { - public: - RowContainerSpillMergeStream( - int32_t numSortKeys, - const std::vector& sortCompareFlags, - Spiller::SpillRows&& rows, - Spiller& spiller) - : numSortKeys_(numSortKeys), - sortCompareFlags_(sortCompareFlags), - rows_(std::move(rows)), - spiller_(spiller) { - if (!rows_.empty()) { - RowContainerSpillMergeStream::nextBatch(); - } - } - - uint32_t id() const override { - // Returns the max uint32_t as the special id for in-memory spill merge - // stream. - return std::numeric_limits::max(); - } +void SpillerBase::spill(const RowContainerIterator* startRowIter) { + VELOX_CHECK(!finalized_); - private: - int32_t numSortKeys() const override { - return numSortKeys_; - } - - const std::vector& sortCompareFlags() const override { - return sortCompareFlags_; - } + markAllPartitionsSpilled(); - void nextBatch() override { - // Extracts up to 64 rows at a time. Small batch size because may - // have wide data and no advantage in large size for narrow data - // since this is all processed row by row. - static constexpr vector_size_t kMaxRows = 64; - constexpr uint64_t kMaxBytes = 1 << 18; - if (nextBatchIndex_ >= rows_.size()) { - index_ = 0; - size_ = 0; - return; - } - spiller_.extractSpillVector( - rows_, kMaxRows, kMaxBytes, rowVector_, nextBatchIndex_); - size_ = rowVector_->size(); - index_ = 0; + RowContainerIterator rowIter; + if (startRowIter != nullptr) { + rowIter = *startRowIter; } - const int32_t numSortKeys_; - const std::vector sortCompareFlags_; - - Spiller::SpillRows rows_; - Spiller& spiller_; - size_t nextBatchIndex_ = 0; -}; -} // namespace - -std::unique_ptr Spiller::spillMergeStreamOverRows( - int32_t partition) { - CHECK_FINALIZED(); - VELOX_CHECK_LT(partition, state_.maxPartitions()); + bool lastRun{false}; + do { + lastRun = fillSpillRuns(&rowIter); + runSpill(lastRun); + } while (!lastRun); - if (!state_.isPartitionSpilled(partition)) { - return nullptr; - } - // Skip the merge stream from row container if it is empty. - if (spillRuns_[partition].rows.empty()) { - return nullptr; - } - ensureSorted(spillRuns_[partition]); - return std::make_unique( - container_->keyTypes().size(), - state_.sortCompareFlags(), - std::move(spillRuns_[partition].rows), - *this); + checkEmptySpillRuns(); } -void Spiller::ensureSorted(SpillRun& run) { - // The spill data of a hash join doesn't need to be sorted. - if (run.sorted || !needSort()) { - return; - } +bool SpillerBase::fillSpillRuns(RowContainerIterator* iterator) { + checkEmptySpillRuns(); - uint64_t sortTimeNs{0}; + bool lastRun{false}; + uint64_t execTimeNs{0}; { - NanosecondTimer timer(&sortTimeNs); + NanosecondTimer timer(&execTimeNs); - if (!state_.prefixSortConfig().has_value()) { - gfx::timsort( - run.rows.begin(), - run.rows.end(), - [&](const char* left, const char* right) { - return container_->compareRows( - left, right, state_.sortCompareFlags()) < 0; - }); - } else { - PrefixSort::sort( - container_, - state_.sortCompareFlags(), - state_.prefixSortConfig().value(), - memory::spillMemoryPool(), - run.rows); - } + // Number of rows to hash and divide into spill partitions at a time. + constexpr int32_t kHashBatchSize = 4096; + std::vector hashes(kHashBatchSize); + std::vector rows(kHashBatchSize); + const bool isSinglePartition = bits_.numPartitions() == 1; - run.sorted = true; - } + uint64_t totalRows{0}; + for (;;) { + const auto numRows = container_->listRows( + iterator, rows.size(), RowContainer::kUnlimited, rows.data()); + if (numRows == 0) { + lastRun = true; + break; + } - // NOTE: Always set a non-zero sort time to avoid flakiness in tests which - // check sort time. - updateSpillSortTime(std::max(1, sortTimeNs)); -} + // Calculate hashes for this batch of spill candidates. + auto rowSet = folly::Range(rows.data(), numRows); -std::unique_ptr Spiller::writeSpill(int32_t partition) { - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - // Target size of a single vector of spilled content. One of - // these will be materialized at a time for each stream of the - // merge. - constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K - constexpr int32_t kTargetBatchRows = 64; + if (!isSinglePartition) { + for (auto i = 0; i < container_->keyTypes().size(); ++i) { + container_->hash(i, rowSet, i > 0, hashes.data()); + } + } - RowVectorPtr spillVector; - auto& run = spillRuns_[partition]; - try { - ensureSorted(run); - int64_t totalBytes = 0; - size_t written = 0; - while (written < run.rows.size()) { - extractSpillVector( - run.rows, kTargetBatchRows, kTargetBatchBytes, spillVector, written); - totalBytes += state_.appendToPartition(partition, spillVector); - if (totalBytes > state_.targetFileSize()) { - VELOX_CHECK(!needSort()); - state_.finishFile(partition); + // Put each in its run. + for (auto i = 0; i < numRows; ++i) { + // TODO: consider to cache the hash bits in row container so we only + // need to calculate them once. + const auto partition = isSinglePartition + ? 0 + : bits_.partition(hashes[i], state_.maxPartitions()); + VELOX_DCHECK_GE(partition, 0); + spillRuns_[partition].rows.push_back(rows[i]); + spillRuns_[partition].numBytes += container_->rowSize(rows[i]); + } + + totalRows += numRows; + if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) { + break; } } - return std::make_unique(partition, written, nullptr); - } catch (const std::exception&) { - // The exception is passed to the caller thread which checks this in - // advanceSpill(). - return std::make_unique( - partition, 0, std::current_exception()); } + updateSpillFillTime(execTimeNs); + + return lastRun; } -void Spiller::runSpill(bool lastRun) { +void SpillerBase::runSpill(bool lastRun) { ++spillStats_->wlock()->spillRuns; - VELOX_CHECK(type_ != Spiller::Type::kOrderByOutput || lastRun); std::vector>> writes; for (auto partition = 0; partition < spillRuns_.size(); ++partition) { @@ -538,88 +203,184 @@ void Spiller::runSpill(bool lastRun) { state_.finishFile(partition); } } +} - // For aggregation output / orderby output spiller, we expect only one spill - // call to spill all the rows starting from the specified row offset. - if (lastRun && - (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput)) { - for (auto partition = 0; partition < spillRuns_.size(); ++partition) { - state_.finishFile(partition); +std::unique_ptr SpillerBase::writeSpill( + int32_t partition) { + // Target size of a single vector of spilled content. One of + // these will be materialized at a time for each stream of the + // merge. + constexpr int32_t kTargetBatchBytes = 1 << 18; // 256K + constexpr int32_t kTargetBatchRows = 64; + + RowVectorPtr spillVector; + auto& run = spillRuns_[partition]; + try { + ensureSorted(run); + int64_t totalBytes = 0; + size_t written = 0; + while (written < run.rows.size()) { + extractSpillVector( + run.rows, kTargetBatchRows, kTargetBatchBytes, spillVector, written); + totalBytes += state_.appendToPartition(partition, spillVector); + if (totalBytes > state_.targetFileSize()) { + VELOX_CHECK(!needSort()); + state_.finishFile(partition); + } } + return std::make_unique(partition, written, nullptr); + } catch (const std::exception&) { + // The exception is passed to the caller thread which checks this in + // advanceSpill(). + return std::make_unique( + partition, 0, std::current_exception()); } } -void Spiller::updateSpillFillTime(uint64_t timeNs) { - spillStats_->wlock()->spillFillTimeNanos += timeNs; - common::updateGlobalSpillFillTime(timeNs); +void SpillerBase::ensureSorted(SpillRun& run) { + // The spill data of a hash join doesn't need to be sorted. + if (run.sorted || !needSort()) { + return; + } + + uint64_t sortTimeNs{0}; + { + NanosecondTimer timer(&sortTimeNs); + + if (!state_.prefixSortConfig().has_value()) { + gfx::timsort( + run.rows.begin(), + run.rows.end(), + [&](const char* left, const char* right) { + return container_->compareRows( + left, right, state_.sortCompareFlags()) < 0; + }); + } else { + PrefixSort::sort( + container_, + state_.sortCompareFlags(), + state_.prefixSortConfig().value(), + memory::spillMemoryPool(), + run.rows); + } + + run.sorted = true; + } + + // NOTE: Always set a non-zero sort time to avoid flakiness in tests which + // check sort time. + updateSpillSortTime(std::max(1, sortTimeNs)); } -void Spiller::updateSpillSortTime(uint64_t timeNs) { - spillStats_->wlock()->spillSortTimeNanos += timeNs; - common::updateGlobalSpillSortTime(timeNs); +int64_t SpillerBase::extractSpillVector( + SpillRows& rows, + int32_t maxRows, + int64_t maxBytes, + RowVectorPtr& spillVector, + size_t& nextBatchIndex) { + uint64_t extractNs{0}; + auto limit = std::min(rows.size() - nextBatchIndex, maxRows); + VELOX_CHECK(!rows.empty()); + int32_t numRows = 0; + int64_t bytes = 0; + { + NanosecondTimer timer(&extractNs); + for (; numRows < limit; ++numRows) { + bytes += container_->rowSize(rows[nextBatchIndex + numRows]); + if (bytes > maxBytes) { + // Increment because the row that went over the limit is part + // of the result. We must spill at least one row. + ++numRows; + break; + } + } + extractSpill(folly::Range(&rows[nextBatchIndex], numRows), spillVector); + nextBatchIndex += numRows; + } + updateSpillExtractVectorTime(extractNs); + return bytes; +} + +void SpillerBase::extractSpill( + folly::Range rows, + RowVectorPtr& resultPtr) { + if (resultPtr == nullptr) { + resultPtr = BaseVector::create( + rowType_, rows.size(), memory::spillMemoryPool()); + } else { + resultPtr->prepareForReuse(); + resultPtr->resize(rows.size()); + } + + auto* result = resultPtr.get(); + const auto& types = container_->columnTypes(); + for (auto i = 0; i < types.size(); ++i) { + container_->extractColumn(rows.data(), rows.size(), i, result->childAt(i)); + } + const auto& accumulators = container_->accumulators(); + column_index_t accumulatorColumnOffset = types.size(); + for (auto i = 0; i < accumulators.size(); ++i) { + accumulators[i].extractForSpill( + rows, result->childAt(i + accumulatorColumnOffset)); + } } -void Spiller::updateSpillExtractVectorTime(uint64_t timeNs) { +void SpillerBase::updateSpillExtractVectorTime(uint64_t timeNs) { spillStats_->wlock()->spillExtractVectorTimeNanos += timeNs; common::updateGlobalSpillExtractVectorTime(timeNs); } -bool Spiller::needSort() const { - return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild && - type_ != Type::kRowNumber && type_ != Type::kAggregateOutput && - type_ != Type::kOrderByOutput; +void SpillerBase::updateSpillSortTime(uint64_t timeNs) { + spillStats_->wlock()->spillSortTimeNanos += timeNs; + common::updateGlobalSpillSortTime(timeNs); } -void Spiller::spill() { - return spill(nullptr); +void SpillerBase::checkEmptySpillRuns() const { + for (const auto& spillRun : spillRuns_) { + VELOX_CHECK(spillRun.rows.empty()); + } } -void Spiller::spill(const RowContainerIterator& startRowIter) { - VELOX_CHECK_EQ(type_, Type::kAggregateOutput); - return spill(&startRowIter); +void SpillerBase::updateSpillFillTime(uint64_t timeNs) { + spillStats_->wlock()->spillFillTimeNanos += timeNs; + common::updateGlobalSpillFillTime(timeNs); } -void Spiller::spill(const RowContainerIterator* startRowIter) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - VELOX_CHECK_NE(type_, Type::kOrderByOutput); - - markAllPartitionsSpilled(); +void SpillerBase::finishSpill(SpillPartitionSet& partitionSet) { + finalizeSpill(); - RowContainerIterator rowIter; - if (startRowIter != nullptr) { - rowIter = *startRowIter; + for (auto& partition : state_.spilledPartitionSet()) { + const SpillPartitionId partitionId(bits_.begin(), partition); + if (partitionSet.count(partitionId) == 0) { + partitionSet.emplace( + partitionId, + std::make_unique( + partitionId, state_.finish(partition))); + } else { + partitionSet[partitionId]->addFiles(state_.finish(partition)); + } } - - bool lastRun{false}; - do { - lastRun = fillSpillRuns(&rowIter); - runSpill(lastRun); - } while (!lastRun); - - checkEmptySpillRuns(); } -void Spiller::spill(SpillRows& rows) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK_EQ(type_, Type::kOrderByOutput); - VELOX_CHECK(!rows.empty()); - - markAllPartitionsSpilled(); +common::SpillStats SpillerBase::stats() const { + return spillStats_->copy(); +} - fillSpillRun(rows); - runSpill(true); - checkEmptySpillRuns(); +std::string SpillerBase::toString() const { + return fmt::format( + "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}", + type(), + rowType_->toString(), + state_.maxPartitions(), + finalized_); } -void Spiller::checkEmptySpillRuns() const { - for (const auto& spillRun : spillRuns_) { - VELOX_CHECK(spillRun.rows.empty()); - } +void SpillerBase::finalizeSpill() { + VELOX_CHECK(!finalized_); + finalized_ = true; } -void Spiller::markAllPartitionsSpilled() { +void SpillerBase::markAllPartitionsSpilled() { for (auto partition = 0; partition < state_.maxPartitions(); ++partition) { if (!state_.isPartitionSpilled(partition)) { state_.setPartitionSpilled(partition); @@ -627,13 +388,10 @@ void Spiller::markAllPartitionsSpilled() { } } -void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK( - type_ == Type::kHashJoinProbe || type_ == Type::kHashJoinBuild || - type_ == Type::kRowNumber, - "Unexpected spiller type: {}", - typeName(type_)); +void NoRowContainerSpiller::spill( + uint32_t partition, + const RowVectorPtr& spillVector) { + VELOX_CHECK(!finalized_); if (FOLLY_UNLIKELY(!state_.isPartitionSpilled(partition))) { VELOX_FAIL( "Can't spill vector to a non-spilling partition: {}, {}", @@ -649,83 +407,32 @@ void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) { state_.appendToPartition(partition, spillVector); } -void Spiller::finishSpill(SpillPartitionSet& partitionSet) { - finalizeSpill(); - - for (auto& partition : state_.spilledPartitionSet()) { - const SpillPartitionId partitionId(bits_.begin(), partition); - if (partitionSet.count(partitionId) == 0) { - partitionSet.emplace( - partitionId, - std::make_unique( - partitionId, state_.finish(partition))); - } else { - partitionSet[partitionId]->addFiles(state_.finish(partition)); - } - } -} - -void Spiller::finalizeSpill() { - CHECK_NOT_FINALIZED(); - finalized_ = true; +void SortInputSpiller::spill() { + SpillerBase::spill(nullptr); } -bool Spiller::fillSpillRuns(RowContainerIterator* iterator) { - checkEmptySpillRuns(); - - bool lastRun{false}; - uint64_t execTimeNs{0}; - { - NanosecondTimer timer(&execTimeNs); - - // Number of rows to hash and divide into spill partitions at a time. - constexpr int32_t kHashBatchSize = 4096; - std::vector hashes(kHashBatchSize); - std::vector rows(kHashBatchSize); - const bool isSinglePartition = bits_.numPartitions() == 1; - - uint64_t totalRows{0}; - for (;;) { - const auto numRows = container_->listRows( - iterator, rows.size(), RowContainer::kUnlimited, rows.data()); - if (numRows == 0) { - lastRun = true; - break; - } - - // Calculate hashes for this batch of spill candidates. - auto rowSet = folly::Range(rows.data(), numRows); - - if (!isSinglePartition) { - for (auto i = 0; i < container_->keyTypes().size(); ++i) { - container_->hash(i, rowSet, i > 0, hashes.data()); - } - } - - // Put each in its run. - for (auto i = 0; i < numRows; ++i) { - // TODO: consider to cache the hash bits in row container so we only - // need to calculate them once. - const auto partition = isSinglePartition - ? 0 - : bits_.partition(hashes[i], state_.maxPartitions()); - VELOX_DCHECK_GE(partition, 0); - spillRuns_[partition].rows.push_back(rows[i]); - spillRuns_[partition].numBytes += container_->rowSize(rows[i]); - } +SortOutputSpiller::SortOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + 0, + {}, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} - totalRows += numRows; - if (maxSpillRunRows_ > 0 && totalRows >= maxSpillRunRows_) { - break; - } - } - } - updateSpillFillTime(execTimeNs); +void SortOutputSpiller::spill(SpillRows& rows) { + VELOX_CHECK(!finalized_); + VELOX_CHECK(!rows.empty()); - return lastRun; -} + markAllPartitionsSpilled(); -void Spiller::fillSpillRun(SpillRows& rows) { VELOX_CHECK_EQ(bits_.numPartitions(), 1); checkEmptySpillRuns(); uint64_t execTimeNs{0}; @@ -738,40 +445,16 @@ void Spiller::fillSpillRun(SpillRows& rows) { } } updateSpillFillTime(execTimeNs); + runSpill(true); + checkEmptySpillRuns(); } -std::string Spiller::toString() const { - return fmt::format( - "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}", - typeName(type_), - rowType_->toString(), - state_.maxPartitions(), - finalized_); -} - -// static -std::string Spiller::typeName(Type type) { - switch (type) { - case Type::kOrderByInput: - return "ORDER_BY_INPUT"; - case Type::kOrderByOutput: - return "ORDER_BY_OUTPUT"; - case Type::kHashJoinBuild: - return "HASH_JOIN_BUILD"; - case Type::kHashJoinProbe: - return "HASH_JOIN_PROBE"; - case Type::kAggregateInput: - return "AGGREGATE_INPUT"; - case Type::kAggregateOutput: - return "AGGREGATE_OUTPUT"; - case Type::kRowNumber: - return "ROW_NUMBER"; - default: - VELOX_UNREACHABLE("Unknown type: {}", static_cast(type)); +void SortOutputSpiller::runSpill(bool lastRun) { + SpillerBase::runSpill(lastRun); + if (lastRun) { + for (auto partition = 0; partition < spillRuns_.size(); ++partition) { + state_.finishFile(partition); + } } } - -common::SpillStats Spiller::stats() const { - return spillStats_->copy(); -} } // namespace facebook::velox::exec diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 91a1352a205b..02965bd3e461 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -21,177 +21,26 @@ #include "velox/exec/RowContainer.h" namespace facebook::velox::exec { +namespace test { +class SpillerTest; +} -/// Manages spilling data from a RowContainer. -class Spiller { +class SpillerBase { public: - // Define the spiller types. - enum class Type : int8_t { - // Used for aggregation input processing stage. - kAggregateInput = 0, - // Used for aggregation output processing stage. - kAggregateOutput = 1, - // Used for hash join build. - kHashJoinBuild = 2, - // Used for hash join probe. - kHashJoinProbe = 3, - // Used for order by input processing stage. - kOrderByInput = 4, - // Used for order by output processing stage. - kOrderByOutput = 5, - // Used for row number. - kRowNumber = 6, - // Number of spiller types. - kNumTypes = 7, - }; - - static std::string typeName(Type); - using SpillRows = std::vector>; - /// The constructor without specifying hash bits which will only use one - /// partition by default. - - /// type == Type::kAggregateInput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const HashBitRange& hashBitRange, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kOrderByInput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - int32_t numSortingKeys, - const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kAggregateOutput || type == Type::kOrderByOutput - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); + virtual ~SpillerBase() = default; - /// type == Type::kHashJoinProbe - Spiller( - Type type, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kHashJoinBuild - Spiller( - Type type, - core::JoinType joinType, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - /// type == Type::kRowNumber - Spiller( - Type type, - RowContainer* container, - RowTypePtr rowType, - HashBitRange bits, - const common::SpillConfig* spillConfig, - folly::Synchronized* spillStats); - - Type type() const { - return type_; - } - - /// Spills all the rows from 'this' to disk. The spilled rows stays in the - /// row container. The caller needs to erase the spilled rows from the row - /// container. - void spill(); - - /// Spill all rows starting from 'startRowIter'. This is only used by - /// 'kAggregateOutput' spiller type to spill during the aggregation output - /// processing. Similarly, the spilled rows still stays in the row container. - /// The caller needs to erase them from the row container. - void spill(const RowContainerIterator& startRowIter); - - /// Invoked to spill all the rows pointed by rows. This is used by - /// 'kOrderByOutput' spiller type to spill during the order by - /// output processing. Similarly, the spilled rows still stays in the row - /// container. The caller needs to erase them from the row container. - void spill(SpillRows& rows); - - /// Append 'spillVector' into the spill file of given 'partition'. It is now - /// only used by the spilling operator which doesn't need data sort, such as - /// hash join build and hash join probe. - /// - /// NOTE: the spilling operator should first mark 'partition' as spilling and - /// spill any data buffered in row container before call this. - void spill(uint32_t partition, const RowVectorPtr& spillVector); - - /// Extracts up to 'maxRows' or 'maxBytes' from 'rows' into 'spillVector'. The - /// extract starts at nextBatchIndex and updates nextBatchIndex to be the - /// index of the first non-extracted element of 'rows'. Returns the byte size - /// of the extracted rows. - int64_t extractSpillVector( - SpillRows& rows, - int32_t maxRows, - int64_t maxBytes, - RowVectorPtr& spillVector, - size_t& nextBatchIndex); - - /// Finishes spilling and accumulate the spilled partition metadata in - /// 'partitionSet' indexed by spill partition id. void finishSpill(SpillPartitionSet& partitionSet); - const SpillState& state() const { - return state_; - } - const HashBitRange& hashBits() const { return bits_; } - bool isSpilled(int32_t partition) const { - return state_.isPartitionSpilled(partition); - } - - /// Indicates if all the partitions have spilled. - bool isAllSpilled() const { - return state_.isAllPartitionSpilled(); - } - - /// Indicates if any one of the partitions has spilled. - bool isAnySpilled() const { - return state_.isAnyPartitionSpilled(); - } - - /// Returns the spilled partition number set. - SpillPartitionNumSet spilledPartitionSet() const { - return state_.spilledPartitionSet(); - } - - /// Invokes to set a set of 'partitions' as spilling. - void setPartitionsSpilled(const SpillPartitionNumSet& partitions) { - VELOX_CHECK_EQ( - type_, - Spiller::Type::kHashJoinProbe, - "Unexpected spiller type: ", - typeName(type_)); - for (const auto& partition : partitions) { - state_.setPartitionSpilled(partition); - } + const SpillState& state() const { + return state_; } - /// Indicates if this spiller has finalized or not. bool finalized() const { return finalized_; } @@ -200,43 +49,41 @@ class Spiller { std::string toString() const; - private: - Spiller( - Type type, + protected: + SpillerBase( RowContainer* container, RowTypePtr rowType, HashBitRange bits, int32_t numSortingKeys, const std::vector& sortCompareFlags, - bool spillProbedFlag, - const common::GetSpillDirectoryPathCB& getSpillDirPathCb, - const common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, - const std::string& fileNamePrefix, uint64_t targetFileSize, - uint64_t writeBufferSize, - common::CompressionKind compressionKind, - const std::optional& prefixSortConfig, - folly::Executor* executor, uint64_t maxSpillRunRows, - const std::string& fileCreateConfig, + const common::SpillConfig* spillConfig, folly::Synchronized* spillStats); // Invoked to spill. If 'startRowIter' is not null, then we only spill rows // from row container starting at the offset pointed by 'startRowIter'. void spill(const RowContainerIterator* startRowIter); + // Writes out all the rows collected in spillRuns_. + virtual void runSpill(bool lastRun); + // Extracts the keys, dependents or accumulators for 'rows' into '*result'. // Creates '*results' in spillPool() if nullptr. Used from Spiller and // RowContainerSpillMergeStream. - void extractSpill(folly::Range rows, RowVectorPtr& result); + virtual void extractSpill(folly::Range rows, RowVectorPtr& resultPtr); - // Returns a mergeable stream that goes over unspilled in-memory - // rows for the spill partition 'partition'. finishSpill() - // first and 'partition' must specify a partition that has started spilling. - std::unique_ptr spillMergeStreamOverRows(int32_t partition); + virtual bool needSort() const = 0; - // Invoked to finalize the spiller and flush any buffered spill to disk. - void finalizeSpill(); + virtual std::string type() const = 0; + + // Marks all the partitions have been spilled as we don't support + // fine-grained spilling as for now. + void markAllPartitionsSpilled(); + + void updateSpillFillTime(uint64_t timeNs); + + void checkEmptySpillRuns() const; // Represents a run of rows from a spillable partition of // a RowContainer. Rows that hash to the same partition are accumulated here @@ -284,11 +131,33 @@ class Spiller { : partition(_partition), rowsWritten(_numWritten), error(_error) {} }; - void checkEmptySpillRuns() const; + RowContainer* const container_{nullptr}; - // Marks all the partitions have been spilled as we don't support - // fine-grained spilling as for now. - void markAllPartitionsSpilled(); + folly::Executor* const executor_; + + const HashBitRange bits_; + + const RowTypePtr rowType_; + + const uint64_t maxSpillRunRows_; + + folly::Synchronized* const spillStats_; + + // True if all rows of spilling partitions are in 'spillRuns_', so + // that one can start reading these back. + bool finalized_{false}; + + SpillState state_; + + // Collects the rows to spill for each partition. + std::vector spillRuns_; + + private: + // Function for writing a spill partition on an executor. Writes to + // 'partition' until all rows in spillRuns_[partition] are written + // or spill file size limit is exceeded. Returns the number of rows + // written. + std::unique_ptr writeSpill(int32_t partition); // Prepares spill runs for the spillable data from all the hash partitions. // If 'startRowIter' is not null, we prepare runs starting from the offset @@ -296,62 +165,113 @@ class Spiller { // The function returns true if it is the last spill run. bool fillSpillRuns(RowContainerIterator* startRowIter = nullptr); - // Prepares spill run of a single partition for the spillable data from the - // rows. - void fillSpillRun(SpillRows& rows); + void updateSpillExtractVectorTime(uint64_t timeNs); - // Writes out all the rows collected in spillRuns_. - void runSpill(bool lastRun); + void updateSpillSortTime(uint64_t timeNs); // Sorts 'run' if not already sorted. void ensureSorted(SpillRun& run); - // Function for writing a spill partition on an executor. Writes to - // 'partition' until all rows in spillRuns_[partition] are written - // or spill file size limit is exceeded. Returns the number of rows - // written. - std::unique_ptr writeSpill(int32_t partition); + // Extracts up to 'maxRows' or 'maxBytes' from 'rows' into 'spillVector'. The + // extract starts at nextBatchIndex and updates nextBatchIndex to be the + // index of the first non-extracted element of 'rows'. Returns the byte size + // of the extracted rows. + int64_t extractSpillVector( + SpillRows& rows, + int32_t maxRows, + int64_t maxBytes, + RowVectorPtr& spillVector, + size_t& nextBatchIndex); - // Indicates if the spill data needs to be sorted before write to file. It is - // based on the spiller type. As for now, we need to sort spill data for any - // non hash join types of spilling. - bool needSort() const; + // Invoked to finalize the spiller and flush any buffered spill to disk. + void finalizeSpill(); - void updateSpillFillTime(uint64_t timeUs); + friend class test::SpillerTest; +}; - void updateSpillSortTime(uint64_t timeUs); +class NoRowContainerSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "NoRowContainerSpiller"; - void updateSpillExtractVectorTime(uint64_t timeUs); + NoRowContainerSpiller( + RowTypePtr rowType, + HashBitRange bits, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); - const Type type_; - // NOTE: for hash join probe type, there is no associated row container for - // the spiller. - RowContainer* const container_{nullptr}; - folly::Executor* const executor_; - const HashBitRange bits_; - const RowTypePtr rowType_; - const bool spillProbedFlag_; - const uint64_t maxSpillRunRows_; + void spill(uint32_t partition, const RowVectorPtr& spillVector); - folly::Synchronized* const spillStats_; + void setPartitionsSpilled(const SpillPartitionNumSet& partitions) { + for (const auto& partition : partitions) { + state_.setPartitionSpilled(partition); + } + } - // True if all rows of spilling partitions are in 'spillRuns_', so - // that one can start reading these back. This means that the rows - // that are not written out and deleted will be captured by - // spillMergeStreamOverRows(). - bool finalized_{false}; + private: + std::string type() const override { + return std::string(kType); + } - SpillState state_; + bool needSort() const override { + return false; + } +}; - // Collects the rows to spill for each partition. - std::vector spillRuns_; +class SortInputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "SortInputSpiller"; + + SortInputSpiller( + RowContainer* container, + RowTypePtr rowType, + int32_t numSortingKeys, + const std::vector& sortCompareFlags, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) + : SpillerBase( + container, + std::move(rowType), + HashBitRange{}, + numSortingKeys, + sortCompareFlags, + std::numeric_limits::max(), + spillConfig->maxSpillRunRows, + spillConfig, + spillStats) {} + + void spill(); + + private: + std::string type() const override { + return std::string(kType); + } + + bool needSort() const override { + return true; + } }; -} // namespace facebook::velox::exec -template <> -struct fmt::formatter : formatter { - auto format(facebook::velox::exec::Spiller::Type s, format_context& ctx) - const { - return formatter::format(static_cast(s), ctx); +class SortOutputSpiller : public SpillerBase { + public: + static constexpr std::string_view kType = "SortOutputSpiller"; + + SortOutputSpiller( + RowContainer* container, + RowTypePtr rowType, + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); + + void spill(SpillRows& rows); + + private: + void runSpill(bool lastRun) override; + + bool needSort() const override { + return false; + } + + std::string type() const override { + return std::string(kType); } }; +} // namespace facebook::velox::exec diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index f9766007c3ae..8d22ac3bb0b7 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -758,9 +758,7 @@ void TopNRowNumber::setupSpiller() { VELOX_CHECK_NULL(spiller_); VELOX_CHECK(spillConfig_.has_value()); - spiller_ = std::make_unique( - // TODO Replace Spiller::Type::kOrderBy. - Spiller::Type::kOrderByInput, + spiller_ = std::make_unique( data_.get(), inputType_, spillCompareFlags_.size(), diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 099122e8b56d..a9163ab0086d 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -19,6 +19,7 @@ #include "velox/exec/Operator.h" namespace facebook::velox::exec { +class TopNRowNumberSpiller; /// Partitions the input using specified partitioning keys, sorts rows within /// partitions using specified sorting keys, assigns row numbers and returns up @@ -154,7 +155,9 @@ class TopNRowNumber : public Operator { bool abandonPartialEarly() const; const int32_t limit_; + const bool generateRowNumber_; + const size_t numPartitionKeys_; // Input columns in the order of: partition keys, sorting keys, the rest. @@ -171,6 +174,7 @@ class TopNRowNumber : public Operator { const std::vector spillCompareFlags_; const vector_size_t abandonPartialMinRows_; + const int32_t abandonPartialMinPct_; // True if this operator runs a 'partial' stage without sufficient reduction @@ -181,12 +185,15 @@ class TopNRowNumber : public Operator { // partitioning keys. For each partition, stores an instance of TopRows // struct. std::unique_ptr table_; + std::unique_ptr lookup_; + int32_t partitionOffset_; // TopRows struct to keep track of top rows for a single partition, when // there are no partitioning keys. std::unique_ptr allocator_; + std::unique_ptr singlePartition_; // Stores input data. For each partition, only up to 'limit_' rows are stored. @@ -211,6 +218,7 @@ class TopNRowNumber : public Operator { // Maximum number of rows in the output batch. vector_size_t outputBatchSize_; + std::vector outputRows_; // Number of partitions to fetch from a HashTable in a single listAllRows @@ -218,13 +226,17 @@ class TopNRowNumber : public Operator { static const size_t kPartitionBatchSize = 100; BaseHashTable::RowsIterator partitionIt_; + std::vector partitions_{kPartitionBatchSize}; + size_t numPartitions_{0}; + std::optional currentPartition_; + vector_size_t remainingRowsInPartition_{0}; // Spiller for contents of the 'data_'. - std::unique_ptr spiller_; + std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 83d6d7983e86..a36b1cce8059 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -21,10 +21,15 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/testutil/TestValue.h" +#include "velox/exec/GroupingSet.h" +#include "velox/exec/HashBuild.h" #include "velox/exec/HashJoinBridge.h" #include "velox/exec/HashPartitionFunction.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/RowContainer.h" +#include "velox/exec/RowNumber.h" +#include "velox/exec/SortWindowBuild.h" +#include "velox/exec/TopNRowNumber.h" #include "velox/exec/tests/utils/RowContainerTestBase.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -34,7 +39,22 @@ using namespace facebook::velox::exec::test; using namespace facebook::velox::common::testutil; using facebook::velox::filesystems::FileSystem; +namespace facebook::velox::exec::test { namespace { +enum class SpillerType { + HASH_PROBE = 0, + ROW_NUMBER_INPUT = 1, // Extends HASH_PROBE + ORDER_BY_INPUT = 2, + TOP_N_ROW_NUMBER = 3, // Extends ORDER_BY_INPUT + SORT_WINDOW_BUILD = 4, // Extends ORDER_BY_INPUT + ORDER_BY_OUTPUT = 5, + HASH_BUILD = 6, + AGGREGATION_INPUT = 7, + AGGREGATION_OUTPUT = 8, + ROW_NUMBER_HASH_TABLE = 9, + NUM_TYPES = 10, +}; + // Class to write runtime stats in the tests to the stats container. class TestRuntimeStatWriter : public BaseRuntimeStatWriter { public: @@ -51,8 +71,35 @@ class TestRuntimeStatWriter : public BaseRuntimeStatWriter { std::unordered_map& stats_; }; +std::string typeName(SpillerType type) { + switch (type) { + case SpillerType::HASH_PROBE: + return std::string(NoRowContainerSpiller::kType); + case SpillerType::ORDER_BY_INPUT: + return std::string(SortInputSpiller::kType); + case SpillerType::ORDER_BY_OUTPUT: + return std::string(SortOutputSpiller::kType); + case SpillerType::HASH_BUILD: + return std::string(HashBuildSpiller::kType); + case SpillerType::AGGREGATION_INPUT: + return std::string(AggregationInputSpiller::kType); + case SpillerType::AGGREGATION_OUTPUT: + return std::string(AggregationOutputSpiller::kType); + case SpillerType::ROW_NUMBER_INPUT: + return std::string(RowNumberInputSpiller::kType); + case SpillerType::ROW_NUMBER_HASH_TABLE: + return std::string(RowNumberHashTableSpiller::kType); + case SpillerType::TOP_N_ROW_NUMBER: + return std::string(TopNRowNumberSpiller::kType); + case SpillerType::SORT_WINDOW_BUILD: + return std::string(SortWindowBuildSpiller::kType); + default: + VELOX_FAIL("UNKNOWN SpillerType"); + } +} + struct TestParam { - Spiller::Type type; + SpillerType type; // Specifies the spill executor pool size. If the size is zero, then spill // write path is executed inline with spiller control code path. int poolSize; @@ -61,7 +108,7 @@ struct TestParam { core::JoinType joinType; TestParam( - Spiller::Type _type, + SpillerType _type, int _poolSize, common::CompressionKind _compressionKind, bool _enablePrefixSort, @@ -75,7 +122,7 @@ struct TestParam { std::string toString() const { return fmt::format( "{}|{}|{}|{}", - Spiller::typeName(type), + typeName(type), poolSize, compressionKindToString(compressionKind), std::to_string(enablePrefixSort), @@ -86,9 +133,9 @@ struct TestParam { struct TestParamsBuilder { std::vector getTestParams() { std::vector params; - const auto numSpillerTypes = static_cast(Spiller::Type::kNumTypes); + const auto numSpillerTypes = static_cast(SpillerType::NUM_TYPES); for (int i = 0; i < numSpillerTypes; ++i) { - const auto type = static_cast(i); + const auto type = static_cast(i); if (typesToExclude.find(type) == typesToExclude.end()) { common::CompressionKind compressionKind = static_cast(numSpillerTypes % 6); @@ -99,7 +146,7 @@ struct TestParamsBuilder { compressionKind, poolSize % 2, core::JoinType::kRight); - if (type == Spiller::Type::kHashJoinBuild) { + if (type == SpillerType::HASH_BUILD) { params.emplace_back( type, poolSize, @@ -113,7 +160,34 @@ struct TestParamsBuilder { return params; } - std::unordered_set typesToExclude{}; +//// TODO: DEBUG REMOVE +//std::vector getTestParams() { +// std::vector params; +// const auto type = SpillerType::HASH_BUILD; +// if (typesToExclude.find(type) == typesToExclude.end()) { +// common::CompressionKind compressionKind = +// static_cast(0); +// for (int poolSize : {0, 8}) { +// params.emplace_back( +// type, +// poolSize, +// compressionKind, +// poolSize % 2, +// core::JoinType::kRight); +// if (type == SpillerType::HASH_BUILD) { +// params.emplace_back( +// type, +// poolSize, +// compressionKind, +// poolSize % 2, +// core::JoinType::kLeft); +// } +// } +// } +// return params; +//} + + std::unordered_set typesToExclude{}; }; // Set sequential value in a given child vector. 'value' is the starting value. @@ -138,6 +212,75 @@ void resizeVector(RowVector& vector, vector_size_t size) { class SpillerTest : public exec::test::RowContainerTestBase { public: + template + T castOrThrow(SpillerBase* spiller) const { + if (auto* casted = dynamic_cast(spiller)) { + return casted; + } + VELOX_UNREACHABLE("Unsuccessful cast of spiller: {}", spiller->toString()); + } + + // Delegate to base spiller protected spill method. + void spill(SpillerBase* spiller) const { + spiller->spill(nullptr); + } + + struct SpillParams { + std::optional partition; + std::optional spillVector; + std::optional spillRows; + std::optional rowIter; + }; + + // Delegate to spiller implementation public spill method. + void spill(SpillerBase* spiller, SpillParams params) const { + const auto type = spiller->type(); + if (type == std::string(NoRowContainerSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.partition.has_value()); + ASSERT_TRUE(params.spillVector.has_value()); + spillerImpl->spill(params.partition.value(), params.spillVector.value()); + } else if (type == std::string(SortInputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(SortOutputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.spillRows.has_value()); + spillerImpl->spill(*params.spillRows.value()); + } else if (type == std::string(HashBuildSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + if (params.partition.has_value() && params.spillVector.has_value()) { + spillerImpl->spill( + params.partition.value(), params.spillVector.value()); + } else { + spillerImpl->spill(); + } + } else if (type == std::string(AggregationInputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(AggregationOutputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.rowIter.has_value()); + spillerImpl->spill(*params.rowIter.value()); + } else if (type == std::string(RowNumberInputSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + ASSERT_TRUE(params.partition.has_value()); + ASSERT_TRUE(params.spillVector.has_value()); + spillerImpl->spill(params.partition.value(), params.spillVector.value()); + } else if (type == std::string(RowNumberHashTableSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(TopNRowNumberSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else if (type == std::string(SortWindowBuildSpiller::kType)) { + auto* spillerImpl = castOrThrow(spiller); + spillerImpl->spill(); + } else { + VELOX_UNREACHABLE("Unknown spiller: {}", spiller->toString()); + } + } + static void SetUpTestCase() { TestValue::enable(); memory::MemoryManager::testingSetInstance({}); @@ -152,13 +295,14 @@ class SpillerTest : public exec::test::RowContainerTestBase { enablePrefixSort_(param.enablePrefixSort), joinType_(param.joinType), spillProbedFlag_( - type_ == Spiller::Type::kHashJoinBuild && - needRightSideJoin(joinType_)), + type_ == SpillerType::HASH_BUILD && needRightSideJoin(joinType_)), hashBits_( 0, - (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kOrderByOutput || - type_ == Spiller::Type::kAggregateOutput) + (type_ == SpillerType::ORDER_BY_INPUT || + type_ == SpillerType::TOP_N_ROW_NUMBER || + type_ == SpillerType::SORT_WINDOW_BUILD || + type_ == SpillerType::ORDER_BY_OUTPUT || + type_ == SpillerType::AGGREGATION_OUTPUT) ? 0 : 2), numPartitions_(hashBits_.numPartitions()), @@ -197,7 +341,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { MAP(BIGINT(), ROW({{"s2_int", INTEGER()}, {"s2_string", VARCHAR()}})))}, }); - if (type_ == Spiller::Type::kHashJoinBuild) { + if (type_ == SpillerType::HASH_BUILD) { rowType_ = hashJoinTableSpillType(containerType_, joinType_); } else { rowType_ = containerType_; @@ -226,6 +370,13 @@ class SpillerTest : public exec::test::RowContainerTestBase { spillPartitionNums.begin(), spillPartitionNums.end()); } + bool needSort(SpillerType type) { + return type == SpillerType::AGGREGATION_INPUT || + type == SpillerType::ORDER_BY_INPUT || + type == SpillerType::TOP_N_ROW_NUMBER || + type == SpillerType::SORT_WINDOW_BUILD; + } + void testSortedSpill( int numDuplicates, int32_t outputBatchSize = 0, @@ -234,7 +385,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { uint64_t readBufferSize = 1 << 20) { SCOPED_TRACE(fmt::format( "spillType: {} numDuplicates: {} outputBatchSize: {} ascending: {} makeError: {}", - Spiller::typeName(type_), + typeName(type_), numDuplicates, outputBatchSize, ascending, @@ -251,14 +402,14 @@ class SpillerTest : public exec::test::RowContainerTestBase { setupSpiller(2'000'000, 0, makeError, 0, readBufferSize); // We spill spillPct% of the data in 10% increments. - runSpill(makeError); + runSortedSpill(makeError); if (makeError) { return; } // Verify the spilled file exist on file system. auto stats = spiller_->stats(); const auto numSpilledFiles = stats.spilledFiles; - if (type_ == Spiller::Type::kAggregateOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT) { ASSERT_EQ(numSpilledFiles, 1); } else { ASSERT_GT(numSpilledFiles, 0); @@ -272,12 +423,9 @@ class SpillerTest : public exec::test::RowContainerTestBase { ASSERT_NE(readFile.get(), nullptr); totalSpilledBytes += readFile->size(); } - ASSERT_TRUE(spiller_->isAnySpilled()); - ASSERT_TRUE(spiller_->isAllSpilled()); + ASSERT_TRUE(spiller_->state().isAnyPartitionSpilled()); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); ASSERT_FALSE(spiller_->finalized()); - VELOX_ASSERT_THROW(spiller_->spill(0, nullptr), "Unexpected spiller type"); - VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled({}), "Unexpected spiller type"); SpillPartitionSet spillPartitionSet; spiller_->finishSpill(spillPartitionSet); ASSERT_EQ(spillPartitionSet.size(), numPartitions_); @@ -289,10 +437,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { // Assert we can't call any spill function after the spiller has been // finalized. - VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalize"); - VELOX_ASSERT_THROW( - spiller_->spill(0, nullptr), "Spiller has been finalize"); - VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), ""); + VELOX_ASSERT_THROW(spiller_->spill(nullptr), ""); verifySortedSpillData(spillPartitionSet, outputBatchSize); @@ -304,7 +449,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { ASSERT_EQ(stats.spilledBytes, totalSpilledBytes); ASSERT_EQ(stats.spillReadBytes, totalSpilledBytes); ASSERT_GT(stats.spillWriteTimeNanos, 0); - if (type_ == Spiller::Type::kAggregateOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT) { ASSERT_EQ(stats.spillSortTimeNanos, 0); } else { ASSERT_GT(stats.spillSortTimeNanos, 0); @@ -401,7 +546,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { // spilling will be tested separately. rowContainer_ = makeRowContainer(keys, dependents, false); - if (numRows == 0 || type_ == Spiller::Type::kHashJoinProbe) { + if (numRows == 0 || type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { return; } const SelectivityVector allRows(numRows); @@ -466,45 +612,6 @@ class SpillerTest : public exec::test::RowContainerTestBase { } } - void setupSpillContainer(const RowTypePtr& rowType, int32_t numKeys) { - const auto& childTypes = rowType->children(); - std::vector keys(childTypes.begin(), childTypes.begin() + numKeys); - std::vector dependents; - if (numKeys < childTypes.size()) { - dependents.insert( - dependents.end(), childTypes.begin() + numKeys, childTypes.end()); - } - rowContainer_ = makeRowContainer(keys, dependents, false); - rowType_ = rowType; - } - - void writeSpillData(const std::vector& batches) { - vector_size_t numRows = 0; - for (const auto& batch : batches) { - numRows += batch->size(); - } - if (rowVector_ == nullptr) { - rowVector_ = - BaseVector::create(rowType_, numRows, pool_.get()); - } - rows_.resize(numRows); - for (int i = 0; i < numRows; ++i) { - rows_[i] = rowContainer_->newRow(); - } - - vector_size_t nextRow = 0; - for (const auto& batch : batches) { - rowVector_->append(batch.get()); - const SelectivityVector allRows(batch->size()); - for (int index = 0; index < batch->size(); ++index, ++nextRow) { - for (int i = 0; i < rowType_->size(); ++i) { - DecodedVector decodedVector(*batch->childAt(i), allRows); - rowContainer_->store(decodedVector, index, rows_[nextRow], i); - } - } - } - } - void sortSpillData(bool ascending = true) { partitions_.clear(); const auto numRows = rows_.size(); @@ -532,8 +639,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { // NOTE: for aggregation output type, we expect the merge read to produce // the output rows in the same order of the row insertion. So do need the // sort for testing. - if (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT || + type_ == SpillerType::ORDER_BY_OUTPUT) { return; } for (auto& partition : partitions_) { @@ -578,68 +685,82 @@ class SpillerTest : public exec::test::RowContainerTestBase { spillConfig_.maxFileSize = targetFileSize; spillConfig_.fileCreateConfig = {}; - if (type_ == Spiller::Type::kHashJoinProbe) { - // kHashJoinProbe doesn't have associated row container. - spiller_ = std::make_unique( - type_, rowType_, hashBits_, &spillConfig_, &spillStats_); - } else if (type_ == Spiller::Type::kAggregateInput) { - spiller_ = std::make_unique( - type_, + if (type_ == SpillerType::HASH_PROBE) { + spiller_ = std::make_unique( + rowType_, hashBits_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::ROW_NUMBER_INPUT) { + spiller_ = std::make_unique( + rowType_, hashBits_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::ORDER_BY_INPUT) { + spiller_ = std::make_unique( rowContainer_.get(), rowType_, - hashBits_, rowContainer_->keyTypes().size(), compareFlags_, &spillConfig_, &spillStats_); - } else if (type_ == Spiller::Type::kOrderByInput) { - // We spill 'data' in one partition in type of kOrderBy. - spiller_ = std::make_unique( - type_, + } else if (type_ == SpillerType::TOP_N_ROW_NUMBER) { + spiller_ = std::make_unique( rowContainer_.get(), rowType_, rowContainer_->keyTypes().size(), compareFlags_, &spillConfig_, &spillStats_); - } else if ( - type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { - spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); - } else if (type_ == Spiller::Type::kRowNumber) { - spiller_ = std::make_unique( - type_, + } else if (type_ == SpillerType::SORT_WINDOW_BUILD) { + spiller_ = std::make_unique( rowContainer_.get(), rowType_, - hashBits_, + rowContainer_->keyTypes().size(), + compareFlags_, &spillConfig_, &spillStats_); - } else { - VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild); - spiller_ = std::make_unique( - type_, + } else if (type_ == SpillerType::ORDER_BY_OUTPUT) { + spiller_ = std::make_unique( + rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::HASH_BUILD) { + spiller_ = std::make_unique( joinType_, rowContainer_.get(), rowType_, hashBits_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::AGGREGATION_INPUT) { + spiller_ = std::make_unique( + rowContainer_.get(), + rowType_, + hashBits_, + rowContainer_->keyTypes().size(), + compareFlags_, + &spillConfig_, + &spillStats_); + } else if (type_ == SpillerType::AGGREGATION_OUTPUT) { + spiller_ = std::make_unique( + rowContainer_.get(), rowType_, &spillConfig_, &spillStats_); + } else if (type_ == SpillerType::ROW_NUMBER_HASH_TABLE) { + spiller_ = std::make_unique( + rowContainer_.get(), + rowType_, + hashBits_, + &spillConfig_, + &spillStats_); + } else { + VELOX_UNREACHABLE("Unknown spiller type"); } + ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_); - ASSERT_FALSE(spiller_->isAllSpilled()); - ASSERT_FALSE(spiller_->isAnySpilled()); + ASSERT_FALSE(spiller_->state().isAllPartitionSpilled()); + ASSERT_FALSE(spiller_->state().isAnyPartitionSpilled()); ASSERT_EQ(spiller_->hashBits(), hashBits_); } - void runSpill(bool expectedError) { + void runSortedSpill(bool expectedError) { + ASSERT_TRUE(spiller_->needSort()); try { - if (type_ != Spiller::Type::kAggregateOutput) { - spiller_->spill(); - } else { - RowContainerIterator iter; - spiller_->spill(iter); - } + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::nullopt, std::optional(nullptr)}); rowContainer_->clear(); ASSERT_FALSE(expectedError); } catch (const std::exception&) { @@ -651,8 +772,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { SpillPartitionSet& spillPartitionSet, int32_t outputBatchSize = 0) { for (auto& spillPartitionEntry : spillPartitionSet) { - ASSERT_TRUE( - spiller_->isSpilled(spillPartitionEntry.first.partitionNumber())); + ASSERT_TRUE(spiller_->state().isPartitionSpilled( + spillPartitionEntry.first.partitionNumber())); const auto partition = spillPartitionEntry.first.partitionNumber(); auto* spillPartition = spillPartitionEntry.second.get(); // We make a merge reader that merges the spill files and the rows that @@ -792,12 +913,10 @@ class SpillerTest : public exec::test::RowContainerTestBase { int targetFileSize, uint64_t maxSpillRunRows, uint64_t readBufferSize = 1 << 20) { - ASSERT_TRUE( - type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kHashJoinProbe || - type_ == Spiller::Type::kRowNumber); + ASSERT_FALSE(needSort(type_)); - const int numSpillPartitions = type_ != Spiller::Type::kHashJoinProbe + const int numSpillPartitions = (type_ != SpillerType::HASH_PROBE && + type_ != SpillerType::ROW_NUMBER_INPUT) ? numPartitions_ : 1 + folly::Random().rand32() % numPartitions_; SpillPartitionNumSet spillPartitionNumSet; @@ -819,53 +938,70 @@ class SpillerTest : public exec::test::RowContainerTestBase { HashPartitionFunction spillHashFunction(hashBits_, rowType_, keyChannels_); // Setup a number of spillers to spill data and then accumulate results from // them by partition. - std::vector> spillers; + std::vector> spillers; for (int iter = 0; iter < numSpillers; ++iter) { const auto prevGStats = common::globalSpillStats(); setupSpillData( numKeys_, - type_ != Spiller::Type::kHashJoinProbe ? numBatchRows * 10 : 0, + (type_ != SpillerType::HASH_PROBE && + type_ != SpillerType::ROW_NUMBER_INPUT) + ? numBatchRows * 10 + : 0, 1, nullptr, {}); setupSpiller(targetFileSize, 0, false, maxSpillRunRows, readBufferSize); // Can't append without marking a partition as spilling. - VELOX_ASSERT_THROW(spiller_->spill(0, rowVector_), ""); + if (auto* rowNumberInputSpiller = + dynamic_cast(spiller_.get())) { + VELOX_ASSERT_THROW(rowNumberInputSpiller->spill(0, rowVector_), ""); + } else if ( + auto* hashProbeSpiller = + dynamic_cast(spiller_.get())) { + VELOX_ASSERT_THROW(hashProbeSpiller->spill(0, rowVector_), ""); + } else if ( + auto* hashBuildSpiller = + dynamic_cast(spiller_.get())) { + VELOX_ASSERT_THROW(hashBuildSpiller->spill(0, rowVector_), ""); + } splitByPartition(rowVector_, spillHashFunction, inputsByPartition); - if (type_ == Spiller::Type::kHashJoinProbe) { - spiller_->setPartitionsSpilled(spillPartitionNumSet); + if (auto* spiller = dynamic_cast(spiller_.get())) { + spiller->setPartitionsSpilled(spillPartitionNumSet); #ifndef NDEBUG VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled(spillPartitionNumSet), ""); + spiller->setPartitionsSpilled(spillPartitionNumSet), ""); #endif } else { - VELOX_ASSERT_THROW( - spiller_->setPartitionsSpilled(spillPartitionNumSet), ""); - spiller_->spill(); + spiller_->spill(nullptr); rowContainer_->clear(); - ASSERT_TRUE(spiller_->isAllSpilled()); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); } // Spill data. - for (int i = 0; i < numAppendBatches; ++i) { - RowVectorPtr batch = makeDataset(rowType_, numBatchRows, nullptr); - splitByPartition(batch, spillHashFunction, inputsByPartition); - for (const auto& partition : spillPartitionNumSet) { - spiller_->spill(partition, inputsByPartition[partition].back()); + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { + auto* spiller = dynamic_cast(spiller_.get()); + ASSERT_NE(spiller, nullptr); + for (int i = 0; i < numAppendBatches; ++i) { + RowVectorPtr batch = makeDataset(rowType_, numBatchRows, nullptr); + splitByPartition(batch, spillHashFunction, inputsByPartition); + for (const auto& partition : spillPartitionNumSet) { + spiller->spill(partition, inputsByPartition[partition].back()); + } } } // Assert that hash probe type of spiller type doesn't support incremental // spilling. - if (type_ == Spiller::Type::kHashJoinProbe) { - VELOX_ASSERT_THROW(spiller_->spill(), ""); - } else { - spiller_->spill(); - ASSERT_TRUE(spiller_->isAllSpilled()); + if (type_ != SpillerType::HASH_PROBE && + type_ != SpillerType::ROW_NUMBER_INPUT) { + spiller_->spill(nullptr); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); } const auto stats = spiller_->stats(); ASSERT_GE(stats.spilledFiles, 0); - if (type_ == Spiller::Type::kHashJoinProbe) { + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { if (numAppendBatches == 0) { ASSERT_EQ(stats.spilledRows, 0); ASSERT_EQ(stats.spilledBytes, 0); @@ -894,8 +1030,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { } ASSERT_GT(stats.spilledPartitions, 0); ASSERT_EQ(stats.spillSortTimeNanos, 0); - if (type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kRowNumber) { + if (type_ == SpillerType::HASH_BUILD || + type_ == SpillerType::ROW_NUMBER_HASH_TABLE) { ASSERT_GT(stats.spillFillTimeNanos, 0); } else { ASSERT_EQ(stats.spillFillTimeNanos, 0); @@ -950,20 +1086,28 @@ class SpillerTest : public exec::test::RowContainerTestBase { } void verifyNonSortedSpillData( - std::vector> spillers, + std::vector> spillers, const SpillPartitionNumSet& spillPartitionNumSet, const std::vector>& inputsByPartition) { - ASSERT_TRUE( - type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kRowNumber || - type_ == Spiller::Type::kHashJoinProbe); + needSort(type_); SpillPartitionSet spillPartitionSet; for (auto& spiller : spillers) { spiller->finishSpill(spillPartitionSet); - VELOX_ASSERT_THROW( - spiller->spill(0, nullptr), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller->spill(), "Spiller has been finalized"); + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { + auto* spillerImpl = dynamic_cast(spiller.get()); + ASSERT_NE(spillerImpl, nullptr); + // Check finalized throw + VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), ""); + } else if (type_ == SpillerType::HASH_BUILD) { + auto* spillerImpl = dynamic_cast(spiller.get()); + ASSERT_NE(spillerImpl, nullptr); + // Check finalized throw + VELOX_ASSERT_THROW(spillerImpl->spill(0, nullptr), ""); + } + // Check finalized throw + VELOX_ASSERT_THROW(spiller->spill(nullptr), ""); } ASSERT_EQ(spillPartitionSet.size(), spillPartitionNumSet.size()); @@ -973,7 +1117,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { hashBits_.begin(), spillPartitionEntry.first.partitionBitOffset()); auto reader = spillPartitionEntry.second->createUnorderedReader( spillConfig_.readBufferSize, pool(), &spillStats_); - if (type_ == Spiller::Type::kHashJoinProbe) { + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { // For hash probe type, we append each input vector as one batch in // spill file so that we can do one-to-one comparison. for (int i = 0; i < inputsByPartition[partition].size(); ++i) { @@ -1026,16 +1171,27 @@ class SpillerTest : public exec::test::RowContainerTestBase { const SpillPartitionNumSet& spillPartitionNumSet, const std::vector>& inputsByPartition) { ASSERT_TRUE( - type_ == Spiller::Type::kHashJoinBuild || - type_ == Spiller::Type::kRowNumber || - type_ == Spiller::Type::kHashJoinProbe); + type_ == SpillerType::HASH_BUILD || + type_ == SpillerType::ROW_NUMBER_HASH_TABLE || + type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT); SpillPartitionSet spillPartitionSet; spiller_->finishSpill(spillPartitionSet); - VELOX_ASSERT_THROW( - spiller_->spill(0, nullptr), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), ""); + + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { + auto* spillerImpl = dynamic_cast(spiller_.get()); + ASSERT_NE(spillerImpl, nullptr); + VELOX_ASSERT_THROW( + spillerImpl->spill(0, nullptr), ""); + } else if (type_ == SpillerType::HASH_BUILD) { + auto* spillerImpl = dynamic_cast(spiller_.get()); + ASSERT_NE(spillerImpl, nullptr); + VELOX_ASSERT_THROW( + spillerImpl->spill(0, nullptr), ""); + } + VELOX_ASSERT_THROW(spiller_->spill(nullptr), ""); ASSERT_EQ(spillPartitionSet.size(), spillPartitionNumSet.size()); for (auto& spillPartitionEntry : spillPartitionSet) { @@ -1044,7 +1200,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { hashBits_.begin(), spillPartitionEntry.first.partitionBitOffset()); auto reader = spillPartitionEntry.second->createUnorderedReader( spillConfig_.readBufferSize, pool(), &spillStats_); - if (type_ == Spiller::Type::kHashJoinProbe) { + if (type_ == SpillerType::HASH_PROBE || + type_ == SpillerType::ROW_NUMBER_INPUT) { // For hash probe type, we append each input vector as one batch in // spill file so that we can do one-to-one comparison. for (int i = 0; i < inputsByPartition[partition].size(); ++i) { @@ -1115,7 +1272,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { } const TestParam param_; - const Spiller::Type type_; + const SpillerType type_; const int32_t executorPoolSize_; const common::CompressionKind compressionKind_; const bool enablePrefixSort_; @@ -1143,7 +1300,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { std::vector rows_; std::vector> partitions_; std::vector compareFlags_; - std::unique_ptr spiller_; + std::unique_ptr spiller_; common::SpillConfig spillConfig_; folly::Synchronized spillStats_; }; @@ -1153,10 +1310,11 @@ struct AllTypesTestParam { uint64_t maxSpillRunRows; }; -class AllTypes : public SpillerTest, - public testing::WithParamInterface { +class AllTypesSpillerTest + : public SpillerTest, + public testing::WithParamInterface { public: - AllTypes() + AllTypesSpillerTest() : SpillerTest(GetParam().param), maxSpillRunRows_(GetParam().maxSpillRunRows) {} @@ -1177,30 +1335,30 @@ class AllTypes : public SpillerTest, protected: uint64_t maxSpillRunRows_; }; - -TEST_P(AllTypes, nonSortedSpillFunctions) { - if (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kOrderByOutput || - type_ == Spiller::Type::kAggregateInput || - type_ == Spiller::Type::kAggregateOutput) { +} // namespace facebook::velox::exec::test + +TEST_P(AllTypesSpillerTest, nonSortedSpillFunctions) { + if (type_ == SpillerType::ORDER_BY_INPUT || + type_ == SpillerType::SORT_WINDOW_BUILD || + type_ == SpillerType::TOP_N_ROW_NUMBER || + type_ == SpillerType::ORDER_BY_OUTPUT || + type_ == SpillerType::AGGREGATION_INPUT || + type_ == SpillerType::AGGREGATION_OUTPUT) { setupSpillData(numKeys_, 5'000, 1, nullptr, {}); sortSpillData(); setupSpiller(100'000, 0, false, maxSpillRunRows_); - { - RowVectorPtr dummyVector; - VELOX_ASSERT_THROW( - spiller_->spill(0, dummyVector), "Unexpected spiller type"); - } - if (type_ == Spiller::Type::kOrderByOutput) { + if (type_ == SpillerType::ORDER_BY_OUTPUT) { RowContainerIterator rowIter; std::vector> rows(5'000, *pool_); int numListedRows{0}; numListedRows = rowContainer_->listRows(&rowIter, 5000, rows.data()); ASSERT_EQ(numListedRows, 5000); - spiller_->spill(rows); + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::optional(&rows), std::nullopt}); } else { - spiller_->spill(); + spill(spiller_.get()); } ASSERT_FALSE(spiller_->finalized()); @@ -1217,36 +1375,40 @@ TEST_P(AllTypes, nonSortedSpillFunctions) { testNonSortedSpill(1, 5'000, 0, 1, maxSpillRunRows_); } -TEST_P(AllTypes, readaheadTest) { - if (type_ == Spiller::Type::kOrderByOutput || - type_ == Spiller::Type::kAggregateOutput) { +TEST_P(AllTypesSpillerTest, readaheadTest) { + if (type_ == SpillerType::ORDER_BY_OUTPUT || + type_ == SpillerType::AGGREGATION_OUTPUT) { return; } - if (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kAggregateInput) { + if (type_ == SpillerType::ORDER_BY_INPUT || + type_ == SpillerType::SORT_WINDOW_BUILD || + type_ == SpillerType::TOP_N_ROW_NUMBER || + type_ == SpillerType::AGGREGATION_INPUT) { testSortedSpill(10, 10, false, false, 512); return; } testNonSortedSpill(1, 5'000, 0, 1'000'000'000, maxSpillRunRows_, 512); } -class NoHashJoin : public SpillerTest, +class SortedSpillerTest : public SpillerTest, public testing::WithParamInterface { public: - NoHashJoin() : SpillerTest(GetParam()) {} + SortedSpillerTest() : SpillerTest(GetParam()) {} static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kHashJoinProbe, - Spiller::Type::kHashJoinBuild, - Spiller::Type::kRowNumber, - Spiller::Type::kOrderByOutput}} + {SpillerType::HASH_PROBE, + SpillerType::ROW_NUMBER_INPUT, + SpillerType::HASH_BUILD, + SpillerType::ROW_NUMBER_HASH_TABLE, + SpillerType::ORDER_BY_OUTPUT, + SpillerType::AGGREGATION_OUTPUT}} .getTestParams(); } }; -TEST_P(NoHashJoin, spillFew) { +TEST_P(SortedSpillerTest, spillFew) { // Test with distinct sort keys. testSortedSpill(10, 1); testSortedSpill(10, 1, false, false); @@ -1259,7 +1421,7 @@ TEST_P(NoHashJoin, spillFew) { testSortedSpill(10, 10, true, false); } -TEST_P(NoHashJoin, spillMost) { +TEST_P(SortedSpillerTest, spillMost) { // Test with distinct sort keys. testSortedSpill(60, 1); testSortedSpill(60, 1, false, false); @@ -1272,7 +1434,7 @@ TEST_P(NoHashJoin, spillMost) { testSortedSpill(60, 10, true, false); } -TEST_P(NoHashJoin, spillAll) { +TEST_P(SortedSpillerTest, spillAll) { // Test with distinct sort keys. testSortedSpill(100, 1); testSortedSpill(100, 1, false, false); @@ -1285,7 +1447,7 @@ TEST_P(NoHashJoin, spillAll) { testSortedSpill(100, 10, true, false); } -TEST_P(NoHashJoin, error) { +TEST_P(SortedSpillerTest, error) { testSortedSpill(100, 1, false, true); } @@ -1297,11 +1459,15 @@ class HashJoinBuildOnly : public SpillerTest, static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kAggregateInput, - Spiller::Type::kAggregateOutput, - Spiller::Type::kHashJoinProbe, - Spiller::Type::kOrderByInput, - Spiller::Type::kOrderByOutput}} + {SpillerType::AGGREGATION_INPUT, + SpillerType::AGGREGATION_OUTPUT, + SpillerType::HASH_PROBE, + SpillerType::ROW_NUMBER_INPUT, + SpillerType::ORDER_BY_INPUT, + SpillerType::SORT_WINDOW_BUILD, + SpillerType::TOP_N_ROW_NUMBER, + SpillerType::ORDER_BY_OUTPUT, + SpillerType::ROW_NUMBER_HASH_TABLE}} .getTestParams(); } }; @@ -1312,23 +1478,32 @@ TEST_P(HashJoinBuildOnly, spillPartition) { HashPartitionFunction spillHashFunction(hashBits_, rowType_, keyChannels_); splitByPartition(rowVector_, spillHashFunction, vectorsByPartition); setupSpiller(100'000, 0, false); - spiller_->spill(); + spill(spiller_.get()); rowContainer_->clear(); - spiller_->spill(); + spill(spiller_.get()); verifyNonSortedSpillData(allPartitionNumSet(), vectorsByPartition); - VELOX_ASSERT_THROW(spiller_->spill(), "Spiller has been finalized"); - VELOX_ASSERT_THROW(spiller_->spill(RowContainerIterator{}), ""); + VELOX_ASSERT_THROW(spill(spiller_.get()), ""); + RowContainerIterator rowIter; + // TODO: Separating different types to different assert calls with different + // API. + VELOX_ASSERT_THROW( + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::nullopt, std::optional(&rowIter)}), + ""); } TEST_P(HashJoinBuildOnly, writeBufferSize) { - std::vector writeBufferSizes = {0, 4'000'000'000}; + std::vector writeBufferSizes = {0/*, 4'000'000'000*/}; for (const auto writeBufferSize : writeBufferSizes) { SCOPED_TRACE( fmt::format("writeBufferSize {}", succinctBytes(writeBufferSize))); setupSpillData(numKeys_, 1'000, 1, nullptr, {}); setupSpiller(4'000'000'000, writeBufferSize, false); - spiller_->spill(); - ASSERT_TRUE(spiller_->isAllSpilled()); + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::nullopt, std::nullopt}); + ASSERT_TRUE(spiller_->state().isAllPartitionSpilled()); const int numDiskWrites = spiller_->stats().spillWrites; if (writeBufferSize != 0) { ASSERT_EQ(numDiskWrites, 0); @@ -1350,7 +1525,13 @@ TEST_P(HashJoinBuildOnly, writeBufferSize) { for (int partition = 0; partition < numPartitions_; ++partition) { const auto& splitVector = splitVectors[partition]; if (!splitVector.empty()) { - spiller_->spill(partition, splitVector.back()); + // TODO: Check if this is for all types? + spill( + spiller_.get(), + {std::optional(partition), + std::optional(splitVector.back()), + std::nullopt, + std::nullopt}); ++spillInputVectorCount; } } @@ -1389,12 +1570,15 @@ class AggregationOutputOnly : public SpillerTest, static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kAggregateInput, - Spiller::Type::kHashJoinBuild, - Spiller::Type::kRowNumber, - Spiller::Type::kHashJoinProbe, - Spiller::Type::kOrderByInput, - Spiller::Type::kOrderByOutput}} + {SpillerType::AGGREGATION_INPUT, + SpillerType::HASH_BUILD, + SpillerType::ROW_NUMBER_HASH_TABLE, + SpillerType::HASH_PROBE, + SpillerType::ROW_NUMBER_INPUT, + SpillerType::ORDER_BY_INPUT, + SpillerType::SORT_WINDOW_BUILD, + SpillerType::TOP_N_ROW_NUMBER, + SpillerType::ORDER_BY_OUTPUT}} .getTestParams(); } }; @@ -1455,13 +1639,9 @@ TEST_P(AggregationOutputOnly, basic) { &rowIter, testData.spillRowOffset, rows.data()); } ASSERT_EQ(numListedRows, std::min(numRows, testData.spillRowOffset)); - { - RowVectorPtr dummy; - VELOX_ASSERT_THROW( - spiller_->spill(0, dummy), - "Unexpected spiller type: AGGREGATE_OUTPUT"); - } - spiller_->spill(rowIter); + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::nullopt, std::optional(&rowIter)}); ASSERT_EQ(rowContainer_->numRows(), numRows); rowContainer_->clear(); @@ -1510,12 +1690,15 @@ class OrderByOutputOnly : public SpillerTest, static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kAggregateInput, - Spiller::Type::kAggregateOutput, - Spiller::Type::kHashJoinBuild, - Spiller::Type::kHashJoinProbe, - Spiller::Type::kRowNumber, - Spiller::Type::kOrderByInput}} + {SpillerType::AGGREGATION_INPUT, + SpillerType::AGGREGATION_OUTPUT, + SpillerType::HASH_BUILD, + SpillerType::HASH_PROBE, + SpillerType::ROW_NUMBER_INPUT, + SpillerType::ROW_NUMBER_HASH_TABLE, + SpillerType::ORDER_BY_INPUT, + SpillerType::TOP_N_ROW_NUMBER, + SpillerType::SORT_WINDOW_BUILD}} .getTestParams(); } }; @@ -1560,18 +1743,21 @@ TEST_P(OrderByOutputOnly, basic) { rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data()); ASSERT_LE(numListedRows, numRows); { - RowVectorPtr dummy; + SpillerBase::SpillRows emptyRows(*pool_); VELOX_ASSERT_THROW( - spiller_->spill(0, dummy), - "Unexpected spiller type: ORDER_BY_OUTPUT"); - } - { - Spiller::SpillRows emptyRows(*pool_); - VELOX_ASSERT_THROW(spiller_->spill(emptyRows), ""); + spill( + spiller_.get(), + {std::nullopt, + std::nullopt, + std::optional(&emptyRows), + std::nullopt}), + ""); } - auto spillRows = - Spiller::SpillRows(rows.begin(), rows.begin() + numListedRows, *pool_); - spiller_->spill(spillRows); + auto spillRows = SpillerBase::SpillRows( + rows.begin(), rows.begin() + numListedRows, *pool_); + spill( + spiller_.get(), + {std::nullopt, std::nullopt, std::optional(&spillRows), std::nullopt}); ASSERT_EQ(rowContainer_->numRows(), numRows); rowContainer_->clear(); @@ -1619,7 +1805,9 @@ class MaxSpillRunTest : public SpillerTest, static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kHashJoinProbe, Spiller::Type::kOrderByOutput}} + {SpillerType::HASH_PROBE, + SpillerType::ROW_NUMBER_INPUT, + SpillerType::ORDER_BY_OUTPUT}} .getTestParams(); } }; @@ -1646,16 +1834,19 @@ TEST_P(MaxSpillRunTest, basic) { 0, false, testData.maxSpillRunRows); - if (type_ == Spiller::Type::kOrderByOutput) { - RowContainerIterator rowIter; - Spiller::SpillRows rows(numRows, *pool_); - int numListedRows{0}; - numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data()); - ASSERT_EQ(numListedRows, numRows); - spiller_->spill(rows); - } else { - spiller_->spill(); - } + RowContainerIterator rowIter; + SpillerBase::SpillRows rows(numRows, *pool_); + int numListedRows{0}; + numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data()); + ASSERT_EQ(numListedRows, numRows); + // Let helper decide which public API to call based on impl type. + spill( + spiller_.get(), + {std::nullopt, + std::nullopt, + std::optional(&rows), + std::optional(nullptr)}); + ASSERT_FALSE(spiller_->finalized()); SpillPartitionSet spillPartitionSet; spiller_->finishSpill(spillPartitionSet); @@ -1671,15 +1862,18 @@ TEST_P(MaxSpillRunTest, basic) { const auto& stats = spiller_->stats(); ASSERT_EQ(totalSize, stats.spilledBytes); - if (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { + if (type_ == SpillerType::AGGREGATION_OUTPUT || + type_ == SpillerType::ORDER_BY_OUTPUT) { ASSERT_EQ(numFiles, numPartitions_); ASSERT_EQ(spillPartitionSet.size(), numPartitions_); - } else if (type_ == Spiller::Type::kOrderByInput) { + } else if ( + type_ == SpillerType::ORDER_BY_INPUT || + type_ == SpillerType::SORT_WINDOW_BUILD || + type_ == SpillerType::TOP_N_ROW_NUMBER) { // Need sort. ASSERT_EQ(numFiles, testData.expectedNumFiles); ASSERT_EQ(spillPartitionSet.size(), numPartitions_); - } else if (type_ == Spiller::Type::kAggregateInput) { + } else if (type_ == SpillerType::AGGREGATION_INPUT) { ASSERT_GE(numFiles, testData.expectedNumFiles); ASSERT_EQ(spillPartitionSet.size(), numPartitions_); } else { @@ -1691,13 +1885,13 @@ TEST_P(MaxSpillRunTest, basic) { VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, - AllTypes, - testing::ValuesIn(AllTypes::getTestParams())); + AllTypesSpillerTest, + testing::ValuesIn(AllTypesSpillerTest::getTestParams())); VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, - NoHashJoin, - testing::ValuesIn(NoHashJoin::getTestParams())); + SortedSpillerTest, + testing::ValuesIn(SortedSpillerTest::getTestParams())); VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest,