diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java index 67d78e56e79..61ef523979a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java @@ -43,6 +43,7 @@ public AccountingDataTunnel(DataTunnel tunnel, SendingAccountor sendingAccountor public void sendRecordBatch(FragmentWritableBatch batch) { sendingAccountor.increment(); + sendingAccountor.incrementComplete(); tunnel.sendRecordBatch(statusHandler, batch); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java index e78cda9612e..15bb78c6a18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java @@ -62,4 +62,9 @@ public void interrupted(final InterruptedException e) { sendingAccountor.decrement(); consumer.interrupt(e); } + + @Override + public void complete() { + sendingAccountor.decrementComplete(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java index d588b602add..656b1aaad17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java @@ -35,6 +35,9 @@ public class SendingAccountor { private final AtomicInteger batchesSent = new AtomicInteger(0); private final Semaphore wait = new Semaphore(0); + //batch release util data send complete + private final AtomicInteger batchesComplete = new AtomicInteger(0); + private final Semaphore waitComplete = new Semaphore(0); void increment() { batchesSent.incrementAndGet(); @@ -44,6 +47,14 @@ void decrement() { wait.release(); } + void incrementComplete() { + batchesComplete.incrementAndGet(); + } + + void decrementComplete() { + waitComplete.release(); + } + public synchronized void waitForSendComplete() { int waitForBatches = batchesSent.get(); boolean isInterrupted = false; @@ -61,11 +72,31 @@ public synchronized void waitForSendComplete() { isInterrupted = true; } } - + isInterrupted = isInterrupted || waitForOperatorComplete(); if (isInterrupted) { // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the // interruption and respond to it if it wants to. Thread.currentThread().interrupt(); } } + + public synchronized boolean waitForOperatorComplete() { + int waitForBatches = batchesComplete.get(); + boolean isInterrupted = false; + while(waitForBatches != 0) { + try { + waitComplete.acquire(waitForBatches); + waitForBatches = batchesComplete.addAndGet(-1 * waitForBatches); + } catch (InterruptedException e) { + // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try + // to send. This should be safe because: network connections should get disconnected and fail a send if a + // node goes down, otherwise, the receiving side connection should always consume from the rpc layer + // (blocking is cooperative and will get triggered before this) + logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e); + + isInterrupted = true; + } + } + return isInterrupted; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index f60d6680768..da0601fd12f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -84,7 +84,7 @@ public void sendRecordBatch(RpcOutcomeListener outcomeLis } outcomeListener.interrupted(e); - + outcomeListener.complete(); // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the // interruption and respond to it if it wants to. Thread.currentThread().interrupt(); @@ -141,6 +141,11 @@ public void interrupted(InterruptedException e) { sendingSemaphore.release(); inner.interrupted(e); } + + @Override + public void complete() { + inner.complete(); + } } private class SendBatchAsyncListen extends ListeningCommand { diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java index 6e11236fc44..c02c1ed9bad 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java @@ -77,7 +77,7 @@ public boolean blockOnNotWritable(RpcOutcomeListener listener) { return true; } catch (final InterruptedException e) { listener.interrupted(e); - + listener.complete(); // Preserve evidence that the interruption occurred so that code higher up // on the call stack can learn of the // interruption and respond to it if it wants to. diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java index 11b829ef200..cc38cc00376 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java @@ -115,6 +115,7 @@ public RpcListener(RpcOutcomeListener handler, Class clazz, int coordinati @Override public void operationComplete(ChannelFuture future) throws Exception { + handler.complete(); if (!future.isSuccess()) { try { removeFromMap(coordinationId); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 69c1bb33118..aa85dc44d58 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -120,6 +120,7 @@ void send(RpcOutcomeListener listener, C connection, T rpcType, SEND pr completed = true; } catch (Exception | AssertionError e) { listener.failed(new RpcException("Failure sending message.", e)); + listener.complete(); } finally { if (!completed) { diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java index 4afa1598461..953c56bc4f6 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java @@ -34,4 +34,10 @@ public interface RpcOutcomeListener { * is cancelled due to query cancellations or failures. */ void interrupted(final InterruptedException e); + + /** + * Called when an operator complete for waiting msg release + */ + default void complete() { + } }