Skip to content

Commit

Permalink
CURATOR-727: Allow watches to be executed asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Dec 16, 2024
1 parent ad19795 commit c0e37fc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
Expand Down Expand Up @@ -171,6 +172,7 @@ public static class Builder {
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ExecutorService asyncWatchService = null;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory =
ConnectionStateListenerManagerFactory.standard;
private int simulatedSessionExpirationPercent = 100;
Expand Down Expand Up @@ -506,6 +508,19 @@ public Builder runSafeService(Executor runSafeService) {
return this;
}

/**
* By default, watches are run sequentially.
* If an executor is provided here, then all watch calls will be run asynchronously via this executor.
* This executor service will be closed when the CuratorFramework is closed.
*
* @param asyncWatchService executorService to use for all watch calls
* @return this
*/
public Builder asyncWatchService(ExecutorService asyncWatchService) {
this.asyncWatchService = asyncWatchService;
return this;
}

/**
* Sets the connection state listener manager factory. For example,
* you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
Expand All @@ -530,6 +545,10 @@ public Executor getRunSafeService() {
return runSafeService;
}

public ExecutorService getAsyncWatchService() {
return asyncWatchService;
}

public ACLProvider getAclProvider() {
return aclProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final Executor runSafeService;
private final ExecutorService asyncWatchService;
private final ZookeeperCompatibility zookeeperCompatibility;

private volatile ExecutorService executorService;
Expand Down Expand Up @@ -205,6 +206,7 @@ public void process(WatchedEvent watchedEvent) {
builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null;

runSafeService = makeRunSafeService(builder);
asyncWatchService = builder.getAsyncWatchService();
zookeeperCompatibility = builder.getZookeeperCompatibility();
}

Expand Down Expand Up @@ -294,6 +296,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
schemaSet = parent.schemaSet;
ensembleTracker = parent.ensembleTracker;
runSafeService = parent.runSafeService;
asyncWatchService = parent.asyncWatchService;
zookeeperCompatibility = parent.zookeeperCompatibility;
}

Expand Down Expand Up @@ -430,7 +433,6 @@ public void close() {
Thread.currentThread().interrupt();
}
}

if (ensembleTracker != null) {
ensembleTracker.close();
}
Expand All @@ -445,6 +447,18 @@ public void close() {
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();

if (asyncWatchService != null) {
asyncWatchService.shutdown();
try {
if (!asyncWatchService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS)) {
asyncWatchService.shutdownNow();
}
} catch (InterruptedException e) {
// Interrupted while interrupting; I give up.
Thread.currentThread().interrupt();
}
}
}
}

Expand Down Expand Up @@ -630,6 +644,10 @@ FailedRemoveWatchManager getFailedRemoveWatcherManager() {
return failedRemoveWatcherManager;
}

ExecutorService getAsyncWatchService() {
return asyncWatchService;
}

RetryLoop newRetryLoop() {
return client.newRetryLoop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.util.concurrent.Executor;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.WatchedEvent;
Expand Down Expand Up @@ -65,14 +66,25 @@ public void process(WatchedEvent event) {
client.getWatcherRemovalManager().noteTriggeredWatcher(this);
}

Runnable watchRunnable = null;
if (actualWatcher != null) {
actualWatcher.process(new NamespaceWatchedEvent(client, event));
watchRunnable = () -> actualWatcher.process(new NamespaceWatchedEvent(client, event));
} else if (curatorWatcher != null) {
try {
curatorWatcher.process(new NamespaceWatchedEvent(client, event));
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
client.logError("Watcher exception", e);
watchRunnable = () -> {
try {
curatorWatcher.process(new NamespaceWatchedEvent(client, event));
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
client.logError("Watcher exception", e);
}
};
}
if (watchRunnable != null) {
Executor watchExecutor = client.getAsyncWatchService();
if (watchExecutor != null) {
watchExecutor.execute(watchRunnable);
} else {
watchRunnable.run();
}
}
}
Expand Down

0 comments on commit c0e37fc

Please sign in to comment.