Skip to content

Commit

Permalink
DRILL-8490: Sender operator fake memory leak result to sql failed and…
Browse files Browse the repository at this point in the history
… memory statistics error when ChannelClosedException (apache#2917)
  • Loading branch information
shfshihuafeng authored Jul 5, 2024
1 parent 6564098 commit af62ace
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public AccountingDataTunnel(DataTunnel tunnel, SendingAccountor sendingAccountor

/**
 * Sends a record batch through the wrapped tunnel while registering it with the sending accountor.
 *
 * @param batch the batch to hand to the underlying tunnel
 */
public void sendRecordBatch(FragmentWritableBatch batch) {
// Both accountor counters are bumped before the batch is handed to the tunnel; presumably this keeps a
// callback delivered through statusHandler from arriving before its matching increment (confirm against
// the status handler implementation).
sendingAccountor.increment();
sendingAccountor.incrementComplete();
tunnel.sendRecordBatch(statusHandler, batch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public void interrupted(final InterruptedException e) {
sendingAccountor.decrement();
consumer.interrupt(e);
}

/**
 * Forwards the completion notification to the sending accountor by calling
 * {@code decrementComplete()} on it.
 */
@Override
public void complete() {
sendingAccountor.decrementComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class SendingAccountor {

private final AtomicInteger batchesSent = new AtomicInteger(0);
private final Semaphore wait = new Semaphore(0);
// batches are released only once the data send has completed
private final AtomicInteger batchesComplete = new AtomicInteger(0);
private final Semaphore waitComplete = new Semaphore(0);

void increment() {
batchesSent.incrementAndGet();
Expand All @@ -44,6 +47,14 @@ void decrement() {
wait.release();
}

/**
 * Records one more batch whose completion must be awaited, by incrementing {@code batchesComplete}.
 */
void incrementComplete() {
batchesComplete.incrementAndGet();
}

/**
 * Signals that one batch has completed by releasing a single permit on {@code waitComplete}.
 *
 * <p>Despite its name this method does not touch {@code batchesComplete}; that counter is reduced by
 * {@code waitForOperatorComplete()} after it has acquired the released permits.
 */
void decrementComplete() {
waitComplete.release();
}

public synchronized void waitForSendComplete() {
int waitForBatches = batchesSent.get();
boolean isInterrupted = false;
Expand All @@ -61,11 +72,31 @@ public synchronized void waitForSendComplete() {
isInterrupted = true;
}
}

isInterrupted = isInterrupted || waitForOperatorComplete();
if (isInterrupted) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
}
}

/**
 * Blocks until every batch registered through {@code incrementComplete()} has been matched by a
 * {@code decrementComplete()} call.
 *
 * <p>Each pass acquires as many permits from {@code waitComplete} as batches were outstanding when the pass
 * started, then subtracts that amount from {@code batchesComplete}. Batches registered in the meantime leave
 * a non-zero remainder, which triggers another pass.
 *
 * <p>An interruption does not abort the wait: it is logged, remembered, and the acquire is retried. This
 * method does not restore the thread's interrupt status itself; the returned flag lets the caller do so.
 *
 * @return {@code true} if the waiting thread was interrupted at least once, {@code false} otherwise
 */
public synchronized boolean waitForOperatorComplete() {
  boolean sawInterrupt = false;
  int pending = batchesComplete.get();

  while (pending != 0) {
    try {
      waitComplete.acquire(pending);
    } catch (InterruptedException e) {
      // Keep waiting regardless: giving up early would leak memory or cause a memory miss on a later send.
      // A dead node should surface as a failed send, and a live receiver should keep consuming from the
      // rpc layer, so the outstanding permits are expected to arrive eventually.
      logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
      sawInterrupt = true;
      continue;
    }

    // Permits obtained: deduct the batches just accounted for and see whether more were registered meanwhile.
    pending = batchesComplete.addAndGet(-pending);
  }

  return sawInterrupt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void sendRecordBatch(RpcOutcomeListener<BitData.AckWithCredit> outcomeLis
}

outcomeListener.interrupted(e);

outcomeListener.complete();
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -141,6 +141,11 @@ public void interrupted(InterruptedException e) {
sendingSemaphore.release();
inner.interrupted(e);
}

/**
 * Passes the completion notification straight through to the wrapped {@code inner} listener.
 */
@Override
public void complete() {
inner.complete();
}
}

private class SendBatchAsyncListen extends ListeningCommand<BitData.AckWithCredit, DataClientConnection, RpcType, MessageLite> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public boolean blockOnNotWritable(RpcOutcomeListener<?> listener) {
return true;
} catch (final InterruptedException e) {
listener.interrupted(e);

listener.complete();
// Preserve evidence that the interruption occurred so that code higher up
// on the call stack can learn of the
// interruption and respond to it if it wants to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinati

@Override
public void operationComplete(ChannelFuture future) throws Exception {
handler.complete();
if (!future.isSuccess()) {
try {
removeFromMap(coordinationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND pr
completed = true;
} catch (Exception | AssertionError e) {
listener.failed(new RpcException("Failure sending message.", e));
listener.complete();
} finally {

if (!completed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ public interface RpcOutcomeListener<V> {
* is cancelled due to query cancellations or failures.
*/
void interrupted(final InterruptedException e);

/**
 * Called when the send operation for a message has finished, so that a listener waiting to release the
 * message can proceed.
 *
 * <p>The default implementation does nothing, so existing implementations need not override it.
 * NOTE(review): callers appear to invoke this on failure and interruption paths as well as on normal
 * completion, in addition to the corresponding outcome callback; confirm before relying on it being
 * called exactly once per message.
 */
default void complete() {
}
}

0 comments on commit af62ace

Please sign in to comment.