Skip to content

Commit

Permalink
Addressing Review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
HanumathRao committed May 8, 2019
1 parent 1517a87 commit 38e9a73
Show file tree
Hide file tree
Showing 20 changed files with 389 additions and 480 deletions.
43 changes: 32 additions & 11 deletions exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,38 @@ public static DrillNode create(DrillbitEndpoint endpoint) {
return new DrillNode(endpoint);
}

public boolean equals(Object other) {
if (!(other instanceof DrillNode)) {
return false;
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint)) {
return super.equals(obj);
}
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint other = (org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint) obj;

DrillbitEndpoint otherEndpoint = ((DrillNode) other).endpoint;
return endpoint.getAddress().equals(otherEndpoint.getAddress()) &&
endpoint.getUserPort() == otherEndpoint.getUserPort() &&
endpoint.getControlPort() == otherEndpoint.getControlPort() &&
endpoint.getDataPort() == otherEndpoint.getDataPort() &&
endpoint.getVersion().equals(otherEndpoint.getVersion());
boolean result = true;
result = result && (endpoint.hasAddress() == other.hasAddress());
if (endpoint.hasAddress()) {
result = result && endpoint.getAddress()
.equals(other.getAddress());
}
result = result && (endpoint.hasUserPort() == other.hasUserPort());
if (endpoint.hasUserPort()) {
result = result && (endpoint.getUserPort() == other.getUserPort());
}
result = result && (endpoint.hasControlPort() == other.hasControlPort());
if (endpoint.hasControlPort()) {
result = result && (endpoint.getControlPort() == other.getControlPort());
}
result = result && (endpoint.hasDataPort() == other.hasDataPort());
if (endpoint.hasDataPort()) {
result = result && (endpoint.getDataPort() == other.getDataPort());
}
result = result && (endpoint.hasVersion() == other.hasVersion());
if (endpoint.hasVersion()) {
result = result && endpoint.getVersion().equals(other.getVersion());
}
return result;
}

@Override
Expand Down Expand Up @@ -81,8 +102,8 @@ public String toString() {
StringBuilder sb = new StringBuilder();

return sb.append("endpoint address :")
.append(endpoint.getAddress())
.append(endpoint.hasAddress() ? endpoint.getAddress() : "no-address")
.append("endpoint user port: ")
.append(endpoint.getUserPort()).toString();
.append(endpoint.hasUserPort() ? endpoint.getUserPort() : "no-userport").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) {
}

@VisibleForTesting
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long optimalMemAllocation) {
super();
this.allocator = allocator;
this.operatorId = operatorId;
Expand All @@ -97,7 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll
this.recordsReceivedByInput = new long[inputCount];
this.batchesReceivedByInput = new long[inputCount];
this.schemaCountByInput = new long[inputCount];
this.optimalMemoryAllocation = initialAllocation;
this.optimalMemoryAllocation = optimalMemAllocation;
}

private String assertionError(String msg){
Expand Down Expand Up @@ -208,7 +208,7 @@ public OperatorProfile getProfile() {
.setOperatorId(operatorId) //
.setSetupNanos(setupNanos) //
.setProcessNanos(processingNanos)
.setOptimalMemAllocation(optimalMemoryAllocation)
.setMaxAllocation(optimalMemoryAllocation)
.setWaitNanos(waitNanos);

if (allocator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* fragment is based on the cluster state and provided queue configuration.
*/
public class DistributedQueueParallelizer extends SimpleParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
private final boolean planHasMemory;
private final QueryContext queryContext;
private final QueryResourceManager rm;
Expand All @@ -62,23 +62,21 @@ public DistributedQueueParallelizer(boolean memoryPlanning, QueryContext queryCo
}

// return the memory computed for a physical operator on a drillbitendpoint.
// At this stage buffered operator memory could have been reduced depending upon
// the selected queue limits.
public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
return (endpoint, operator) -> {
long operatorsMemory = operator.getMaxAllocation();
if (!planHasMemory) {
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
if (operator.isBufferedOperator(queryContext)) {
Long operatorsMemory = operators.get(drillEndpointNode).get(operator);
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
return operatorsMemory;
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
operatorsMemory = operators.get(drillEndpointNode).get(operator);
} else {
Long nonBufferedMemory = (long)operator.getCost().getMemoryCost();
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory);
return nonBufferedMemory;
operatorsMemory = (long)operator.getCost().getMemoryCost();
}
}
else {
return operator.getMaxAllocation();
}
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
return operatorsMemory;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.drill.exec.planner.fragment;

import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.util.memory.MemoryAllocationUtilities;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

Expand Down Expand Up @@ -167,27 +167,11 @@ public boolean equals(Object obj) {

public List<PhysicalOperator> getBufferedOperators(QueryContext queryContext) {
List<PhysicalOperator> bufferedOps = new ArrayList<>();
root.accept(new BufferedOpFinder(queryContext), bufferedOps);
root.accept(new MemoryAllocationUtilities.BufferedOpFinder(queryContext), bufferedOps);
return bufferedOps;
}

protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
private final QueryContext context;

public BufferedOpFinder(QueryContext queryContext) {
this.context = queryContext;
}

@Override
public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
throws RuntimeException {
if (op.isBufferedOperator(context)) {
value.add(op);
}
visitChildren(op, value);
return null;
}
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeExce
// List of all the buffered operators and their memory requirement per drillbit.
private final Map<DrillNode, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
private final QueryContext queryContext;
private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS;
private final long minimum_memory_for_buffer_opers;

public MemoryCalculator(PlanningSet planningSet, QueryContext context, long minMemory) {
this.planningSet = planningSet;
this.bufferedOperators = new HashMap<>();
this.queryContext = context;
this.MINIMUM_MEMORY_FOR_BUFFER_OPERS = minMemory;
this.minimum_memory_for_buffer_opers = minMemory;
}

// Helper method to compute the minor fragment count per drillbit. This method returns
Expand Down Expand Up @@ -138,7 +138,7 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
// The memory estimates of the optimizer are for the whole operator spread across all the
// minor fragments. Divide this memory estimation by fragment width to get the memory
// requirement per minor fragment.
long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), MINIMUM_MEMORY_FOR_BUFFER_OPERS);
long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), minimum_memory_for_buffer_opers);
Map<DrillNode, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment);

