Skip to content

Commit

Permalink
Back out "Optimize cross joins with single build batch"
Browse files Browse the repository at this point in the history
Summary:
facebookincubator#10695 has a correctness error. Prestissimo tests are failing with:

```
[ERROR]   TestPrestoNativeGeneralQueriesJSON>AbstractTestNativeGeneralQueries.testCorrelatedExistsSubqueries:1647->AbstractTestQueryFramework.assertQuery:174 For query:
 SELECT EXISTS(SELECT 1 FROM (VALUES 1, 1, 1, 2, 2, 3, 4) i(a) WHERE i.a < o.a AND i.a < 4) FROM (VALUES 0, 3, 3, 5) o(a)
 actual column types:
 [boolean]
expected column types:
[boolean]

not equal
Expected rows (3 of 3 missing rows shown, 4 rows in total):
    [false]
    [true]
    [true]
```

We know the Velox main branch is not performant for nested loop joins (NLJ), but it is better to revert this performance fix, since it also breaks correctness.

Differential Revision: D61074364
  • Loading branch information
amitkdutta authored and facebook-github-bot committed Aug 11, 2024
1 parent 91420f4 commit 62807ea
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 183 deletions.
159 changes: 45 additions & 114 deletions velox/exec/NestedLoopJoinProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) {
}

buildVectors_ = std::move(buildData);

for (const auto& build : buildVectors_.value()) {
buildRowCount_ += build->size();
}
buildSideEmpty_ = (buildRowCount_ == 0);
return true;
}

