Skip to content

Commit

Permalink
[fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify …
Browse files Browse the repository at this point in the history
…problem. (#21312)
  • Loading branch information
horizonzy authored Oct 18, 2023
1 parent d35618a commit af6449d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 33 deletions.
2 changes: 0 additions & 2 deletions pulsar-metadata/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
</exclusions>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down Expand Up @@ -85,7 +84,6 @@
</exclusions>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testmocks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,17 @@ long getLedgerNodeVersion() {
private final String urLockPath;
private final String layoutPath;
private final String lostBookieRecoveryDelayPath;
private final String replicationDisablePath;
private final String checkAllLedgersCtimePath;
private final String placementPolicyCheckCtimePath;
private final String replicasCheckCtimePath;

private final MetadataStoreExtended store;

private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;
private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> replicationEnabledCallbacks =
new ArrayList<>();
private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> lostBookieRecoveryDelayCallbacks =
new ArrayList<>();

private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger {
PulsarUnderreplicatedLedger(long ledgerId) {
Expand All @@ -136,6 +139,7 @@ public PulsarLedgerUnderreplicationManager(AbstractConfiguration<?> conf, Metada
urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
lostBookieRecoveryDelayPath = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
replicationDisablePath = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
checkAllLedgersCtimePath = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
placementPolicyCheckCtimePath = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
Expand Down Expand Up @@ -229,17 +233,34 @@ private void handleNotification(Notification n) {
synchronized (this) {
// Notify that there were some changes on the under-replicated z-nodes
notifyAll();

if (n.getType() == NotificationType.Deleted) {
if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) {
log.info("LedgerReplication is enabled externally through MetadataStore, "
+ "since DISABLE_NODE ZNode is deleted");
if (replicationEnabledListener != null) {
replicationEnabledListener.operationComplete(0, null);
if (lostBookieRecoveryDelayPath.equals(n.getPath())) {
final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
synchronized (lostBookieRecoveryDelayCallbacks) {
callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks);
lostBookieRecoveryDelayCallbacks.clear();
}
for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
try {
callback.operationComplete(0, null);
} catch (Exception e) {
log.warn("lostBookieRecoveryDelayCallbacks handle error", e);
}
} else if (n.getPath().equals(lostBookieRecoveryDelayPath)) {
if (lostBookieRecoveryDelayListener != null) {
lostBookieRecoveryDelayListener.operationComplete(0, null);
}
return;
}
if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) {
log.info("LedgerReplication is enabled externally through MetadataStore, "
+ "since DISABLE_NODE ZNode is deleted");
final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
synchronized (replicationEnabledCallbacks) {
callbackList = new ArrayList<>(replicationEnabledCallbacks);
replicationEnabledCallbacks.clear();
}
for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
try {
callback.operationComplete(0, null);
} catch (Exception e) {
log.warn("replicationEnabledCallbacks handle error", e);
}
}
}
Expand Down Expand Up @@ -671,8 +692,7 @@ public void disableLedgerReplication()
log.debug("disableLedegerReplication()");
}
try {
String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
store.put(path, "".getBytes(UTF_8), Optional.of(-1L)).get();
store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L)).get();
log.info("Auto ledger re-replication is disabled!");
} catch (ExecutionException ee) {
log.error("Exception while stopping auto ledger re-replication", ee);
Expand All @@ -692,7 +712,7 @@ public void enableLedgerReplication()
log.debug("enableLedegerReplication()");
}
try {
store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, Optional.empty()).get();
store.delete(replicationDisablePath, Optional.empty()).get();
log.info("Resuming automatic ledger re-replication");
} catch (ExecutionException ee) {
log.error("Exception while resuming ledger replication", ee);
Expand All @@ -712,7 +732,7 @@ public boolean isLedgerReplicationEnabled()
log.debug("isLedgerReplicationEnabled()");
}
try {
return !store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get();
return !store.exists(replicationDisablePath).get();
} catch (ExecutionException ee) {
log.error("Error while checking the state of "
+ "ledger re-replication", ee);
Expand All @@ -731,13 +751,11 @@ public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.Gen
if (log.isDebugEnabled()) {
log.debug("notifyLedgerReplicationEnabled()");
}

synchronized (this) {
replicationEnabledListener = cb;
synchronized (replicationEnabledCallbacks) {
replicationEnabledCallbacks.add(cb);
}

try {
if (!store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get()) {
if (!store.exists(replicationDisablePath).get()) {
log.info("LedgerReplication is enabled externally through metadata store, "
+ "since DISABLE_NODE node is deleted");
cb.operationComplete(0, null);
Expand Down Expand Up @@ -826,8 +844,8 @@ public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableE
public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws
ReplicationException.UnavailableException {
log.debug("notifyLostBookieRecoveryDelayChanged()");
synchronized (this) {
lostBookieRecoveryDelayListener = cb;
synchronized (lostBookieRecoveryDelayCallbacks) {
lostBookieRecoveryDelayCallbacks.add(cb);
}
try {
if (!store.exists(lostBookieRecoveryDelayPath).get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNotSame;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import static org.testng.AssertJUnit.fail;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -40,15 +40,20 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand All @@ -60,11 +65,7 @@
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
Expand Down Expand Up @@ -614,6 +615,8 @@ public void testDisableLedgerReplication(String provider, Supplier<String> urlSu
final String missingReplica = "localhost:3181";

// disabling replication
AtomicInteger callbackCount = new AtomicInteger();
lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
lum.disableLedgerReplication();
log.info("Disabled Ledeger Replication");

Expand All @@ -631,6 +634,7 @@ public void testDisableLedgerReplication(String provider, Supplier<String> urlSu
} catch (TimeoutException te) {
// expected behaviour, as the replication is disabled
}
assertEquals(callbackCount.get(), 1, "Notify callback times mismatch");
}

/**
Expand All @@ -651,7 +655,8 @@ public void testEnableLedgerReplication(String provider, Supplier<String> urlSup
log.debug("Unexpected exception while marking urLedger", e);
fail("Unexpected exception while marking urLedger" + e.getMessage());
}

AtomicInteger callbackCount = new AtomicInteger();
lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
// disabling replication
lum.disableLedgerReplication();
log.debug("Disabled Ledeger Replication");
Expand Down Expand Up @@ -688,6 +693,7 @@ public void testEnableLedgerReplication(String provider, Supplier<String> urlSup
znodeLatch.await(5, TimeUnit.SECONDS);
log.debug("Enabled Ledeger Replication");
assertEquals(znodeLatch.getCount(), 0, "Failed to disable ledger replication!");
assertEquals(callbackCount.get(), 2, "Notify callback times mismatch");
} finally {
thread1.interrupt();
}
Expand Down Expand Up @@ -749,6 +755,17 @@ public void testReplicasCheckCTime(String provider, Supplier<String> urlSupplier
assertEquals(underReplicaMgr1.getReplicasCheckCTime(), curTime);
}

/**
 * Checks that a callback registered via {@code notifyLostBookieRecoveryDelayChanged}
 * is invoked after the lost-bookie recovery delay is set.
 *
 * <p>The test passes once the registered callback has been invoked exactly twice;
 * it fails if that does not happen before Awaitility or the TestNG timeout gives up.
 *
 * @param provider    metadata store implementation name supplied by the "impl" data provider
 * @param urlSupplier supplies the metadata store URL handed to {@code methodSetup}
 */
@Test(timeOut = 60000, dataProvider = "impl")
public void testLostBookieRecoveryDelay(String provider, Supplier<String> urlSupplier) throws Exception {
methodSetup(urlSupplier);

// Counts every invocation of the registered callback.
AtomicInteger callbackCount = new AtomicInteger();
lum.notifyLostBookieRecoveryDelayChanged((rc, result) -> callbackCount.incrementAndGet());
// Set the lost-bookie recovery delay; the resulting change on the metadata node
// is expected to reach the callback registered above.
lum.setLostBookieRecoveryDelay(10);
// NOTE(review): the expected count of 2 presumably is one invocation during registration
// plus one from the change notification - confirm against notifyLostBookieRecoveryDelayChanged.
Awaitility.await().until(() -> callbackCount.get() == 2);
}

private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica) throws Exception {
Long ledgerA = 0xfeadeefdacL;
String znodeA = getUrLedgerZnode(ledgerA);
Expand Down

0 comments on commit af6449d

Please sign in to comment.