Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-8480: Cleanup before finished. 0 out of 1 streams have finished #2897

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
Expand All @@ -33,15 +34,17 @@ public interface NestedLoopJoin {
public static TemplateClassDefinition<NestedLoopJoin> TEMPLATE_DEFINITION =
new TemplateClassDefinition<>(NestedLoopJoin.class, NestedLoopJoinTemplate.class);

public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);

void setTargetOutputCount(int targetOutputCount);

// Produce output records taking into account join type
public int outputRecords(JoinRelType joinType);
public int outputRecords(JoinRelType joinType) throws SchemaChangeException;

// Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex'
public void emitLeft(int leftIndex, int outIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -183,7 +184,7 @@ public IterOutcome innerNext() {
default:
}
}
nljWorker.setupNestedLoopJoin(context, left, rightContainer, rightCounts, this);
nljWorker.setupNestedLoopJoin(context, left, leftUpstream, rightContainer, rightCounts, this);
state = BatchState.NOT_FIRST;
}

Expand All @@ -193,7 +194,11 @@ public IterOutcome innerNext() {
nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());

// invoke the runtime generated method to emit records in the output batch
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
try {
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
} catch (SchemaChangeException e) {
throw UserException.schemaChangeError(e).build(logger);
}

// Set the record count
container.setValueCount(outputRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package org.apache.drill.exec.physical.impl.join;

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;

import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -41,8 +43,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Current left input batch being processed
private RecordBatch left;

// Record count of the left batch currently being processed
private int leftRecordCount;
private BatchSchema leftSchema;
private RecordBatch.IterOutcome leftOutcome;

// List of record counts per batch in the hyper container
private List<Integer> rightCounts;
Expand All @@ -59,20 +61,23 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
*
* @param context Fragment context
* @param left Current left input batch being processed
* @param context Fragment context
* @param left Current left input batch being processed
* @param leftOutcome Last left outcome
* @param rightContainer Hyper container
* @param rightCounts Counts for each right container
* @param outgoing Output batch
* @param rightCounts Counts for each right container
* @param outgoing Output batch
*/
@Override
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
RecordBatch.IterOutcome leftOutcome,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
this.leftRecordCount = left.getRecordCount();
this.leftSchema = left.getSchema();
this.leftOutcome = leftOutcome;
this.rightCounts = rightCounts;
this.outgoing = outgoing;
doSetup(context, rightContainer, left, outgoing);
Expand All @@ -91,10 +96,13 @@ public void setTargetOutputCount(int targetOutputRecords) {
* @return the number of records produced in the output batch
*/
@Override
public int outputRecords(JoinRelType joinType) {
public int outputRecords(JoinRelType joinType) throws SchemaChangeException {
int outputIndex = 0;
while (leftRecordCount != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);

while (leftOutcome != RecordBatch.IterOutcome.NONE && leftOutcome != RecordBatch.IterOutcome.NOT_YET) {
if (left.getRecordCount() != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);
}
if (outputIndex >= targetOutputRecords) {
break;
}
Expand All @@ -110,7 +118,7 @@ public int outputRecords(JoinRelType joinType) {
* If a matching record is found, both the left and right records are written into the output batch;
* otherwise, if the join type is LEFT, then only the left record is written and the right batch record values will be null.
*
* @param joinType join type (INNER or LEFT)
* @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
Expand All @@ -123,7 +131,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {

outer:
// for every record in the left batch
for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
for (; nextLeftRecordToProcess < left.getRecordCount(); nextLeftRecordToProcess++) {
// for every batch on the right
for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) {
int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
Expand Down Expand Up @@ -176,27 +184,25 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
* Resets the internal state that indicates the next records to process in the left and right batches,
* and fetches the next left input batch.
*/
private void resetAndGetNextLeft(int outputIndex) {
private void resetAndGetNextLeft(int outputIndex) throws SchemaChangeException {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
tracker.reset();
RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);

leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
switch (leftOutcome) {
case OK_NEW_SCHEMA:
throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" +
" found on the left side of NLJ.");
case NONE:
case NOT_YET:
leftRecordCount = 0;
break;
if (!left.getSchema().equals(leftSchema)) {
throw SchemaChangeException.schemaChanged("Nested loop join does not handle schema change. Schema change" +
" found on the left side of NLJ.", leftSchema, left.getSchema());
}
case OK:
outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex);
setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());
leftRecordCount = left.getRecordCount();
break;
default:
}
Expand Down
Loading