From cd03c72a39fc569e8e33934f26d772831aef0bfd Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Wed, 16 Oct 2024 20:39:16 +0800 Subject: [PATCH] CURATOR-710: Fix leaking watch in EnsembleTracker CURATOR-667(#474) fixes asynchronous event path for `getConfig` to "/zookeeper/config" by using `CuratorFramework::usingNamespace(null)` to fetch data. It causes watcher not registering to possible `WatcherRemovalManager`, so leaking in `WatcherRemoveCuratorFramework::removeWatchers`. --- .../framework/imps/GetConfigBuilderImpl.java | 13 ++-- .../curator/framework/imps/Watching.java | 16 +++-- .../imps/TestWatcherRemovalManager.java | 64 +++++++++++++++++++ 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 91f20d4d3..54ae6f571 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -33,6 +33,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperation, ErrorListenerEnsembleable { private final CuratorFrameworkImpl client; + private final WatcherRemovalManager watcherRemovalManager; private Backgrounding backgrounding; private Watching watching; @@ -40,14 +41,16 @@ public class GetConfigBuilderImpl public GetConfigBuilderImpl(CuratorFrameworkImpl client) { this.client = (CuratorFrameworkImpl) client.usingNamespace(null); + this.watcherRemovalManager = client.getWatcherRemovalManager(); backgrounding = new Backgrounding(); - watching = new Watching(this.client); + watching = new Watching(this.client).setWatcherRemovalManager(watcherRemovalManager); } public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat) { this.client = (CuratorFrameworkImpl) client.usingNamespace(null); + this.watcherRemovalManager = client.getWatcherRemovalManager(); this.backgrounding = backgrounding; - this.watching = new Watching(this.client, watcher); + this.watching = new Watching(this.client, watcher).setWatcherRemovalManager(watcherRemovalManager); this.stat = stat; } @@ -110,19 +113,19 @@ public BackgroundEnsembleable usingWatcher(CuratorWatcher watcher) { @Override public BackgroundEnsembleable watched() { - watching = new Watching(client, true); + watching = new Watching(client, true).setWatcherRemovalManager(watcherRemovalManager); return new InternalBackgroundEnsembleable(); } @Override public BackgroundEnsembleable usingWatcher(Watcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(client, watcher).setWatcherRemovalManager(watcherRemovalManager); return new InternalBackgroundEnsembleable(); } @Override public BackgroundEnsembleable usingWatcher(CuratorWatcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(client, watcher).setWatcherRemovalManager(watcherRemovalManager); return new InternalBackgroundEnsembleable(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index 92b16731c..5cde149fc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -28,10 +28,12 @@ public class Watching { private final CuratorWatcher curatorWatcher; private final boolean watched; private final CuratorFrameworkImpl client; + private WatcherRemovalManager watcherRemovalManager; private NamespaceWatcher namespaceWatcher; public Watching(CuratorFrameworkImpl client, boolean watched) { this.client = client; + this.watcherRemovalManager = client.getWatcherRemovalManager(); this.watcher = null; this.curatorWatcher = null; this.watched = watched; @@ -39,6 +41,7 @@ public Watching(CuratorFrameworkImpl client, boolean watched) { public Watching(CuratorFrameworkImpl client, Watcher watcher) { this.client = client; + this.watcherRemovalManager = client.getWatcherRemovalManager(); this.watcher = watcher; this.curatorWatcher = null; this.watched = false; @@ -46,6 +49,7 @@ public Watching(CuratorFrameworkImpl client, Watcher watcher) { public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { this.client = client; + this.watcherRemovalManager = client.getWatcherRemovalManager(); this.watcher = null; this.curatorWatcher = watcher; this.watched = false; @@ -53,11 +57,17 @@ public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { public Watching(CuratorFrameworkImpl client) { this.client = client; + this.watcherRemovalManager = client.getWatcherRemovalManager(); watcher = null; watched = false; curatorWatcher = null; } + Watching setWatcherRemovalManager(WatcherRemovalManager watcherRemovalManager) { + this.watcherRemovalManager = watcherRemovalManager; + return this; + } + Watcher getWatcher(String unfixedPath) { namespaceWatcher = null; if (watcher != null) { @@ -85,10 +95,8 @@ void commitWatcher(int rc, boolean isExists) { doCommit = (rc == KeeperException.Code.OK.intValue()); } - if (doCommit && (namespaceWatcher != null)) { - if (client.getWatcherRemovalManager() != null) { - client.getWatcherRemovalManager().add(namespaceWatcher); - } + if (doCommit && namespaceWatcher != null && watcherRemovalManager != null) { + watcherRemovalManager.add(namespaceWatcher); } } } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index 960a86b9c..b5e90c628 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -34,12 +35,27 @@ import org.apache.curator.test.Timing; import org.apache.curator.test.WatchersDebug; import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.DebugUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class TestWatcherRemovalManager extends CuratorTestBase { + private static final String superUserPasswordDigest = "curator-test:zghsj3JfJqK7DbWf0RQ1BgbJH9w="; // ran from + private static final String superUserPassword = "curator-test"; + + @BeforeEach + @Override + public void setup() throws Exception { + System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superUserPasswordDigest); + super.setup(); + } + @Test public void testSameWatcherDifferentPaths1Triggered() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); @@ -302,6 +318,54 @@ public void testBasicNamespace3() throws Exception { } } + @Test + public void testEnsembleTracker() throws Exception { + // given: client with ensemble tracker + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .namespace("hey") + .ensembleTracker(true) + .authorization("digest", superUserPassword.getBytes()) + .build(); + try { + client.start(); + + // We are using standalone, so "/zookeeper/config" will be empty. + // So let's set it directly. + QuorumMaj quorumMaj = new QuorumMaj(Collections.singletonMap( + 1L, + new QuorumPeer.QuorumServer(1, "127.0.0.1:2182:2183:participant;" + server.getConnectString()))); + quorumMaj.setVersion(1); + client.usingNamespace(null) + .setData() + .forPath(ZooDefs.CONFIG_NODE, quorumMaj.toString().getBytes()); + + // when: zookeeper config node data fetched + while (client.getCurrentConfig().getVersion() == 0) { + Thread.sleep(100); + } + + // then: the watcher must be attached + assertEquals( + 1, + WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper()) + .size()); + + // when: ensemble tracker closed + System.setProperty(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true"); + ((CuratorFrameworkImpl) client).getEnsembleTracker().close(); + + // then: the watcher must be removed + assertEquals( + 0, + WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper()) + .size()); + } finally { + TestCleanState.closeAndTestClean(client); + } + } + @Test public void testSameWatcher() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));