Skip to content

Commit

Permalink
DRILL-6845: Removed parameter to shouldSpill, use post build to check…
Browse files Browse the repository at this point in the history
… probe batches, and set num partitions
  • Loading branch information
Ben-Zvi committed Jan 14, 2019
1 parent 0079c86 commit 823cebc
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode
int pos = currentVectorContainer.appendRow(buildContainer,ind);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
if ( pos == recordsPerBatch ) {
boolean needsSpill = isSpilled || calc.shouldSpill(currentVectorContainer);
boolean needsSpill = isSpilled || calc.shouldSpill();
completeAnInnerBatch(true, needsSpill);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}

HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
HashJoinMemoryCalculator.HashJoinSpillControl spillControlCalc;
HashJoinMemoryCalculator.BuildSidePartitioning currentCalc; // may be a spill control calc, or buildCalc

{
// Initializing build calculator
Expand All @@ -953,7 +953,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
HashJoinMemoryCalculator calc = getCalculatorImpl();

calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
currentCalc = buildCalc = calc.next();

buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch,
Expand All @@ -973,9 +973,32 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
// Do auto tuning
buildCalc = partitionNumTuning(maxBatchRowCount, buildCalc);
}
// to be used in case of a Semi Join skipping duplicates
spillControlCalc = new HashJoinSpillControlImpl(allocator, RECORDS_PER_BATCH,
(int) context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR), batchMemoryManager);
if ( semiSkipDuplicates ) {
// in case of a Semi Join skipping duplicates, use a "spill control" calc
// (may revert back to the buildCalc if the code decides to stop skipping)
currentCalc = new HashJoinSpillControlImpl(allocator, RECORDS_PER_BATCH,
(int) context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR),
batchMemoryManager, context);

// calculates the max number of partitions possible
if ( spilledState.isFirstCycle() && doMemoryCalculation ) {
currentCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
buildJoinColumns,
probeSideIsEmpty.booleanValue(),
allocator.getLimit(),
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchRowCount,
maxBatchRowCount,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);

numPartitions = currentCalc.getNumPartitions();
}
}
}

if (spilledState.isFirstCycle()) {
Expand Down Expand Up @@ -1031,10 +1054,6 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}
}

// In the case of Semi Join Skipping Duplicates (And using a Hash Table for that) - use a special memory
// calculator, else use the regular build-phase calculator
HashJoinMemoryCalculator.HashJoinSpillControl spillCalc = semiSkipDuplicates ? spillControlCalc : buildCalc;

// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
Expand All @@ -1054,7 +1073,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
partn.stopSkippingDuplicates();
}
semiSkipDuplicates = false;
spillCalc = buildCalc; // back to using the regular calc
currentCalc = buildCalc; // back to using the regular calc
}
logger.debug("Semi {} skipping duplicates after receiving {} rows with {} percent duplicates",
semiSkipDuplicates ? "to continue" : "stopped", semiCountTotal, (100 * semiCountDuplicates) / semiCountTotal);
Expand All @@ -1065,7 +1084,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}
}
// Append the new inner row to the appropriate partition; spill (that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, spillCalc); // may spill if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, currentCalc); // may spill if needed
}

if ( read_right_HV_vector != null ) {
Expand All @@ -1085,12 +1104,15 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}
}

int numPartitionsSpilled = 0;

// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
if ( numPartitions > 1 ) { // a single partition needs no completion
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled());
if ( partn.isSpilled() ) { numPartitionsSpilled++; }
}
}

Expand All @@ -1102,8 +1124,8 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
return leftUpstream;
}

HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = currentCalc.next();
postBuildCalc.initialize(probeSideIsEmpty.booleanValue(), numPartitionsSpilled);

//
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
Expand All @@ -1118,13 +1140,13 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}

