From d2d9b9419f0874b9fcac1b5d69cae991a4ae2894 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 25 Oct 2024 11:24:57 +0800 Subject: [PATCH] [fix][standalone] correctly delete bookie registration znode (#23497) Signed-off-by: Zixuan Liu (cherry picked from commit ebb3cb5384d188b3c9d6b3fe89c2f66254103bf7) --- .../zookeeper/LocalBookkeeperEnsemble.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index a1b0b75acce29..13b171217a4c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -297,22 +297,16 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { } int bookiePort = portManager.get(); - + String bookieId = "bk" + i + "test"; // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully - String registrationZnode = String.format("/ledgers/available/%s:%d", - baseConf.getAdvertisedAddress(), bookiePort); - if (zkc.exists(registrationZnode, null) != null) { - try { - zkc.delete(registrationZnode, -1); - } catch (NoNodeException nne) { - // Ignore if z-node was just expired - } - } + deleteBookieRegistrationZnode( + String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort)); + deleteBookieRegistrationZnode(String.format("/ledgers/available/%s", bookieId)); bsConfs[i] = new ServerConfiguration(baseConf); // override settings bsConfs[i].setBookiePort(bookiePort); - bsConfs[i].setBookieId("bk" + i + "test"); + bsConfs[i].setBookieId(bookieId); String zkServers = "127.0.0.1:" + zkPort; String metadataServiceUriStr = "zk://" + zkServers + "/ledgers"; @@ -327,6 +321,16 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { } } + private void deleteBookieRegistrationZnode(String registrationZnode) throws InterruptedException, KeeperException { + if (zkc.exists(registrationZnode, null) != null) { + try { + zkc.delete(registrationZnode, -1); + } catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + } + } + public void runStreamStorage(CompositeConfiguration conf) throws Exception { String zkServers = "127.0.0.1:" + zkPort; String metadataServiceUriStr = "zk://" + zkServers + "/ledgers";