Changes to set the memory allocation per operator in the query profile.
Addressing an issue where the memory minimization logic was not considering non-buffered operators.
Handling error cases when memory requirements for buffered or non-buffered operators cannot be reduced.
HanumathRao authored and sohami committed Apr 22, 2019
1 parent 5f2db79 commit 130252a
Showing 25 changed files with 348 additions and 163 deletions.
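
In brief: the planner's optimal memory allocation per operator is now recorded in the query profile, and the memory minimization step treats buffered and non-buffered operators separately. Non-buffered operators keep their cost-based estimate, while buffered operators are scaled into the memory left under the node limit, floored at the resource manager's per-operator minimum. Below is a minimal, self-contained sketch of that scaling rule; the names are hypothetical, IllegalStateException stands in for Drill's ExecutionSetupException, and the explicit double cast is an illustrative assumption, not part of the commit.

final class OperatorMemoryScalerSketch {

  /**
   * Scales one buffered operator's optimal memory requirement into the
   * budget left on a node once non-buffered operators are accounted for,
   * never going below the per-operator minimum.
   */
  static long adjust(long optimalMemory, long totalBufferedReq,
                     long totalNodeReq, long nodeLimit, long minOperatorMemory) {
    long nonBufferedReq = totalNodeReq - totalBufferedReq; // cost-based, not scalable
    long bufferedBudget = nodeLimit - nonBufferedReq;      // shared by buffered operators
    if (bufferedBudget < 0 || nonBufferedReq < 0) {
      // Mirrors the new error handling: neither class of operators can be reduced further.
      throw new IllegalStateException(
          "memory requirements for buffered or non-buffered operators cannot be reduced");
    }
    return Math.max(minOperatorMemory,
        (long) Math.ceil((double) optimalMemory / totalBufferedReq * bufferedBudget));
  }

  public static void main(String[] args) {
    // A 2 GB operator out of 6 GB total buffered demand, with 8 GB total node
    // demand, a 7 GB node limit, and a 40 MB floor: scaled to roughly 1.67 GB.
    System.out.println(adjust(2L << 30, 6L << 30, 8L << 30, 7L << 30, 40L << 20));
  }
}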
@@ -22,11 +22,13 @@ public class OpProfileDef {
public int operatorId;
public int operatorType;
public int incomingCount;
public long optimalMemoryAllocation;

public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) {
this.operatorId = operatorId;
this.operatorType = operatorType;
this.incomingCount = incomingCount;
this.optimalMemoryAllocation = optimalMemoryAllocation;
}
public int getOperatorId(){
return operatorId;
@@ -57,7 +57,7 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl conte
} else {
OpProfileDef def =
new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
OperatorUtilities.getChildCount(popConfig));
OperatorUtilities.getChildCount(popConfig), popConfig.getMaxAllocation());
this.stats = context.getStats().newOperatorStats(def, allocator);
}
}
@@ -45,7 +45,7 @@ public class OperatorStats {
public long[] recordsReceivedByInput;
public long[] batchesReceivedByInput;
private long[] schemaCountByInput;

private long optimalMemoryAllocation;

private boolean inProcessing = false;
private boolean inSetup = false;
@@ -62,7 +62,7 @@ public class OperatorStats {
private int inputCount;

public OperatorStats(OpProfileDef def, BufferAllocator allocator){
this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator);
this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator, def.optimalMemoryAllocation);
}

/**
@@ -74,7 +74,7 @@ public OperatorStats(OpProfileDef def, BufferAllocator allocator){
*/

public OperatorStats(OperatorStats original, boolean isClean) {
this(original.operatorId, original.operatorType, original.inputCount, original.allocator);
this(original.operatorId, original.operatorType, original.inputCount, original.allocator, original.optimalMemoryAllocation);

if ( !isClean ) {
inProcessing = original.inProcessing;
@@ -88,7 +88,7 @@ }
}

@VisibleForTesting
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
super();
this.allocator = allocator;
this.operatorId = operatorId;
@@ -97,6 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll
this.recordsReceivedByInput = new long[inputCount];
this.batchesReceivedByInput = new long[inputCount];
this.schemaCountByInput = new long[inputCount];
this.optimalMemoryAllocation = initialAllocation;
}

private String assertionError(String msg){
@@ -207,6 +208,7 @@ public OperatorProfile getProfile() {
.setOperatorId(operatorId) //
.setSetupNanos(setupNanos) //
.setProcessNanos(processingNanos)
.setOptimalMemAllocation(optimalMemoryAllocation)
.setWaitNanos(waitNanos);

if (allocator != null) {
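
Taken together, the hunks above thread the optimal allocation from OpProfileDef through OperatorStats and into the serialized profile via setOptimalMemAllocation. A compact sketch of that flow, using simplified stand-in types rather than Drill's real classes and builders:

record OpProfileDefSketch(int operatorId, int operatorType, int incomingCount,
                          long optimalMemoryAllocation) {}

final class OperatorStatsSketch {
  private final long optimalMemoryAllocation;

  OperatorStatsSketch(OpProfileDefSketch def) {
    // The stats object simply carries the value it was constructed with.
    this.optimalMemoryAllocation = def.optimalMemoryAllocation();
  }

  // Analogue of getProfile(): the value is emitted on the operator profile.
  String profile() {
    return "optimalMemAllocation=" + optimalMemoryAllocation;
  }

  public static void main(String[] args) {
    OpProfileDefSketch def = new OpProfileDefSketch(1, 27, 1, 256L << 20); // 256 MB
    System.out.println(new OperatorStatsSketch(def).profile());
  }
}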
@@ -101,18 +101,6 @@ public boolean enforceWidth() {
return getMinParallelizationWidth() > 1;
}

@Override
@JsonIgnore
public long getInitialAllocation() {
return 0;
}

@Override
@JsonIgnore
public long getMaxAllocation() {
return 0;
}

@Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
@@ -54,7 +54,7 @@ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorCon
}
//Creating new stat for appending to list
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
config.getOperatorType(), OperatorUtilities.getChildCount(config), config.getMaxAllocation()),
this.oContext.getAllocator());
fragmentContext.getStats().addOperatorStats(this.stats);
this.fragmentContext = fragmentContext;
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.fragment;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.function.CheckedConsumer;
@@ -29,7 +30,7 @@
import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -46,6 +47,7 @@
* fragment is based on the cluster state and provided queue configuration.
*/
public class DistributedQueueParallelizer extends SimpleParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
private final boolean planHasMemory;
private final QueryContext queryContext;
private final QueryResourceManager rm;
@@ -65,9 +67,13 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
if (!planHasMemory) {
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
if (operator.isBufferedOperator(queryContext)) {
return operators.get(drillEndpointNode).get(operator);
Long operatorsMemory = operators.get(drillEndpointNode).get(operator);
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
return operatorsMemory;
} else {
return operator.getMaxAllocation();
Long nonBufferedMemory = (long)operator.getCost().getMemoryCost();
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory);
return nonBufferedMemory;
}
}
else {
@@ -92,10 +98,11 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
*/
public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
Map<DrillbitEndpoint, String> onlineEndpointUUIDs) throws ExecutionSetupException {

if (planHasMemory) {
logger.debug(" Plan already has memory settings. Adjustment of the memory is skipped");
return;
}
logger.info(" Memory adjustment phase triggered");

final Map<DrillNode, String> onlineDrillNodeUUIDs = onlineEndpointUUIDs.entrySet().stream()
.collect(Collectors.toMap(x -> DrillNode.create(x.getKey()), x -> x.getValue()));
@@ -112,7 +119,7 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,

for (Wrapper wrapper : roots) {
traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext, rm.minimumOperatorMemory());
fragment.getNode().getRoot().accept(calculator, fragment);
NodeResources.merge(totalNodeResources, fragment.getResourceMap());
operators.entrySet()
@@ -122,6 +129,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
}));
}

if (logger.isDebugEnabled()) {
logger.debug(" Total node resource requirements for the plan is {}", getJSONFromResourcesMap(totalNodeResources));
}

final QueryQueueConfig queueConfig;
try {
queueConfig = this.rm.selectQueue(max(totalNodeResources.values()));
@@ -130,8 +141,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
}

Map<DrillNode,
List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
queueConfig.getMaxQueryMemoryInMBPerNode());
List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators =
ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
convertMBToBytes(Math.min(queueConfig.getMaxQueryMemoryInMBPerNode(),
queueConfig.getQueueTotalMemoryInMB(onlineEndpointUUIDs.size()))));
memoryAdjustedOperators.entrySet().stream().forEach((x) -> {
Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream()
.collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(),
@@ -140,9 +153,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
this.operators.put(x.getKey(), memoryPerOperator);
});

if (logger.isDebugEnabled()) {
logger.debug(" Total node resource requirements after adjustment {}", getJSONFromResourcesMap(totalNodeResources));
}

this.rm.setCost(convertToUUID(totalNodeResources, onlineDrillNodeUUIDs));
}

private long convertMBToBytes(long value) {
return value * 1024 * 1024;
}

private Map<String, NodeResources> convertToUUID(Map<DrillNode, NodeResources> nodeResourcesMap,
Map<DrillNode, String> onlineDrillNodeUUIDs) {
Map<String, NodeResources> nodeResourcesPerUUID = new HashMap<>();
@@ -172,50 +193,81 @@ private NodeResources max(Collection<NodeResources> resources) {
*/
private Map<DrillNode, List<Pair<PhysicalOperator, Long>>>
ensureOperatorMemoryWithinLimits(Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) {
Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) throws ExecutionSetupException {
// Get the physical operators which are above the node memory limit.
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = new HashMap<>();
memoryPerOperator.entrySet().stream().forEach((entry) -> {
onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>());
if (nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit) {
onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue());
}
});

Map<DrillNode,
List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = memoryPerOperator.entrySet()
.stream()
.filter(entry -> nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

// Compute the total memory required by the physical operators on the drillbits which are above node limit.
// Then use the total memory to adjust the memory requirement based on the permissible node limit.
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryAdjustedDrillbits = new HashMap<>();
onlyMemoryAboveLimitOperators.entrySet().stream().forEach(
entry -> {
Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum();
List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorMemory -> {
CheckedConsumer.throwingConsumerWrapper(entry -> {
Long totalBufferedOperatorsMemoryReq = entry.getValue().stream().mapToLong(Pair::getValue).sum();
Long nonBufferedOperatorsMemoryReq = nodeResourceMap.get(entry.getKey()).getMemoryInBytes() - totalBufferedOperatorsMemoryReq;
Long bufferedOperatorsMemoryLimit = nodeLimit - nonBufferedOperatorsMemoryReq;
if (bufferedOperatorsMemoryLimit < 0 || nonBufferedOperatorsMemoryReq < 0) {
logger.error(" Operator memory requirements for buffered operators {} or non buffered operators {} is negative", bufferedOperatorsMemoryLimit,
nonBufferedOperatorsMemoryReq);
throw new ExecutionSetupException("Operator memory requirements for buffered operators " + bufferedOperatorsMemoryLimit + " or non buffered operators " +
nonBufferedOperatorsMemoryReq + " is less than zero");
}
List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorAndMemory -> {
// formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit));
return Pair.of(operatorAndMemory.getKey(),
Math.max(this.rm.minimumOperatorMemory(),
(long) Math.ceil(operatorAndMemory.getValue()/totalBufferedOperatorsMemoryReq * bufferedOperatorsMemoryLimit)));
}).collect(Collectors.toList());
memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory);
NodeResources nodeResources = nodeResourceMap.get(entry.getKey());
nodeResources.setMemoryInBytes(adjustedMemory.stream().mapToLong(Pair::getValue).sum());
}
nodeResources.setMemoryInBytes(nonBufferedOperatorsMemoryReq + adjustedMemory.stream().mapToLong(Pair::getValue).sum());
})
);

checkIfWithinLimit(nodeResourceMap, nodeLimit);

// Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not
// adjusted for memory.
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach(
operatorMemory -> {
allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
}
);
Map<DrillNode,
List<Pair<PhysicalOperator, Long>>> allDrillbits = memoryPerOperator.entrySet()
.stream()
.filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

memoryAdjustedDrillbits.entrySet().stream().forEach(
operatorMemory -> {
allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
}
);
operatorMemory -> allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()));

// At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
// the ratio of their requirements.
return allDrillbits;
}

private void checkIfWithinLimit(Map<DrillNode, NodeResources> nodeResourcesMap, long nodeLimit) throws ExecutionSetupException {
for (Map.Entry<DrillNode, NodeResources> entry : nodeResourcesMap.entrySet()) {
if (entry.getValue().getMemoryInBytes() > nodeLimit) {
logger.error(" Memory requirement for the query cannot be adjusted." +
" Memory requirement {} (in bytes) for a node {} is greater than limit {}", entry.getValue()
.getMemoryInBytes(), entry.getKey(), nodeLimit);
throw new ExecutionSetupException("Minimum memory requirement "
+ entry.getValue().getMemoryInBytes() + " for a node " + entry.getKey() + " is greater than limit: " + nodeLimit);
}
}
}

private String getJSONFromResourcesMap(Map<DrillNode, NodeResources> resourcesMap) {
String json = "";
try {
json = new ObjectMapper().writeValueAsString(resourcesMap.entrySet()
.stream()
.collect(Collectors.toMap(entry -> entry.getKey()
.toString(), Map.Entry::getValue)));
} catch (JsonProcessingException exception) {
logger.error(" Cannot convert the Node resources map to json ");
}

return json;
}
}
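
Two smaller pieces complete the change: the node limit passed to ensureOperatorMemoryWithinLimits is now the stricter of the queue's per-node query cap and the queue's total memory for the number of online endpoints, converted from MB to bytes, and checkIfWithinLimit fails the query when even the adjusted totals still exceed that limit. A small sketch under those assumptions, with hypothetical names and both queue figures taken to be per-node MB values:

final class NodeLimitSketch {

  // Mirrors convertMBToBytes(Math.min(...)) from the diff above.
  static long nodeLimitBytes(long maxQueryMemoryPerNodeMB, long queueMemoryPerNodeMB) {
    return Math.min(maxQueryMemoryPerNodeMB, queueMemoryPerNodeMB) * 1024L * 1024L;
  }

  // Analogue of checkIfWithinLimit: a node still above the limit after
  // adjustment means the query cannot run within this queue's budget.
  static void checkWithinLimit(java.util.Map<String, Long> nodeMemoryBytes, long limitBytes) {
    nodeMemoryBytes.forEach((node, bytes) -> {
      if (bytes > limitBytes) {
        throw new IllegalStateException(
            "node " + node + " needs " + bytes + " bytes, above the limit of " + limitBytes);
      }
    });
  }

  public static void main(String[] args) {
    long limit = nodeLimitBytes(8192, 4096);                           // 4096 MB in bytes
    checkWithinLimit(java.util.Map.of("drillbit-1", 3L << 30), limit); // passes: 3 GB < 4 GB
  }
}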
@@ -17,19 +17,17 @@
*/
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
import org.apache.drill.exec.work.foreman.ForemanSetupException;

import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class);