try {
if (semiSkipDuplicates) {
if (postBuildCalc.shouldSpill()) {
// Spill this partition if we need to make room
partn.spillThisPartition();
} else if (semiSkipDuplicates) {
// All in memory, and already got the Hash Table - just build the containers
// (No additional memory is needed, hence no need for any new spill)
partn.buildContainers();
} else if (postBuildCalc.shouldSpill(null)) {
// Spill this partition if we need to make room
partn.spillThisPartition();
} else {
// Only build hash tables for partitions that are not spilled
partn.buildContainersHashTableAndHelper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.exec.record.RecordBatch;

Expand Down Expand Up @@ -106,7 +105,7 @@ public long getMaxReservedMemory() {
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
}

Expand Down Expand Up @@ -141,7 +140,7 @@ public MechanicalPostBuildCalculations(final int maxNumInMemBatches,
}

@Override
public void initialize(boolean probeEmty) {
public void initialize(boolean probeEmty, int numPartitionsSpilled) {
}

@Override
Expand All @@ -150,7 +149,7 @@ public int getProbeRecordsPerBatch() {
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
return partitionStatSet.getNumInMemoryBatches() > maxNumInMemBatches;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -131,8 +130,9 @@ interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCa
/**
* Initializes the calculator with additional information needed.
* @param probeEmty True if the probe is empty. False otherwise.
* @param numPartitionsSpilled The number of partitions that were spilled during the build phase.
*/
void initialize(boolean probeEmty);
void initialize(boolean probeEmty, int numPartitionsSpilled);

int getProbeRecordsPerBatch();

Expand All @@ -152,7 +152,7 @@ interface PartitionStat {
}

interface HashJoinSpillControl {
boolean shouldSpill(VectorContainer currentVectorContainer);
boolean shouldSpill();
}
/**
* This class represents the memory size statistics for an entire set of partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.common.map.CaseInsensitiveMap;
Expand Down Expand Up @@ -136,7 +135,7 @@ public long getMaxReservedMemory() {
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
return false;
}

Expand Down Expand Up @@ -175,7 +174,7 @@ public HashJoinState getState() {
* <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, double)}.
* This will initialize the StateCalculate with the additional information it needs.</li>
* <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
* <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill(VectorContainer)} to determine if spilling needs to occur.</li>
* <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()} to determine if spilling needs to occur.</li>
* <li><b>Step 3:</b> Call {@link #next()} and get the next memory calculator associated with your next state.</li>
* </ul>
* </p>
Expand Down Expand Up @@ -445,7 +444,7 @@ private void calculateMemoryUsage()
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
Preconditions.checkState(initialized);

long consumedMemory = reservedMemory;
Expand Down Expand Up @@ -515,7 +514,7 @@ public NoopPostBuildCalculationsImpl(final int recordsPerPartitionBatchProbe) {
}

@Override
public void initialize(boolean hasProbeData) {
public void initialize(boolean hasProbeData, int numPartitionsSpilled) {
}

@Override
Expand All @@ -524,7 +523,7 @@ public int getProbeRecordsPerBatch() {
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
return false;
}

Expand Down Expand Up @@ -556,9 +555,9 @@ public String makeDebugString() {
* <h1>Lifecycle</h1>
* <p>
* <ul>
* <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
* <li><b>Step 1:</b> Call {@link PostBuildCalculations#initialize(boolean, int)}. This
* gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
* <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill(VectorContainer)}. This tells
* <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()}. This tells
* you which build side partitions need to be spilled in order to make room for probing.</li>
* <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
* and partitioning the probe side, get the next calculator.</li>
Expand Down Expand Up @@ -625,7 +624,7 @@ public PostBuildCalculationsImpl(final boolean firstCycle,
}

@Override
public void initialize(boolean probeEmpty) {
public void initialize(boolean probeEmpty, int numPartitionsSpilled) {
Preconditions.checkState(!initialized);
// If we had probe data before there should still be probe data now.
// If we didn't have probe data before we could get some new data now.
Expand Down Expand Up @@ -731,7 +730,7 @@ public static long calculateReservedMemory(final int numSpilledPartitions,
}

@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
public boolean shouldSpill() {
Preconditions.checkState(initialized);

if (probeEmpty) {
Expand Down
Loading

0 comments on commit 823cebc

Please sign in to comment.