Skip to content

Commit

Permalink
Fix auditor elector executor block problem. (apache#4165)
Browse files Browse the repository at this point in the history
### Motivation
Currently, when we shut down the auditor elector, the shutdown is performed by submitting a shutdown task to the elector's executor.
In some cases, the follower auditor elector's executor is blocked indefinitely while waiting for leader election, so the shutdown task sits in the task queue forever and never gets a chance to execute.

This case happens in Pulsar. See apache/pulsar#21797 (comment)

So, during the auditor elector shutdown, if the remaining tasks can't finish in time, we should invoke shutdownNow to interrupt the blocked thread.

(cherry picked from commit c3748dd)
  • Loading branch information
horizonzy authored and hangc0276 committed Jan 18, 2024
1 parent 7875f15 commit 11a4020
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -147,26 +150,28 @@ public Future<?> start() {
/**
* Run cleanup operations for the auditor elector.
*/
private void submitShutdownTask() {
executor.submit(new Runnable() {
@Override
public void run() {
if (!running.compareAndSet(true, false)) {
return;
}

try {
ledgerAuditorManager.close();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("InterruptedException while closing ledger auditor manager", ie);
} catch (Exception ke) {
LOG.error("Exception while closing ledger auditor manager", ke);
}
}
});
private Future<?> submitShutdownTask() {
return executor.submit(shutdownTask);
}

/**
 * Cleanup task that closes the ledger auditor manager.
 *
 * <p>It is submitted to {@code executor} by {@code submitShutdownTask()} and is
 * also invoked directly via {@code run()} from {@code shutdown()} after
 * {@code executor.shutdownNow()} when the submitted task failed or timed out.
 * The {@code running} flag guards the body, so an invocation that finds the
 * flag already {@code false} returns without closing anything.
 */
Runnable shutdownTask = new Runnable() {
@Override
public void run() {
// Atomically flip running from true to false; if it was not true, there is
// nothing to do for this invocation.
if (!running.compareAndSet(true, false)) {
return;
}

try {
ledgerAuditorManager.close();
} catch (InterruptedException ie) {
// Restore the interrupt status so code further up the stack can observe it.
Thread.currentThread().interrupt();
LOG.warn("InterruptedException while closing ledger auditor manager", ie);
} catch (Exception ke) {
// Any other failure while closing is logged and not rethrown.
LOG.error("Exception while closing ledger auditor manager", ke);
}
}
};

/**
* Performing the auditor election using the ZooKeeper ephemeral sequential
* znode. The bookie which has created the least sequential will be elect as
Expand Down Expand Up @@ -238,8 +243,18 @@ public void shutdown() throws InterruptedException {
return;
}
// close auditor manager
submitShutdownTask();
executor.shutdown();
try {
submitShutdownTask().get(10, TimeUnit.SECONDS);
executor.shutdown();
} catch (ExecutionException e) {
LOG.warn("Failed to close auditor manager", e);
executor.shutdownNow();
shutdownTask.run();
} catch (TimeoutException e) {
LOG.warn("Failed to close auditor manager in 10 seconds", e);
executor.shutdownNow();
shutdownTask.run();
}
}

if (auditor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.bookie.BookieImpl;
Expand Down Expand Up @@ -409,7 +410,16 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception {
urLedgerMgr.setLostBookieRecoveryDelay(5);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
String shutdownBookie = shutDownNonAuditorBookie();
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
CountDownLatch shutdownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie = shutDownNonAuditorBookie();
shutdownBookieRef.set(shutdownBookie);
shutdownLatch.countDown();
} catch (Exception ignore) {
}
}).start();

if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for ledgers to be marked as under replicated");
Expand All @@ -425,9 +435,10 @@ public void testInnerDelayedAuditOfLostBookies() throws Exception {
urLedgerList.contains(ledgerId));
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
String data = urLedgerData.get(ledgerId);
assertTrue("Bookie " + shutdownBookie
shutdownLatch.await();
assertTrue("Bookie " + shutdownBookieRef.get()
+ "is not listed in the ledger as missing replica :" + data,
data.contains(shutdownBookie));
data.contains(shutdownBookieRef.get()));
}

/**
Expand Down Expand Up @@ -486,7 +497,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws
urLedgerMgr.setLostBookieRecoveryDelay(50);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
String shutdownBookie = shutDownNonAuditorBookie();
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
CountDownLatch shutdownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie = shutDownNonAuditorBookie();
shutdownBookieRef.set(shutdownBookie);
shutdownLatch.countDown();
} catch (Exception ignore) {
}
}).start();

if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for ledgers to be marked as under replicated");
Expand All @@ -505,9 +525,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartImmediately() throws
urLedgerList.contains(ledgerId));
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
String data = urLedgerData.get(ledgerId);
assertTrue("Bookie " + shutdownBookie
shutdownLatch.await();
assertTrue("Bookie " + shutdownBookieRef.get()
+ "is not listed in the ledger as missing replica :" + data,
data.contains(shutdownBookie));
data.contains(shutdownBookieRef.get()));
}

@Test
Expand All @@ -530,7 +551,16 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep
urLedgerMgr.setLostBookieRecoveryDelay(3);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
String shutdownBookie = shutDownNonAuditorBookie();
AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
CountDownLatch shutdownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie = shutDownNonAuditorBookie();
shutdownBookieRef.set(shutdownBookie);
shutdownLatch.countDown();
} catch (Exception ignore) {
}
}).start();

if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for ledgers to be marked as under replicated");
Expand All @@ -556,9 +586,10 @@ public void testRescheduleOfDelayedAuditOfLostBookiesToStartLater() throws Excep
urLedgerList.contains(ledgerId));
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
String data = urLedgerData.get(ledgerId);
assertTrue("Bookie " + shutdownBookie
shutdownLatch.await();
assertTrue("Bookie " + shutdownBookieRef.get()
+ "is not listed in the ledger as missing replica :" + data,
data.contains(shutdownBookie));
data.contains(shutdownBookieRef.get()));
}

@Test
Expand Down Expand Up @@ -647,7 +678,12 @@ public void testTriggerAuditorWithPendingAuditTask() throws Exception {
urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
String shutdownBookie = shutDownNonAuditorBookie();
new Thread(() -> {
try {
shutDownNonAuditorBookie();
} catch (Exception ignore) {
}
}).start();

if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for ledgers to be marked as under replicated");
Expand Down Expand Up @@ -698,7 +734,12 @@ public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws
urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
String shutdownBookie = shutDownNonAuditorBookie();
new Thread(() -> {
try {
shutDownNonAuditorBookie();
} catch (Exception ignore) {
}
}).start();

if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for ledgers to be marked as under replicated");
Expand Down Expand Up @@ -750,8 +791,17 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
// wait for 10 seconds before starting the recovery work when a bookie fails
urLedgerMgr.setLostBookieRecoveryDelay(10);

// shutdown a non auditor bookie to avoid an election
String shutdownBookie1 = shutDownNonAuditorBookie();
// shutdown a non auditor bookie; choosing non-auditor to avoid another election
AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
CountDownLatch shutdownLatch1 = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie1 = shutDownNonAuditorBookie();
shutdownBookieRef1.set(shutdownBookie1);
shutdownLatch1.countDown();
} catch (Exception ignore) {
}
}).start();

// wait for 3 seconds and there shouldn't be any under replicated ledgers
// because we have delayed the start of audit by 10 seconds
Expand All @@ -763,7 +813,16 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
// the history about having delayed recovery remains. Hence we make sure
// we bring down a non auditor bookie. This should cause the audit to take
// place immediately and not wait for the remaining 7 seconds to elapse
String shutdownBookie2 = shutDownNonAuditorBookie();
AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
CountDownLatch shutdownLatch2 = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie2 = shutDownNonAuditorBookie();
shutdownBookieRef2.set(shutdownBookie2);
shutdownLatch2.countDown();
} catch (Exception ignore) {
}
}).start();

// 2 second grace period for the ledgers to get reported as under replicated
Thread.sleep(2000);
Expand All @@ -776,9 +835,11 @@ public void testDelayedAuditWithMultipleBookieFailures() throws Exception {
urLedgerList.contains(ledgerId));
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
String data = urLedgerData.get(ledgerId);
assertTrue("Bookie " + shutdownBookie1 + shutdownBookie2
shutdownLatch1.await();
shutdownLatch2.await();
assertTrue("Bookie " + shutdownBookieRef1.get() + shutdownBookieRef2.get()
+ " are not listed in the ledger as missing replicas :" + data,
data.contains(shutdownBookie1) && data.contains(shutdownBookie2));
data.contains(shutdownBookieRef1.get()) && data.contains(shutdownBookieRef2.get()));
}

/**
Expand Down Expand Up @@ -808,7 +869,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {
// shutdown a non auditor bookie to avoid an election
int idx1 = getShutDownNonAuditorBookieIdx("");
ServerConfiguration conf1 = confByIndex(idx1);
String shutdownBookie1 = shutdownBookie(idx1);

AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
CountDownLatch shutdownLatch1 = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie1 = shutdownBookie(idx1);
shutdownBookieRef1.set(shutdownBookie1);
shutdownLatch1.countDown();
} catch (Exception ignore) {
}
}).start();

// wait for 2 seconds and there shouldn't be any under replicated ledgers
// because we have delayed the start of audit by 5 seconds
Expand All @@ -821,8 +892,17 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {

// Now to simulate the rolling upgrade, bring down a bookie different from
// the one we brought down/up above.
String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1);

// shutdown a non auditor bookie; choosing non-auditor to avoid another election
AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
CountDownLatch shutdownLatch2 = new CountDownLatch(1);
new Thread(() -> {
try {
String shutdownBookie2 = shutDownNonAuditorBookie();
shutdownBookieRef2.set(shutdownBookie2);
shutdownLatch2.countDown();
} catch (Exception ignore) {
}
}).start();
// since the first bookie that was brought down/up has come up, there is only
// one bookie down at this time. Hence the lost bookie check shouldn't start
// immediately; it will start 5 seconds after the second bookie went down
Expand All @@ -839,11 +919,13 @@ public void testDelayedAuditWithRollingUpgrade() throws Exception {
urLedgerList.contains(ledgerId));
Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
String data = urLedgerData.get(ledgerId);
assertTrue("Bookie " + shutdownBookie1 + "wrongly listed as missing the ledger: " + data,
!data.contains(shutdownBookie1));
assertTrue("Bookie " + shutdownBookie2
shutdownLatch1.await();
shutdownLatch2.await();
assertTrue("Bookie " + shutdownBookieRef1.get() + "wrongly listed as missing the ledger: " + data,
!data.contains(shutdownBookieRef1.get()));
assertTrue("Bookie " + shutdownBookieRef2.get()
+ " is not listed in the ledger as missing replicas :" + data,
data.contains(shutdownBookie2));
data.contains(shutdownBookieRef2.get()));
LOG.info("*****************Test Complete");
}

Expand Down Expand Up @@ -1008,7 +1090,7 @@ private Auditor getAuditorBookiesAuditor() throws Exception {
return auditorElectors.get(bookieAddr).auditor;
}

private String shutDownNonAuditorBookie() throws Exception {
private String shutDownNonAuditorBookie() throws Exception {
// shutdown bookie which is not an auditor
int indexOf = indexOfServer(getAuditorBookie());
int bkIndexDownBookie;
Expand Down

0 comments on commit 11a4020

Please sign in to comment.