From 613769d08e6c4de6ef4cedbc03b6ae8fdedee5a3 Mon Sep 17 00:00:00 2001 From: Maksym Rymar Date: Sun, 12 May 2024 07:34:14 +0300 Subject: [PATCH] DRILL-8480: make Nested Loop Join operator properly process empty batches and batches with new schema (#2897) --- .../physical/impl/join/NestedLoopJoin.java | 7 ++- .../impl/join/NestedLoopJoinBatch.java | 9 +++- .../impl/join/NestedLoopJoinTemplate.java | 52 +++++++++++-------- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java index 725c46d1ba8..60b8cb169b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java @@ -19,6 +19,7 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; @@ -33,7 +34,9 @@ public interface NestedLoopJoin { public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition<>(NestedLoopJoin.class, NestedLoopJoinTemplate.class); - public void setupNestedLoopJoin(FragmentContext context, RecordBatch left, + public void setupNestedLoopJoin(FragmentContext context, + RecordBatch left, + RecordBatch.IterOutcome leftOutcome, ExpandableHyperContainer rightContainer, LinkedList rightCounts, NestedLoopJoinBatch outgoing); @@ -41,7 +44,7 @@ public void setupNestedLoopJoin(FragmentContext context, RecordBatch left, void setTargetOutputCount(int targetOutputCount); // Produce output records taking into account join type - public int outputRecords(JoinRelType joinType); + public int outputRecords(JoinRelType joinType) throws SchemaChangeException; // Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex' public void emitLeft(int leftIndex, int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 4e45f7fea78..d50cb85c780 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -22,6 +22,7 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; +import org.apache.drill.common.exceptions.UserException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.calcite.rel.core.JoinRelType; @@ -183,7 +184,7 @@ public IterOutcome innerNext() { default: } } - nljWorker.setupNestedLoopJoin(context, left, rightContainer, rightCounts, this); + nljWorker.setupNestedLoopJoin(context, left, leftUpstream, rightContainer, rightCounts, this); state = BatchState.NOT_FIRST; } @@ -193,7 +194,11 @@ public IterOutcome innerNext() { nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); // invoke the runtime generated method to emit records in the output batch - outputRecords = nljWorker.outputRecords(popConfig.getJoinType()); + try { + outputRecords = nljWorker.outputRecords(popConfig.getJoinType()); + } catch (SchemaChangeException e) { + throw UserException.schemaChangeError(e).build(logger); + } // Set the record count container.setValueCount(outputRecords); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java index daf23523da9..04e39c5f0e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java @@ -18,14 +18,16 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; + import javax.inject.Named; import java.util.LinkedList; import java.util.List; @@ -41,8 +43,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { // Current left input batch being processed private RecordBatch left; - // Record count of the left batch currently being processed - private int leftRecordCount; + private BatchSchema leftSchema; + private RecordBatch.IterOutcome leftOutcome; // List of record counts per batch in the hyper container private List rightCounts; @@ -59,20 +61,23 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { * Method initializes necessary state and invokes the doSetup() to set the * input and output value vector references. * - * @param context Fragment context - * @param left Current left input batch being processed + * @param context Fragment context + * @param left Current left input batch being processed + * @param leftOutcome Last left outcome * @param rightContainer Hyper container - * @param rightCounts Counts for each right container - * @param outgoing Output batch + * @param rightCounts Counts for each right container + * @param outgoing Output batch */ @Override public void setupNestedLoopJoin(FragmentContext context, RecordBatch left, + RecordBatch.IterOutcome leftOutcome, ExpandableHyperContainer rightContainer, LinkedList rightCounts, NestedLoopJoinBatch outgoing) { this.left = left; - this.leftRecordCount = left.getRecordCount(); + this.leftSchema = left.getSchema(); + this.leftOutcome = leftOutcome; this.rightCounts = rightCounts; this.outgoing = outgoing; doSetup(context, rightContainer, left, outgoing); @@ -91,10 +96,13 @@ public void setTargetOutputCount(int targetOutputRecords) { * @return the number of records produced in the output batch */ @Override - public int outputRecords(JoinRelType joinType) { + public int outputRecords(JoinRelType joinType) throws SchemaChangeException { int outputIndex = 0; - while (leftRecordCount != 0) { - outputIndex = populateOutgoingBatch(joinType, outputIndex); + + while (leftOutcome != RecordBatch.IterOutcome.NONE && leftOutcome != RecordBatch.IterOutcome.NOT_YET) { + if (left.getRecordCount() != 0) { + outputIndex = populateOutgoingBatch(joinType, outputIndex); + } if (outputIndex >= targetOutputRecords) { break; } @@ -110,7 +118,7 @@ public int outputRecords(JoinRelType joinType) { * If matching record is found both left and right records are written into output batch, * otherwise if join type is LEFT, than only left record is written, right batch record values will be null. * - * @param joinType join type (INNER or LEFT) + * @param joinType join type (INNER or LEFT) * @param outputIndex index to start emitting records at * @return final outputIndex after producing records in the output batch */ @@ -123,7 +131,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) { outer: // for every record in the left batch - for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) { + for (; nextLeftRecordToProcess < left.getRecordCount(); nextLeftRecordToProcess++) { // for every batch on the right for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) { int rightRecordCount = rightCounts.get(nextRightBatchToProcess); @@ -176,27 +184,25 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) { * Resets some internal state which indicates the next records to process in the left and right batches, * also fetches the next left input batch. */ - private void resetAndGetNextLeft(int outputIndex) { + private void resetAndGetNextLeft(int outputIndex) throws SchemaChangeException { for (VectorWrapper vw : left) { vw.getValueVector().clear(); } tracker.reset(); - RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left); + + leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left); switch (leftOutcome) { case OK_NEW_SCHEMA: - throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" + - " found on the left side of NLJ."); - case NONE: - case NOT_YET: - leftRecordCount = 0; - break; + if (!left.getSchema().equals(leftSchema)) { + throw SchemaChangeException.schemaChanged("Nested loop join does not handle schema change. Schema change" + + " found on the left side of NLJ.", leftSchema, left.getSchema()); + } case OK: - outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex); + outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex); setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update() RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX), outgoing.getRecordBatchStatsContext()); - leftRecordCount = left.getRecordCount(); break; default: }