Skip to content

Commit

Permalink
Merge pull request #3661 from SavinduDimal/transport-listner-stop
Browse files Browse the repository at this point in the history
Stop transport listeners asynchronously during server shutdown
  • Loading branch information
chamilaadhi authored Sep 15, 2023
2 parents 20d2581 + 2e9cfe8 commit a3df9e5
Showing 1 changed file with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@
import javax.management.QueryExp;
import java.io.File;
import java.lang.management.ManagementPermission;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* Class for handling Server management functionalilty.
Expand Down Expand Up @@ -75,13 +81,44 @@ public void startMaintenance() throws Exception {
secMan.checkPermission(new ManagementPermission("control"));
}
log.info("Starting to switch to maintenance mode...");
stopTransportListeners();
destroyTransportListeners();
waitForRequestCompletion();
}

/**
* Stop Transport Listeners asynchronously and wait for the completion of the tasks
*/
private void stopTransportListeners() {
ExecutorService transportListenerShutdownPool = Executors.newFixedThreadPool(inTransports.size());
List<Future<Void>> listenerShutdownFutures = new ArrayList<>();
for (TransportInDescription tinDesc : inTransports.values()) {
TransportListener transport = tinDesc.getReceiver();
transport.stop();
Future<Void> future = transportListenerShutdownPool.submit(new TransportListenerShutdownTask(transport));
listenerShutdownFutures.add(future);
}

// Wait until shutting down the transport listeners before proceeding
for (Future<Void> future : listenerShutdownFutures) {
try {
future.get();
} catch (Exception e) {
log.error("Error while completing transport listener shutdown", e);
}
}
transportListenerShutdownPool.shutdown();
log.info("Stopped all transport listeners");
}

waitForRequestCompletion();
/**
* Destroy Transport Listeners
*/
private void destroyTransportListeners() {
// Destroy the TransportListener at the end to clear up resources
for (TransportInDescription tinDesc : inTransports.values()) {
TransportListener transport = tinDesc.getReceiver();
transport.destroy();
}
}

/**
Expand Down Expand Up @@ -264,4 +301,24 @@ public void endMaintenance() throws Exception {
}
log.info("Switched to normal mode");
}

/**
* Callable task to pause and shutdown a transport listener
*/
private class TransportListenerShutdownTask implements Callable<Void> {
private TransportListener transport;

public TransportListenerShutdownTask(TransportListener transport) {
this.transport = transport;
}

public Void call() throws Exception {
try {
transport.stop();
} catch (Exception e) {
log.error("Error while stopping Transport Listener", e);
}
return null;
}
}
}

0 comments on commit a3df9e5

Please sign in to comment.