Skip to content

Commit

Permalink
DRILL-6806: Moving code for a HashAgg partition into seperate class.
Browse files Browse the repository at this point in the history
  • Loading branch information
ilooner committed Oct 29, 2018
1 parent 5859968 commit ff8d407
Show file tree
Hide file tree
Showing 10 changed files with 774 additions and 418 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

public interface HashAggBatchAllocator {
void allocateOutgoing(int records);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

public interface HashAggBatchHolder {
void outputValues();
void clear();
int getNumPendingOutput();
void setup();
int getTargetBatchRowCount();
void setTargetBatchRowCount(int batchRowCount);
int getCurrentRowCount();
boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

/**
* TODO this is an incremental change to move existing code into separate classes. This interface and the corresponding
* Implementation need to be change significantly moving forward.
*/
public interface HashAggMemoryCalculator {
/**
* Use reserved values memory (if available) to try and preemp an OOM
*/
void useReservedValuesMemory();

/**
* Use reserved outgoing output memory (if available) to try and preemp an OOM
*/
void useReservedOutgoingMemory();

/**
* Restore the reserve memory (both)
*/
void restoreReservedMemory();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

public class HashAggMemoryCalculatorImpl implements HashAggMemoryCalculator {
private boolean initialized = false;

private final BufferAllocator allocator;
private final long estOutgoingAllocSize;
private final long estValuesBatchSize;

private long reserveOutgoingMemory;
private long reserveValueBatchMemory;

public HashAggMemoryCalculatorImpl(
final BufferAllocator allocator,
final long estOutgoingAllocSize,
final long estValuesBatchSize,
final long reserveOutgoingMemory,
final long reserveValueBatchMemory) {
Preconditions.checkState(!initialized);
initialized = true;

this.allocator = Preconditions.checkNotNull(allocator);
this.estOutgoingAllocSize = estOutgoingAllocSize;
this.estValuesBatchSize = estValuesBatchSize;
this.reserveOutgoingMemory = reserveOutgoingMemory;
this.reserveValueBatchMemory = reserveValueBatchMemory;
}

@Override
public void useReservedValuesMemory() {
// try to preempt an OOM by using the reserved memory
long reservedMemory = reserveValueBatchMemory;
if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }

reserveValueBatchMemory = 0;
}

@Override
public void useReservedOutgoingMemory() {
Preconditions.checkState(initialized);

// try to preempt an OOM by using the reserved memory
long reservedMemory = reserveOutgoingMemory;
if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }

reserveOutgoingMemory = 0;
}

@Override
public void restoreReservedMemory() {
Preconditions.checkState(initialized);

if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
if ( memAvail > estOutgoingAllocSize) {
allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
reserveOutgoingMemory = estOutgoingAllocSize;
}
}
if ( 0 == reserveValueBatchMemory ) {
long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
if ( memAvail > estValuesBatchSize) {
allocator.setLimit(allocator.getLimit() - estValuesBatchSize);
reserveValueBatchMemory = estValuesBatchSize;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

public class HashAggMemoryCalculatorNoop implements HashAggMemoryCalculator {
@Override
public void useReservedValuesMemory() {

}

@Override
public void useReservedOutgoingMemory() {

}

@Override
public void restoreReservedMemory() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.aggregate;

import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
import org.apache.drill.exec.record.RecordBatch;

import java.io.IOException;

public interface HashAggPartition {
void updateBatches();
void setup(RecordBatch newIncoming) throws SchemaChangeException, IOException;
boolean isSpilled();
int getBatchHolderCount();
int getSpilledBatchesCount();
int getPartitionIndex();
boolean doneOutputting();
int outputCurrentBatch();
void spill();
HashAggSpilledPartition finishSpilling(int originalPartition);
void addStats(HashTableStats hashTableStats);
void cleanup();
void reinitPartition();
boolean hasPendingRows();

void addBatchHolder(HashAggBatchHolder batchHolder);
int buildHashcode(int incomingRowIdx) throws SchemaChangeException;
HashAggBatchHolder getBatchHolder(int index);
HashTable.PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode, int batchSize) throws SchemaChangeException, RetryAfterSpillException;

void resetOutBatchIndex();
}
Loading

0 comments on commit ff8d407

Please sign in to comment.