diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java index b478f43a0c2..f6b3a3a04f2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java @@ -25,11 +25,14 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -147,26 +150,28 @@ public Future start() { /** * Run cleanup operations for the auditor elector. */ - private void submitShutdownTask() { - executor.submit(new Runnable() { - @Override - public void run() { - if (!running.compareAndSet(true, false)) { - return; - } - - try { - ledgerAuditorManager.close(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("InterruptedException while closing ledger auditor manager", ie); - } catch (Exception ke) { - LOG.error("Exception while closing ledger auditor manager", ke); - } - } - }); + private Future submitShutdownTask() { + return executor.submit(shutdownTask); } + Runnable shutdownTask = new Runnable() { + @Override + public void run() { + if (!running.compareAndSet(true, false)) { + return; + } + + try { + ledgerAuditorManager.close(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("InterruptedException while closing ledger auditor manager", ie); + } catch (Exception ke) { + LOG.error("Exception while closing ledger auditor manager", ke); + } + } + }; + /** * Performing the auditor election using the ZooKeeper ephemeral sequential * znode. The bookie which has created the least sequential will be elect as @@ -238,8 +243,18 @@ public void shutdown() throws InterruptedException { return; } // close auditor manager - submitShutdownTask(); - executor.shutdown(); + try { + submitShutdownTask().get(10, TimeUnit.SECONDS); + executor.shutdown(); + } catch (ExecutionException e) { + LOG.warn("Failed to close auditor manager", e); + executor.shutdownNow(); + shutdownTask.run(); + } catch (TimeoutException e) { + LOG.warn("Failed to close auditor manager in 10 seconds", e); + executor.shutdownNow(); + shutdownTask.run(); + } } if (auditor != null) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java index 7fc733a1113..2e3e09012fb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.bookie.BookieImpl; @@ -409,7 +410,16 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception { urLedgerMgr.setLostBookieRecoveryDelay(5); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ledgers to be marked as under replicated"); @@ -425,9 +435,10 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception { urLedgerList.contains(ledgerId)); Map urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } /** @@ -486,7 +497,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws urLedgerMgr.setLostBookieRecoveryDelay(50); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ledgers to be marked as under replicated"); @@ -505,9 +525,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws urLedgerList.contains(ledgerId)); Map urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } @Test @@ -530,7 +551,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep urLedgerMgr.setLostBookieRecoveryDelay(3); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ledgers to be marked as under replicated"); @@ -556,9 +586,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep urLedgerList.contains(ledgerId)); Map urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } @Test @@ -647,7 +678,12 @@ public void testTriggerAuditorWithPendingAuditTask() throws Exception { urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + new Thread(() -> { + try { + shutDownNonAuditorBookie(); + } catch (Exception ignore) { + } + }).start(); if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ledgers to be marked as under replicated"); @@ -698,7 +734,12 @@ public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + new Thread(() -> { + try { + shutDownNonAuditorBookie(); + } catch (Exception ignore) { + } + }).start(); if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ledgers to be marked as under replicated"); @@ -750,8 +791,17 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception { // wait for 10 seconds before starting the recovery work when a bookie fails urLedgerMgr.setLostBookieRecoveryDelay(10); - // shutdown a non auditor bookie to avoid an election - String shutdownBookie1 = shutDownNonAuditorBookie(); + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + AtomicReference shutdownBookieRef1 = new AtomicReference<>(); + CountDownLatch shutdownLatch1 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie1 = shutDownNonAuditorBookie(); + shutdownBookieRef1.set(shutdownBookie1); + shutdownLatch1.countDown(); + } catch (Exception ignore) { + } + }).start(); // wait for 3 seconds and there shouldn't be any under replicated ledgers // because we have delayed the start of audit by 10 seconds @@ -763,7 +813,16 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception { // the history about having delayed recovery remains. Hence we make sure // we bring down a non auditor bookie. This should cause the audit to take // place immediately and not wait for the remaining 7 seconds to elapse - String shutdownBookie2 = shutDownNonAuditorBookie(); + AtomicReference shutdownBookieRef2 = new AtomicReference<>(); + CountDownLatch shutdownLatch2 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie2 = shutDownNonAuditorBookie(); + shutdownBookieRef2.set(shutdownBookie2); + shutdownLatch2.countDown(); + } catch (Exception ignore) { + } + }).start(); // 2 second grace period for the ledgers to get reported as under replicated Thread.sleep(2000); @@ -776,9 +835,11 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception { urLedgerList.contains(ledgerId)); Map urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie1 + shutdownBookie2 + shutdownLatch1.await(); + shutdownLatch2.await(); + assertTrue("Bookie " + shutdownBookieRef1.get() + shutdownBookieRef2.get() + " are not listed in the ledger as missing replicas :" + data, - data.contains(shutdownBookie1) && data.contains(shutdownBookie2)); + data.contains(shutdownBookieRef1.get()) && data.contains(shutdownBookieRef2.get())); } /** @@ -808,7 +869,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception { // shutdown a non auditor bookie to avoid an election int idx1 = getShutDownNonAuditorBookieIdx(""); ServerConfiguration conf1 = confByIndex(idx1); - String shutdownBookie1 = shutdownBookie(idx1); + + AtomicReference shutdownBookieRef1 = new AtomicReference<>(); + CountDownLatch shutdownLatch1 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie1 = shutdownBookie(idx1); + shutdownBookieRef1.set(shutdownBookie1); + shutdownLatch1.countDown(); + } catch (Exception ignore) { + } + }).start(); // wait for 2 seconds and there shouldn't be any under replicated ledgers // because we have delayed the start of audit by 5 seconds @@ -821,8 +892,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception { // Now to simulate the rolling upgrade, bring down a bookie different from // the one we brought down/up above. - String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1); - + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + AtomicReference shutdownBookieRef2 = new AtomicReference<>(); + CountDownLatch shutdownLatch2 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie2 = shutDownNonAuditorBookie(); + shutdownBookieRef2.set(shutdownBookie2); + shutdownLatch2.countDown(); + } catch (Exception ignore) { + } + }).start(); // since the first bookie that was brought down/up has come up, there is only // one bookie down at this time. Hence the lost bookie check shouldn't start // immediately; it will start 5 seconds after the second bookie went down @@ -839,11 +919,13 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception { urLedgerList.contains(ledgerId)); Map urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie1 + "wrongly listed as missing the ledger: " + data, - !data.contains(shutdownBookie1)); - assertTrue("Bookie " + shutdownBookie2 + shutdownLatch1.await(); + shutdownLatch2.await(); + assertTrue("Bookie " + shutdownBookieRef1.get() + "wrongly listed as missing the ledger: " + data, + !data.contains(shutdownBookieRef1.get())); + assertTrue("Bookie " + shutdownBookieRef2.get() + " is not listed in the ledger as missing replicas :" + data, - data.contains(shutdownBookie2)); + data.contains(shutdownBookieRef2.get())); LOG.info("*****************Test Complete"); } @@ -1008,7 +1090,7 @@ private Auditor getAuditorBookiesAuditor() throws Exception { return auditorElectors.get(bookieAddr).auditor; } - private String shutDownNonAuditorBookie() throws Exception { + private String shutDownNonAuditorBookie() throws Exception { // shutdown bookie which is not an auditor int indexOf = indexOfServer(getAuditorBookie()); int bkIndexDownBookie;