Skip to content

Commit

Permalink
DRILL-8480: make Nested Loop Join operator properly process empty bat…
Browse files Browse the repository at this point in the history
…ches and batches with new schema (apache#2897)
  • Loading branch information
rymarm authored May 12, 2024
1 parent 3a1e558 commit 613769d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
Expand All @@ -33,15 +34,17 @@ public interface NestedLoopJoin {
public static TemplateClassDefinition<NestedLoopJoin> TEMPLATE_DEFINITION =
new TemplateClassDefinition<>(NestedLoopJoin.class, NestedLoopJoinTemplate.class);

public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);

void setTargetOutputCount(int targetOutputCount);

// Produce output records taking into account join type
public int outputRecords(JoinRelType joinType);
public int outputRecords(JoinRelType joinType) throws SchemaChangeException;

// Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex'
public void emitLeft(int leftIndex, int outIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -183,7 +184,7 @@ public IterOutcome innerNext() {
default:
}
}
nljWorker.setupNestedLoopJoin(context, left, rightContainer, rightCounts, this);
nljWorker.setupNestedLoopJoin(context, left, leftUpstream, rightContainer, rightCounts, this);
state = BatchState.NOT_FIRST;
}

Expand All @@ -193,7 +194,11 @@ public IterOutcome innerNext() {
nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());

// invoke the runtime generated method to emit records in the output batch
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
try {
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
} catch (SchemaChangeException e) {
throw UserException.schemaChangeError(e).build(logger);
}

// Set the record count
container.setValueCount(outputRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package org.apache.drill.exec.physical.impl.join;

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;

import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -41,8 +43,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Current left input batch being processed
private RecordBatch left;

// Record count of the left batch currently being processed
private int leftRecordCount;
private BatchSchema leftSchema;
private RecordBatch.IterOutcome leftOutcome;

// List of record counts per batch in the hyper container
private List<Integer> rightCounts;
Expand All @@ -59,20 +61,23 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
*
* @param context Fragment context
* @param left Current left input batch being processed
* @param context Fragment context
* @param left Current left input batch being processed
* @param leftOutcome Last left outcome
* @param rightContainer Hyper container
* @param rightCounts Counts for each right container
* @param outgoing Output batch
* @param rightCounts Counts for each right container
* @param outgoing Output batch
*/
@Override
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
this.leftRecordCount = left.getRecordCount();
this.leftSchema = left.getSchema();
this.leftOutcome = leftOutcome;
this.rightCounts = rightCounts;
this.outgoing = outgoing;
doSetup(context, rightContainer, left, outgoing);
Expand All @@ -91,10 +96,13 @@ public void setTargetOutputCount(int targetOutputRecords) {
* @return the number of records produced in the output batch
*/
@Override
public int outputRecords(JoinRelType joinType) {
public int outputRecords(JoinRelType joinType) throws SchemaChangeException {
int outputIndex = 0;
while (leftRecordCount != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);

while (leftOutcome != RecordBatch.IterOutcome.NONE && leftOutcome != RecordBatch.IterOutcome.NOT_YET) {
if (left.getRecordCount() != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);
}
if (outputIndex >= targetOutputRecords) {
break;
}
Expand All @@ -110,7 +118,7 @@ public int outputRecords(JoinRelType joinType) {
* If matching record is found both left and right records are written into output batch,
* otherwise if join type is LEFT, than only left record is written, right batch record values will be null.
*
* @param joinType join type (INNER or LEFT)
* @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
Expand All @@ -123,7 +131,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {

outer:
// for every record in the left batch
for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
for (; nextLeftRecordToProcess < left.getRecordCount(); nextLeftRecordToProcess++) {
// for every batch on the right
for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) {
int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
Expand Down Expand Up @@ -176,27 +184,25 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
* Resets some internal state which indicates the next records to process in the left and right batches,
* also fetches the next left input batch.
*/
private void resetAndGetNextLeft(int outputIndex) {
private void resetAndGetNextLeft(int outputIndex) throws SchemaChangeException {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
tracker.reset();
RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);

leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
switch (leftOutcome) {
case OK_NEW_SCHEMA:
throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" +
" found on the left side of NLJ.");
case NONE:
case NOT_YET:
leftRecordCount = 0;
break;
if (!left.getSchema().equals(leftSchema)) {
throw SchemaChangeException.schemaChanged("Nested loop join does not handle schema change. Schema change" +
" found on the left side of NLJ.", leftSchema, left.getSchema());
}
case OK:
outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex);
setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());
leftRecordCount = left.getRecordCount();
break;
default:
}
Expand Down

0 comments on commit 613769d

Please sign in to comment.