diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 2433bb9efac..978d5355ebe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -119,6 +119,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { private final BookKeeperClientStats clientStats; private final double bookieQuarantineRatio; + // Inner high priority thread for WatchTask. Disable external use. + private final OrderedScheduler highPriorityTaskExecutor; + // whether the event loop group is one we created, or is owned by whoever // instantiated us boolean ownEventLoopGroup = false; @@ -424,6 +427,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo // initialize resources this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build(); + this.highPriorityTaskExecutor = + OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperWatchTaskScheduler").build(); this.mainWorkerPool = OrderedExecutor.newBuilder() .name("BookKeeperClientWorker") .numThreads(conf.getNumWorkerThreads()) @@ -449,7 +454,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo } this.metadataDriver.initialize( conf, - scheduler, + highPriorityTaskExecutor, rootStatsLogger, Optional.ofNullable(zkc)); } catch (ConfigurationException ce) { @@ -551,6 +556,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo statsLogger = NullStatsLogger.INSTANCE; clientStats = BookKeeperClientStats.newInstance(statsLogger); scheduler = null; + highPriorityTaskExecutor = null; requestTimer = null; metadataDriver = null; placementPolicy = null; @@ -1462,6 +1468,13 @@ public void close() throws BKException, InterruptedException { if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { LOG.warn("The scheduler did not shutdown cleanly"); } + + // Close the watchTask scheduler + highPriorityTaskExecutor.shutdown(); + if (!highPriorityTaskExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("The highPriorityTaskExecutor for WatchTask did not shutdown cleanly"); + } + mainWorkerPool.shutdown(); if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) { LOG.warn("The mainWorkerPool did not shutdown cleanly"); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 14b71a163d2..bb534b1e583 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -37,7 +37,9 @@ import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,12 +52,14 @@ import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.client.api.WriteHandle; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.util.StaticDNSResolver; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; @@ -1298,4 +1302,48 @@ public void testBookieAddressResolverPassedToDNSToSwitchMapping() throws Excepti } } + @Test + public void testBookieWatcher() throws Exception { + ClientConfiguration conf = new ClientConfiguration(); + conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + StaticDNSResolver tested = new StaticDNSResolver(); + try (BookKeeper bkc = BookKeeper + .forConfig(conf) + .dnsResolver(tested) + .build()) { + final Map bookieInfo = bkc.getBookieInfo(); + + // 1. check all bookies in client cache successfully. + bookieInfo.forEach((bookieId, info) -> { + final CompletableFuture> bookieServiceInfo = bkc.getMetadataClientDriver() + .getRegistrationClient().getBookieServiceInfo(bookieId); + assertTrue(bookieServiceInfo.isDone()); + assertFalse(bookieServiceInfo.isCompletedExceptionally()); + }); + + // 2. add a task to scheduler, blocking zk watch for bookies cache + bkc.getClientCtx().getScheduler().schedule(() -> { + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }, 0, TimeUnit.MILLISECONDS); + + // 3. restart one bookie, so the client should update cache by WatchTask + restartBookie(bookieInfo.keySet().iterator().next()); + + // 4. after restart bookie, check again for the client cache + final CompletableFuture> bookieServiceInfo = + bkc.getMetadataClientDriver().getRegistrationClient() + .getBookieServiceInfo(bookieInfo.keySet().iterator().next()); + assertTrue(bookieServiceInfo.isDone()); + // 5. Previously, we used scheduler, and here getting bookie from client cache would fail. + // 6. After this PR, we introduced independent internal thread pool watchTaskScheduler, + // and here it will succeed. + assertFalse(bookieServiceInfo.isCompletedExceptionally()); + } + } + }