Expand Down Expand Up @@ -271,7 +276,13 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() {

bool NestedLoopJoinProbe::advanceProbe() {
if (hasProbedAllBuildData()) {
probeRow_ += probeRowCount_;
// For cross joins, if there is a single record on the build side, we return
// batches containing all probe records from `input_` at a time.
if (isCrossJoin() && buildRowCount_ == 1) {
probeRow_ = input_->size();
} else {
++probeRow_;
}
probeRowHasMatch_ = false;
buildIndex_ = 0;

Expand Down Expand Up @@ -308,8 +319,9 @@ bool NestedLoopJoinProbe::addToOutput() {
}

// If this is a cross join, there is no filter to evaluate. We can just
// return the output vector directly. Also don't need to bother about adding
// mismatched rows.
// return the output vector directly, which is composed of the probe
// projections at `probeRow_` (as constants), and the current vector of the
// build side. Also don't need to bother about adding mismatched rows.
if (isCrossJoin()) {
output_ = getNextCrossProductBatch(
currentBuild, outputType_, identityProjections_, buildProjections_);
Expand Down Expand Up @@ -423,121 +435,40 @@ RowVectorPtr NestedLoopJoinProbe::getNextCrossProductBatch(
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK_GT(buildVector->size(), 0);

// If there is a single build record, we use the entire probe batch `input_`
// and the single build record wrapped as a constant.
if (isSingleBuildRow()) {
return genCrossProductSingleBuildRow(
buildVector, outputType, probeProjections, buildProjections);
} else if (isSingleBuildVector()) {
return genCrossProductSingleBuildVector(
buildVector, outputType, probeProjections, buildProjections);
} else {
return genCrossProductMultipleBuildVectors(
buildVector, outputType, probeProjections, buildProjections);
}
}

RowVectorPtr NestedLoopJoinProbe::genCrossProductSingleBuildRow(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK(isSingleBuildRow());

std::vector<VectorPtr> projectedChildren(outputType->size());
size_t numOutputRows = input_->size();
probeRowCount_ = input_->size();
size_t numOutputRows = 0;

// Project columns from the probe side.
projectChildren(
projectedChildren, input_, probeProjections, numOutputRows, nullptr);

// Wrap projections from the build side as constants.
for (const auto [inputChannel, outputChannel] : buildProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, 0, buildVector->childAt(inputChannel));
}
return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}
// If it's a cross join and there is a single build record, we use the entire
// probe batch `input_` and the single build record wrapped as a constant.
if (isCrossJoin() && buildRowCount_ == 1) {
numOutputRows = input_->size();

RowVectorPtr NestedLoopJoinProbe::genCrossProductSingleBuildVector(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
VELOX_CHECK(isSingleBuildVector());
std::vector<VectorPtr> projectedChildren(outputType->size());
vector_size_t buildRowCount = buildVector->size();
// Project columns from the probe side.
projectChildren(
projectedChildren, input_, probeProjections, numOutputRows, nullptr);

// Calculate how many probe rows we can cover without exceeding
// outputBatchSize_.
if (buildRowCount > outputBatchSize_) {
probeRowCount_ = 1;
// Wrap projections from the build side as constants.
for (const auto [inputChannel, outputChannel] : buildProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, 0, buildVector->childAt(inputChannel));
}
} else {
probeRowCount_ = std::min(
(vector_size_t)outputBatchSize_ / buildRowCount,
input_->size() - probeRow_);
}
size_t numOutputRows = probeRowCount_ * buildRowCount;

// Generate probe dictionary indices.
auto rawProbeIndices =
initializeRowNumberMapping(probeIndices_, numOutputRows, pool());
for (auto i = 0; i < probeRowCount_; ++i) {
std::fill(
rawProbeIndices.begin() + i * buildRowCount,
rawProbeIndices.begin() + (i + 1) * buildRowCount,
probeRow_ + i);
}

// Generate build dictionary indices.
auto rawBuildIndices_ =
initializeRowNumberMapping(buildIndices_, numOutputRows, pool());
for (auto i = 0; i < probeRowCount_; ++i) {
std::iota(
rawBuildIndices_.begin() + i * buildRowCount,
rawBuildIndices_.begin() + (i + 1) * buildRowCount,
0);
}

projectChildren(
projectedChildren,
input_,
probeProjections,
numOutputRows,
probeIndices_);
projectChildren(
projectedChildren,
buildVector,
buildProjections,
numOutputRows,
buildIndices_);

return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}

RowVectorPtr NestedLoopJoinProbe::genCrossProductMultipleBuildVectors(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections) {
std::vector<VectorPtr> projectedChildren(outputType->size());
size_t numOutputRows = buildVector->size();
probeRowCount_ = 1;

// Project columns from the build side.
projectChildren(
projectedChildren, buildVector, buildProjections, numOutputRows, nullptr);

// Wrap projections from the probe side as constants.
for (const auto [inputChannel, outputChannel] : probeProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, probeRow_, input_->childAt(inputChannel));
numOutputRows = buildVector->size();

// Project columns from the build side.
projectChildren(
projectedChildren,
buildVector,
buildProjections,
numOutputRows,
nullptr);

// Wrap projections from the probe side as constants.
for (const auto [inputChannel, outputChannel] : probeProjections) {
projectedChildren[outputChannel] = BaseVector::wrapInConstant(
numOutputRows, probeRow_, input_->childAt(inputChannel));
}
}

return std::make_shared<RowVector>(
pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren));
}
Expand Down Expand Up @@ -603,7 +534,7 @@ void NestedLoopJoinProbe::finishProbeInput() {
// From now on, we have finished processing the probe side. Check now if this
// is a right or full outer join, and hence we may need to start emitting build
// mismatch records.
if (!needsBuildMismatch(joinType_) || isBuildSideEmpty()) {
if (!needsBuildMismatch(joinType_) || buildSideEmpty_) {
setState(ProbeOperatorState::kFinish);
return;
}
Expand Down Expand Up @@ -659,7 +590,7 @@ RowVectorPtr NestedLoopJoinProbe::getBuildMismatchedOutput(
// product but the build or probe side is empty, there could still be
// mismatched rows from the other side.
if (matched.isAllSelected() ||
(isCrossJoin() && !probeSideEmpty_ && !isBuildSideEmpty())) {
(isCrossJoin() && !probeSideEmpty_ && !buildSideEmpty_)) {
return nullptr;
}

Expand Down
89 changes: 22 additions & 67 deletions velox/exec/NestedLoopJoinProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace facebook::velox::exec {
/// To produce output, the operator processes each probe record from probe
/// input, using the following steps:
///
/// 1. Materialize a cross-product batch across probe and build.
/// 1. Materialize a cross product by pairing each probe record (wrapped as a
/// constant) with each build vector.
/// 2. Evaluate the join condition.
/// 3. Add key matches to the output.
/// 4. Once all build vectors are processed for a particular probe row, check if
Expand All @@ -48,23 +49,15 @@ namespace facebook::velox::exec {
/// collect all build matches at the end, and emit any records that haven't
/// been matched by any of the peers.
///
/// There are three different cases for the generation of cross-product across
/// probe and build (#1 above):
/// The output always contains dictionaries wrapped around probe columns, and
/// copies for build columns. The only exception is when the build side
/// contains a single record. In that case, each probe batch will be wrapped
/// with the single build record (as a constant).
///
/// a) If build side has a single row, simply wrap that row as a constant and
/// produce it along with probe batches.
///
/// b) If build side has a single batch, produce a dictionary wrapped across
/// probe and build rows, covering as many probe rows as allowed by
/// `outputBatchSize_` (maximum record to produce per batch).
///
/// c) If build side has multiple vectors, take one probe row at a time,
/// wrapping it as a constant, and produce it along with build batches.
///
/// If needed, build-side copies are done lazily; it first accumulates the ranges
/// to be copied, then performs the copies in batch, column-by-column. It
/// produces at most `outputBatchSize_` records, but it may produce fewer since
/// the output needs to follow the probe vector boundaries.
/// The build-side copies are done lazily; it first accumulates the ranges to be
/// copied, then performs the copies in batch, column-by-column. It produces at
/// most `outputBatchSize_` records, but it may produce fewer since the output
/// needs to follow the probe vector boundaries.
class NestedLoopJoinProbe : public Operator {
public:
NestedLoopJoinProbe(
Expand Down Expand Up @@ -147,8 +140,14 @@ class NestedLoopJoinProbe : public Operator {
}

// Generates the next batch of a cross product between probe and build. It
// should be used as the entry point, and will internally delegate to one of
// the three functions below.
// handles two cases:
//
// #1. Use the current probe record being processed (`probeRow_` from
// `input_`) for probe projections, and the columns from buildVector for build
// projections.
// #2. For cross joins, if there is a single build record, it uses the columns
// from the current probe batch (`input_`), and the single build record
// wrapped as a constant.
//
// Output projections can be specified so that this function can be used to
// generate both filter input and actual output (in case there is no join
Expand All @@ -159,30 +158,6 @@ class NestedLoopJoinProbe : public Operator {
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Generates a cross product batch when there is a single build row (probe
// batch plus build row as a constant).
RowVectorPtr genCrossProductSingleBuildRow(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Generates a cross product batch when there is a single build vector (probe
// and build batch wrapped in a dictionary).
RowVectorPtr genCrossProductSingleBuildVector(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// As a fallback, process the current probe row to as much build data as
// possible (probe row as constant, and flat copied data for build records).
RowVectorPtr genCrossProductMultipleBuildVectors(
const RowVectorPtr& buildVector,
const RowTypePtr& outputType,
const std::vector<IdentityProjection>& probeProjections,
const std::vector<IdentityProjection>& buildProjections);

// Add a single record to `output_` based on buildRow from buildVector, and
// the current probeRow and probe vector (input_). Probe side projections are
// zero-copy (dictionary indices), and build side projections are marked to be
Expand Down Expand Up @@ -234,24 +209,6 @@ class NestedLoopJoinProbe : public Operator {
return joinCondition_ == nullptr;
}

// If build has a single vector, we can wrap probe and build batches into
// dictionaries and produce as many combinations of probe and build rows,
// until `numOutputRows_` is filled.
bool isSingleBuildVector() const {
return buildVectors_->size() == 1;
}

// If there are no incoming records in the build side.
bool isBuildSideEmpty() const {
return buildVectors_->empty();
}

// If build has a single row, we can simply add it as a constant to probe
// batches.
bool isSingleBuildRow() const {
return isSingleBuildVector() && buildVectors_->front()->size() == 1;
}

// Wraps rows of 'data' that are not selected in 'matched' and projects
// to the output according to 'projections'. 'nullProjections' is used to
// create null column vectors in output for outer join. 'unmatchedMapping' is
Expand Down Expand Up @@ -285,9 +242,6 @@ class NestedLoopJoinProbe : public Operator {
BufferPtr probeIndices_;
vector_size_t* rawProbeIndices_;

// Dictionary indices for build columns.
BufferPtr buildIndices_;

// Join condition expression.

// May be nullptr for a cross join.
Expand All @@ -314,9 +268,6 @@ class NestedLoopJoinProbe : public Operator {
// Probe row being currently processed (related to `input_`).
vector_size_t probeRow_{0};

// How many probe rows are being processed by the current batch.
vector_size_t probeRowCount_{1};

// Whether the current probeRow_ has produced a match. Used for left and full
// outer joins.
bool probeRowHasMatch_{false};
Expand All @@ -336,6 +287,10 @@ class NestedLoopJoinProbe : public Operator {

// Stores the data for build vectors (right side of the join).
std::optional<std::vector<RowVectorPtr>> buildVectors_;
bool buildSideEmpty_{false};

// Total number of records from the build side (across all vectors).
vector_size_t buildRowCount_{0};

// Index into `buildVectors_` for the build vector being currently processed.
size_t buildIndex_{0};
Expand Down
3 changes: 1 addition & 2 deletions velox/exec/tests/NestedLoopJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ TEST_F(NestedLoopJoinTest, basicCrossJoin) {

OperatorTestBase::assertQuery(
params,
"SELECT * FROM t, "
"(SELECT * FROM UNNEST (ARRAY[10, 17, 10, 17, 10, 17, 10, 17])) u");
"SELECT * FROM t, (SELECT * FROM UNNEST (ARRAY[10, 17, 10, 17, 10, 17, 10, 17])) u");
}

TEST_F(NestedLoopJoinTest, outerJoinWithoutCondition) {
Expand Down

0 comments on commit 62807ea

Please sign in to comment.