Map<DrillNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint fo
PlanFragment fragment = PlanFragment.newBuilder()
.setForeman(foremanNode)
.setHandle(handle)
.setEndpointUUID(endpointUUID)
.setAssignedEndpointUUID(endpointUUID)
.setAssignment(endpoint)
.setLeafFragment(isLeafFragment)
.setContext(queryContextInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.fragment;

import org.apache.drill.common.DrillNode;
import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
Expand All @@ -26,9 +27,10 @@
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
Expand Down Expand Up @@ -69,15 +71,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,


public class Collector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
private final Multimap<DrillbitEndpoint, PhysicalOperator> bufferedOperators;
private final Map<DrillNode, List<PhysicalOperator>> bufferedOperators;

public Collector() {
this.bufferedOperators = ArrayListMultimap.create();
this.bufferedOperators = new HashMap<>();
}

private void getMinorFragCountPerDrillbit(Wrapper currFragment, PhysicalOperator operator) {
for (DrillbitEndpoint endpoint : currFragment.getAssignedEndpoints()) {
bufferedOperators.put(endpoint, operator);
DrillNode node = new DrillNode(endpoint);
bufferedOperators.putIfAbsent(node, new ArrayList<>());
bufferedOperators.get(node).add(operator);
}
}

Expand All @@ -103,10 +107,10 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
}

public Map<String, Collection<PhysicalOperator>> getNodeMap() {
Map<DrillbitEndpoint, Collection<PhysicalOperator>> endpointCollectionMap = bufferedOperators.asMap();
Map<DrillNode, List<PhysicalOperator>> endpointCollectionMap = bufferedOperators;
Map<String, Collection<PhysicalOperator>> nodeMap = new HashMap<>();
for (Map.Entry<DrillbitEndpoint, Collection<PhysicalOperator>> entry : endpointCollectionMap.entrySet()) {
nodeMap.put(entry.getKey().getAddress(), entry.getValue());
for (Map.Entry<DrillNode, List<PhysicalOperator>> entry : endpointCollectionMap.entrySet()) {
nodeMap.put(entry.getKey().toString(), entry.getValue());
}

return nodeMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ public class NodeResources {

private long memoryInBytes;

private long numVirtualCpu;
private int numVirtualCpu;

private static final int CURRENT_VERSION = 1;

public NodeResources(long memoryInBytes, long numVirtualCpu) {
public NodeResources(long memoryInBytes, int numVirtualCpu) {
this(CURRENT_VERSION, memoryInBytes, numVirtualCpu);
}

@JsonCreator
public NodeResources(@JsonProperty("version") int version,
@JsonProperty("memoryInBytes") long memoryInBytes,
@JsonProperty("numVirtualCpu") long numVirtualCpu) {
@JsonProperty("numVirtualCpu") int numVirtualCpu) {
this.version = version;
this.memoryInBytes = memoryInBytes;
this.numVirtualCpu = numVirtualCpu;
Expand All @@ -79,7 +79,7 @@ public long getMemoryInGB() {
return Math.round(getMemoryInMB() / 1024L);
}

public long getNumVirtualCpu() {
public int getNumVirtualCpu() {
return numVirtualCpu;
}

Expand Down Expand Up @@ -140,11 +140,11 @@ public static NodeResources create() {
return create(0,0);
}

public static NodeResources create(long cpu) {
public static NodeResources create(int cpu) {
return create(cpu,0);
}

public static NodeResources create(long cpu, long memory) {
public static NodeResources create(int cpu, long memory) {
return new NodeResources(CURRENT_VERSION, memory, cpu);
}

Expand Down
Loading

0 comments on commit 38e9a73

Please sign in to comment.