Skip to content

Commit

Permalink
QPIDJMS-602: fix potential for session shutdown NPE during competing …
Browse files Browse the repository at this point in the history
…local and remote closures

Make the ProviderFuture creation safe from NPE, validate a future is returned and noop the
completion wait if not since the provider ref is gone already.
Add additional try-finally to ensure executor shutdown occurs.

(cherry picked from commit 82557f9)
  • Loading branch information
gemmellr committed Sep 27, 2024
1 parent 723ce06 commit 1f2cfe3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,12 @@ ProviderFuture newProviderFuture() {
}

ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
return provider.newProviderFuture(synchronization);
Provider localProvider = provider;
if (localProvider != null) {
return localProvider.newProviderFuture(synchronization);
} else {
return null;
}
}

//----- Property setters and getters -------------------------------------//
Expand Down
60 changes: 33 additions & 27 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,39 +380,45 @@ protected boolean shutdown(Throwable cause) throws JMSException {
// Ensure that no asynchronous completion sends remain blocked after close but wait
// using the close timeout for the asynchronous sends to complete normally.
final ExecutorService completionExecutor = getCompletionExecutor();
try {
synchronized (sessionInfo) {
// Producers are now quiesced and we can await completion of asynchronous sends
// that are still pending a result or timeout once we've done a quick check to
// see if any are actually pending or have completed already.
asyncSendsCompletion = connection.newProviderFuture();

if (asyncSendsCompletion != null) {
completionExecutor.execute(() -> {
if (asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
});
}
}

synchronized (sessionInfo) {
// Producers are now quiesced and we can await completion of asynchronous sends
// that are still pending a result or timeout once we've done a quick check to
// see if any are actually pending or have completed already.
asyncSendsCompletion = connection.newProviderFuture();

completionExecutor.execute(() -> {
if (asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
try {
if (asyncSendsCompletion != null) {
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
}
} catch (Exception ex) {
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
} finally {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}
});
}

try {
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (Exception ex) {
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
} finally {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
// as a last task we want to fail any stragglers in the asynchronous send queue and then
// shutdown the queue to prevent any more submissions while the cleanup goes on.
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
}

// as a last task we want to fail any stragglers in the asynchronous send queue and then
// shutdown the queue to prevent any more submissions while the cleanup goes on.
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
} finally {
completionExecutor.shutdown();
}

try {
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
try {
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
}

if (shutdownError != null) {
Expand Down

0 comments on commit 1f2cfe3

Please sign in to comment.