diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index d24b56d64..bf928d824 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.curator.CuratorZookeeperClient; @@ -171,6 +172,7 @@ public static class Builder { private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet(); private int waitForShutdownTimeoutMs = 0; private Executor runSafeService = null; + private ExecutorService asyncWatchService = null; private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard; private int simulatedSessionExpirationPercent = 100; @@ -506,6 +508,19 @@ public Builder runSafeService(Executor runSafeService) { return this; } + /** + * By default, watches are run sequentially. + * If an executor is provided here, then all watch calls will be run asynchronously via this executor. + * This executor service will be closed when the CuratorFramework is closed. + * + * @param asyncWatchService executorService to use for all watch calls + * @return this + */ + public Builder asyncWatchService(ExecutorService asyncWatchService) { + this.asyncWatchService = asyncWatchService; + return this; + } + /** * Sets the connection state listener manager factory. For example, * you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)} @@ -530,6 +545,10 @@ public Executor getRunSafeService() { return runSafeService; } + public ExecutorService getAsyncWatchService() { + return asyncWatchService; + } + public ACLProvider getAclProvider() { return aclProvider; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 816d0bda0..04b24f8b9 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -117,6 +117,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { private final EnsembleTracker ensembleTracker; private final SchemaSet schemaSet; private final Executor runSafeService; + private final ExecutorService asyncWatchService; private final ZookeeperCompatibility zookeeperCompatibility; private volatile ExecutorService executorService; @@ -205,6 +206,7 @@ public void process(WatchedEvent watchedEvent) { builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null; runSafeService = makeRunSafeService(builder); + asyncWatchService = builder.getAsyncWatchService(); zookeeperCompatibility = builder.getZookeeperCompatibility(); } @@ -294,6 +296,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { schemaSet = parent.schemaSet; ensembleTracker = parent.ensembleTracker; runSafeService = parent.runSafeService; + asyncWatchService = parent.asyncWatchService; zookeeperCompatibility = parent.zookeeperCompatibility; } @@ -430,7 +433,6 @@ public void close() { Thread.currentThread().interrupt(); } } - if (ensembleTracker != null) { ensembleTracker.close(); } @@ -445,6 +447,18 @@ public void close() { unhandledErrorListeners.clear(); connectionStateManager.close(); client.close(); + + if (asyncWatchService != null) { + asyncWatchService.shutdown(); + try { + if (!asyncWatchService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS)) { + asyncWatchService.shutdownNow(); + } + } catch (InterruptedException e) { + // Interrupted while interrupting; I give up. + Thread.currentThread().interrupt(); + } + } } } @@ -630,6 +644,10 @@ FailedRemoveWatchManager getFailedRemoveWatcherManager() { return failedRemoveWatcherManager; } + ExecutorService getAsyncWatchService() { + return asyncWatchService; + } + RetryLoop newRetryLoop() { return client.newRetryLoop(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java index c3f0d8e29..27028cac6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java @@ -22,6 +22,7 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import java.io.Closeable; +import java.util.concurrent.Executor; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.WatchedEvent; @@ -65,14 +66,25 @@ public void process(WatchedEvent event) { client.getWatcherRemovalManager().noteTriggeredWatcher(this); } + Runnable watchRunnable = null; if (actualWatcher != null) { - actualWatcher.process(new NamespaceWatchedEvent(client, event)); + watchRunnable = () -> actualWatcher.process(new NamespaceWatchedEvent(client, event)); } else if (curatorWatcher != null) { - try { - curatorWatcher.process(new NamespaceWatchedEvent(client, event)); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - client.logError("Watcher exception", e); + watchRunnable = () -> { + try { + curatorWatcher.process(new NamespaceWatchedEvent(client, event)); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + client.logError("Watcher exception", e); + } + }; + } + if (watchRunnable != null) { + Executor watchExecutor = client.getAsyncWatchService(); + if (watchExecutor != null) { + watchExecutor.execute(watchRunnable); + } else { + watchRunnable.run(); } } }