From 8e490a3acc151dbf599bc7b2227360a1b41df006 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 10 Sep 2024 15:21:27 +0800 Subject: [PATCH 1/7] [fix] [broker] Fix compayibility for system topics' offloader --- .../mledger/impl/ManagedLedgerImpl.java | 31 +++-- .../impl/ReadonlyWrapperLedgerOffloader.java | 69 +++++++++++ .../mledger/impl/OffloadPrefixReadTest.java | 114 ++++++++++++++++-- .../test/MockedBookKeeperTestCase.java | 8 +- .../pulsar/broker/service/BrokerService.java | 39 +++--- .../broker/service/BrokerServiceTest.java | 21 +++- 6 files changed, 240 insertions(+), 42 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 92c55e572a49a..7f6307ce71345 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -94,6 +94,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerAttributes; @@ -2451,8 +2452,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture p } public void maybeOffloadInBackground(CompletableFuture promise) { - if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE - || config.getLedgerOffloader().getOffloadPolicies() == null) { + if (getOffloadPoliciesIfEnabled() == null) { return; } @@ -2468,8 +2468,8 @@ public void maybeOffloadInBackground(CompletableFuture promise) { private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds, CompletableFuture finalPromise) { - if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE - || config.getLedgerOffloader().getOffloadPolicies() == null) { + LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); + if (getOffloadPoliciesIfEnabled() == null) { String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name); finalPromise.completeExceptionally(new IllegalArgumentException(msg)); return; @@ -2572,6 +2572,17 @@ void internalTrimConsumedLedgers(CompletableFuture promise) { internalTrimLedgers(false, promise); } + private OffloadPolicies getOffloadPoliciesIfEnabled() { + LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); + if (ledgerOffloader == null + || ledgerOffloader == NullLedgerOffloader.INSTANCE + || ledgerOffloader instanceof ReadonlyWrapperLedgerOffloader + || ledgerOffloader.getOffloadPolicies() == null) { + return null; + } + return ledgerOffloader.getOffloadPolicies(); + } + void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { if (!factory.isMetadataServiceAvailable()) { // Defer trimming of ledger if we cannot connect to metadata service @@ -2587,10 +2598,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { List ledgersToDelete = new ArrayList<>(); List offloadedLedgersToDelete = new ArrayList<>(); - Optional optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null - && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE - ? config.getLedgerOffloader().getOffloadPolicies() - : null); + Optional optionalOffloadPolicies = Optional.ofNullable(getOffloadPoliciesIfEnabled()); synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), @@ -3117,8 +3125,11 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) { @Override public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) { - if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) { - callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx); + LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); + if (ledgerOffloader != null && (ledgerOffloader == NullLedgerOffloader.INSTANCE + || ledgerOffloader instanceof ReadonlyWrapperLedgerOffloader)) { + String msg = String.format("[%s] does not support offload", ledgerOffloader.getClass().getSimpleName()); + callback.offloadFailed(new ManagedLedgerException(msg), ctx); return; } Position requestOffloadTo = pos; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java new file mode 100644 index 0000000000000..430bbf697ef80 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.util.FutureUtil; + +public class ReadonlyWrapperLedgerOffloader implements LedgerOffloader { + private LedgerOffloader delegate; + + public ReadonlyWrapperLedgerOffloader(LedgerOffloader delegate) { + this.delegate = delegate; + } + + @Override + public String getOffloadDriverName() { + return delegate.getOffloadDriverName(); + } + + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uid, + Map extraMetadata) { + return FutureUtil.failedFuture(new UnsupportedOperationException()); + } + + @Override + public CompletableFuture readOffloaded(long ledgerId, UUID uid, + Map offloadDriverMetadata) { + return delegate.readOffloaded(ledgerId, uid, offloadDriverMetadata); + } + + @Override + public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, + Map offloadDriverMetadata) { + return delegate.deleteOffloaded(ledgerId, uid, offloadDriverMetadata); + } + + @Override + public OffloadPolicies getOffloadPolicies() { + return delegate.getOffloadPolicies(); + } + + @Override + public void close() { + delegate.close(); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 6d8ecba868847..9a5c2475c5206 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import java.util.ArrayList; @@ -54,6 +55,8 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.util.MockClock; @@ -61,12 +64,34 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; +import org.awaitility.Awaitility; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { - @Test - public void testOffloadRead() throws Exception { + + private final String offloadTypeReadOnly = "readOnly"; + + @Override + protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) { + super.initManagedLedgerFactoryConfig(config); + // disable cache. + config.setMaxCacheSize(0); + } + + @DataProvider(name = "offloadAndDeleteTypes") + public Object[][] offloadAndDeleteTypes() { + return new Object[][]{ + {"normal", true}, + {"normal", false}, + {offloadTypeReadOnly, true}, + {offloadTypeReadOnly, false}, + }; + } + + @Test(dataProvider = "offloadAndDeleteTypes") + public void testOffloadRead(String offloadType, boolean deleteMl) throws Exception { MockLedgerOffloader offloader = spy(MockLedgerOffloader.class); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); @@ -89,6 +114,10 @@ public void testOffloadRead() throws Exception { Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); + if (offloadTypeReadOnly.equals(offloadType)) { + config.setLedgerOffloader(new ReadonlyWrapperLedgerOffloader(offloader)); + } + UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(), ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(), @@ -116,13 +145,30 @@ public void testOffloadRead() throws Exception { verify(offloader, times(2)) .readOffloaded(anyLong(), (UUID) any(), anyMap()); - ledger.close(); - // Ensure that all the read handles had been closed - assertEquals(offloader.openedReadHandles.get(), 0); + if (!deleteMl) { + ledger.close(); + // Ensure that all the read handles had been closed + assertEquals(offloader.openedReadHandles.get(), 0); + } else { + // Verify: the ledger offloaded will be deleted after managed ledger is deleted. + ledger.delete(); + Awaitility.await().untilAsserted(() -> { + assertTrue(offloader.offloads.size() <= 1); + assertTrue(ledger.ledgers.size() <= 1); + }); + } } - @Test - public void testBookkeeperFirstOffloadRead() throws Exception { + @DataProvider(name = "offloadTypes") + public Object[][] offloadTypes() { + return new Object[][]{ + {"normal"}, + {offloadTypeReadOnly}, + }; + } + + @Test(dataProvider = "offloadTypes") + public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception { MockLedgerOffloader offloader = spy(MockLedgerOffloader.class); MockClock clock = new MockClock(); offloader.getOffloadPolicies() @@ -187,6 +233,10 @@ public void testBookkeeperFirstOffloadRead() throws Exception { Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted()); Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted()); + if (offloadTypeReadOnly.equals(offloadType)) { + config.setLedgerOffloader(new ReadonlyWrapperLedgerOffloader(offloader)); + } + for (Entry e : cursor.readEntries(10)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } @@ -196,6 +246,56 @@ public void testBookkeeperFirstOffloadRead() throws Exception { .readOffloaded(anyLong(), (UUID) any(), anyMap()); verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); + // Verify: the ledger offloaded will be trimmed after if no backlog. + while (cursor.hasMoreEntries()) { + cursor.readEntries(1); + } + config.setRetentionTime(0, TimeUnit.MILLISECONDS); + config.setRetentionSizeInMB(0); + CompletableFuture trimFuture = new CompletableFuture(); + ledger.trimConsumedLedgersInBackground(trimFuture); + trimFuture.join(); + Awaitility.await().untilAsserted(() -> { + assertTrue(offloader.offloads.size() <= 1); + assertTrue(ledger.ledgers.size() <= 1); + }); + + // cleanup. + ledger.delete(); + } + + + + @Test + public void testSkipOffloadIfReadOnly() throws Exception { + LedgerOffloader ol = new ReadonlyWrapperLedgerOffloader(spy(MockLedgerOffloader.class)); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(ol); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config); + + for (int i = 0; i < 25; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + try { + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + } catch (ManagedLedgerException mle) { + assertTrue(mle.getMessage().contains("does not support offload")); + } + + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + Assert.assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); + Assert.assertFalse(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); + Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); + + // cleanup. + ledger.delete(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 645563eb78c4d..c7685cfaa6594 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -83,13 +83,17 @@ public final void setUp(Method method) throws Exception { } ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); - // increase default cache eviction interval so that caching could be tested with less flakyness - managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200); + initManagedLedgerFactoryConfig(managedLedgerFactoryConfig); factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); setUpTestCase(); } + protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) { + // increase default cache eviction interval so that caching could be tested with less flakyness + config.setCacheEvictionIntervalMs(200); + } + protected void setUpTestCase() throws Exception { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c0e3e7d356be0..aa34f2735c421 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.impl.ReadonlyWrapperLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -2018,29 +2019,25 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); - if (NamespaceService.isSystemServiceNamespace(namespace.toString()) - || SystemTopicNames.isSystemTopic(topicName)) { - /* - Avoid setting broker internal system topics using off-loader because some of them are the - preconditions of other topics. The slow replying log speed will cause a delay in all the topic - loading.(timeout) - */ - managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); - } else { - if (topicLevelOffloadPolicies != null) { - try { - LedgerOffloader topicLevelLedgerOffLoader = - pulsar().createManagedLedgerOffloader(offloadPolicies); - managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); - } catch (PulsarServerException e) { - throw new RuntimeException(e); - } - } else { - //If the topic level policy is null, use the namespace level - managedLedgerConfig - .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); + if (topicLevelOffloadPolicies != null) { + try { + LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies); + managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); + } catch (PulsarServerException e) { + throw new RuntimeException(e); } + } else { + //If the topic level policy is null, use the namespace level + managedLedgerConfig + .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } + if (managedLedgerConfig.getLedgerOffloader() != NullLedgerOffloader.INSTANCE && + (NamespaceService.isSystemServiceNamespace(namespace.toString()) + || SystemTopicNames.isSystemTopic(topicName))) { + managedLedgerConfig.setLedgerOffloader( + new ReadonlyWrapperLedgerOffloader(managedLedgerConfig.getLedgerOffloader())); + } + managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad()); managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index aa236e09da99d..e301b57cbb91d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -76,6 +76,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.impl.ReadonlyWrapperLedgerOffloader; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -1883,6 +1884,10 @@ public void close() { final String namespace = "prop/" + UUID.randomUUID(); admin.namespaces().createNamespace(namespace); admin.namespaces().setOffloadPolicies(namespace, offloadPolicies); + Awaitility.await().untilAsserted(() -> { + OffloadPolicies policiesGot = admin.namespaces().getOffloadPolicies(namespace); + assertNotNull(policiesGot); + }); // Inject the cache to avoid real load off-loader jar final Map ledgerOffloaderMap = pulsar.getLedgerOffloaderMap(); @@ -1896,8 +1901,20 @@ public void close() { // (2) test system topic for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) { - managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join(); - Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE); + boolean offloadPoliciesExists = false; + try { + OffloadPolicies policiesGot = + admin.namespaces().getOffloadPolicies(TopicName.get(eventTopicName).getNamespace()); + offloadPoliciesExists = policiesGot != null; + } catch (PulsarAdminException.NotFoundException notFoundException) { + offloadPoliciesExists = false; + } + var managedLedgerConfig2 = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join(); + if (offloadPoliciesExists) { + Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader() instanceof ReadonlyWrapperLedgerOffloader); + } else { + Assert.assertEquals(managedLedgerConfig2.getLedgerOffloader(), NullLedgerOffloader.INSTANCE); + } } } } From 17dea21706b1f28dec553384cdfdc074c9b8a208 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 10 Sep 2024 16:46:42 +0800 Subject: [PATCH 2/7] address comments --- .../bookkeeper/mledger/LedgerOffloader.java | 4 ++++ .../mledger/impl/ManagedLedgerImpl.java | 19 ++++++++----------- ...java => NonAppendableLedgerOffloader.java} | 9 +++++++-- .../mledger/impl/NullLedgerOffloader.java | 5 +++++ .../mledger/impl/OffloadPrefixReadTest.java | 6 +++--- .../pulsar/broker/service/BrokerService.java | 4 ++-- .../broker/service/BrokerServiceTest.java | 4 ++-- 7 files changed, 31 insertions(+), 20 deletions(-) rename managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/{ReadonlyWrapperLedgerOffloader.java => NonAppendableLedgerOffloader.java} (91%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java index 868a8e4265365..11148ef1a59f5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java @@ -230,5 +230,9 @@ default void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map offloadDriverMetadata) throws ManagedLedgerException { throw ManagedLedgerException.getManagedLedgerException(new UnsupportedOperationException()); } + + default boolean isAppendable() { + return true; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7f6307ce71345..8cb5a3ee6acec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2452,7 +2452,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture p } public void maybeOffloadInBackground(CompletableFuture promise) { - if (getOffloadPoliciesIfEnabled() == null) { + if (getOffloadPoliciesIfAppendable().isEmpty()) { return; } @@ -2468,8 +2468,7 @@ public void maybeOffloadInBackground(CompletableFuture promise) { private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds, CompletableFuture finalPromise) { - LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); - if (getOffloadPoliciesIfEnabled() == null) { + if (getOffloadPoliciesIfAppendable().isEmpty()) { String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name); finalPromise.completeExceptionally(new IllegalArgumentException(msg)); return; @@ -2572,15 +2571,14 @@ void internalTrimConsumedLedgers(CompletableFuture promise) { internalTrimLedgers(false, promise); } - private OffloadPolicies getOffloadPoliciesIfEnabled() { + private Optional getOffloadPoliciesIfAppendable() { LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); if (ledgerOffloader == null - || ledgerOffloader == NullLedgerOffloader.INSTANCE - || ledgerOffloader instanceof ReadonlyWrapperLedgerOffloader + || !ledgerOffloader.isAppendable() || ledgerOffloader.getOffloadPolicies() == null) { - return null; + return Optional.empty(); } - return ledgerOffloader.getOffloadPolicies(); + return Optional.ofNullable(ledgerOffloader.getOffloadPolicies()); } void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { @@ -2598,7 +2596,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { List ledgersToDelete = new ArrayList<>(); List offloadedLedgersToDelete = new ArrayList<>(); - Optional optionalOffloadPolicies = Optional.ofNullable(getOffloadPoliciesIfEnabled()); + Optional optionalOffloadPolicies = getOffloadPoliciesIfAppendable(); synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), @@ -3126,8 +3124,7 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) { @Override public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) { LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); - if (ledgerOffloader != null && (ledgerOffloader == NullLedgerOffloader.INSTANCE - || ledgerOffloader instanceof ReadonlyWrapperLedgerOffloader)) { + if (ledgerOffloader != null && !ledgerOffloader.isAppendable()) { String msg = String.format("[%s] does not support offload", ledgerOffloader.getClass().getSimpleName()); callback.offloadFailed(new ManagedLedgerException(msg), ctx); return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java similarity index 91% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java index 430bbf697ef80..f3001ec8050e2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadonlyWrapperLedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java @@ -26,10 +26,10 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.util.FutureUtil; -public class ReadonlyWrapperLedgerOffloader implements LedgerOffloader { +public class NonAppendableLedgerOffloader implements LedgerOffloader { private LedgerOffloader delegate; - public ReadonlyWrapperLedgerOffloader(LedgerOffloader delegate) { + public NonAppendableLedgerOffloader(LedgerOffloader delegate) { this.delegate = delegate; } @@ -66,4 +66,9 @@ public OffloadPolicies getOffloadPolicies() { public void close() { delegate.close(); } + + @Override + public boolean isAppendable() { + return false; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java index 938ceb0c7dfbc..fe646bc82e55a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java @@ -70,4 +70,9 @@ public OffloadPolicies getOffloadPolicies() { public void close() { } + + @Override + public boolean isAppendable() { + return false; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 9a5c2475c5206..18d52a23475e8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -115,7 +115,7 @@ public void testOffloadRead(String offloadType, boolean deleteMl) throws Excepti Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); if (offloadTypeReadOnly.equals(offloadType)) { - config.setLedgerOffloader(new ReadonlyWrapperLedgerOffloader(offloader)); + config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader)); } UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(), @@ -234,7 +234,7 @@ public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted()); if (offloadTypeReadOnly.equals(offloadType)) { - config.setLedgerOffloader(new ReadonlyWrapperLedgerOffloader(offloader)); + config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader)); } for (Entry e : cursor.readEntries(10)) { @@ -268,7 +268,7 @@ public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception @Test public void testSkipOffloadIfReadOnly() throws Exception { - LedgerOffloader ol = new ReadonlyWrapperLedgerOffloader(spy(MockLedgerOffloader.class)); + LedgerOffloader ol = new NonAppendableLedgerOffloader(spy(MockLedgerOffloader.class)); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); config.setMinimumRolloverTime(0, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aa34f2735c421..99e043ad643ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -95,7 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; -import org.apache.bookkeeper.mledger.impl.ReadonlyWrapperLedgerOffloader; +import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -2035,7 +2035,7 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T (NamespaceService.isSystemServiceNamespace(namespace.toString()) || SystemTopicNames.isSystemTopic(topicName))) { managedLedgerConfig.setLedgerOffloader( - new ReadonlyWrapperLedgerOffloader(managedLedgerConfig.getLedgerOffloader())); + new NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader())); } managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e301b57cbb91d..2f27d5917f025 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -76,7 +76,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; -import org.apache.bookkeeper.mledger.impl.ReadonlyWrapperLedgerOffloader; +import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -1911,7 +1911,7 @@ public void close() { } var managedLedgerConfig2 = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join(); if (offloadPoliciesExists) { - Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader() instanceof ReadonlyWrapperLedgerOffloader); + Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader() instanceof NonAppendableLedgerOffloader); } else { Assert.assertEquals(managedLedgerConfig2.getLedgerOffloader(), NullLedgerOffloader.INSTANCE); } From a0887adf38d601b84fad71e6d031139dfb5f0c6a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 10 Sep 2024 16:54:32 +0800 Subject: [PATCH 3/7] checkstyle --- .../org/apache/pulsar/broker/service/BrokerService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 99e043ad643ee..5075b42983487 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -94,8 +94,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -2031,8 +2031,8 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T managedLedgerConfig .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } - if (managedLedgerConfig.getLedgerOffloader() != NullLedgerOffloader.INSTANCE && - (NamespaceService.isSystemServiceNamespace(namespace.toString()) + if (managedLedgerConfig.getLedgerOffloader().isAppendable() + && (NamespaceService.isSystemServiceNamespace(namespace.toString()) || SystemTopicNames.isSystemTopic(topicName))) { managedLedgerConfig.setLedgerOffloader( new NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader())); From 450aecfbe1c75b442ef6f0c7f9393bfa8e1bc44b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 10 Sep 2024 16:58:14 +0800 Subject: [PATCH 4/7] rename readonly->appendable --- .../mledger/impl/OffloadPrefixReadTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 18d52a23475e8..48751417e1714 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -71,7 +71,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { - private final String offloadTypeReadOnly = "readOnly"; + private final String offloadTypeAppendable = "NonAppendable"; @Override protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) { @@ -85,8 +85,8 @@ public Object[][] offloadAndDeleteTypes() { return new Object[][]{ {"normal", true}, {"normal", false}, - {offloadTypeReadOnly, true}, - {offloadTypeReadOnly, false}, + {offloadTypeAppendable, true}, + {offloadTypeAppendable, false}, }; } @@ -114,7 +114,7 @@ public void testOffloadRead(String offloadType, boolean deleteMl) throws Excepti Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); - if (offloadTypeReadOnly.equals(offloadType)) { + if (offloadTypeAppendable.equals(offloadType)) { config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader)); } @@ -163,7 +163,7 @@ public void testOffloadRead(String offloadType, boolean deleteMl) throws Excepti public Object[][] offloadTypes() { return new Object[][]{ {"normal"}, - {offloadTypeReadOnly}, + {offloadTypeAppendable}, }; } @@ -233,7 +233,7 @@ public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted()); Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted()); - if (offloadTypeReadOnly.equals(offloadType)) { + if (offloadTypeAppendable.equals(offloadType)) { config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader)); } From 56e217a20d9ec862ea3a6c862d7d9307ec543353 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Sep 2024 09:43:18 +0800 Subject: [PATCH 5/7] - --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 5075b42983487..c3fc2923fccd6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -95,7 +95,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; -import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; From 0236f093421a5413faa8c4f3f8e4311bac996352 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Sep 2024 10:52:52 +0800 Subject: [PATCH 6/7] fix test --- .../org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 331e7b0317394..3f9f4f8da12f2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -95,7 +95,7 @@ public void testNullOffloader() throws Exception { ledger.offloadPrefix(p); fail("Should have thrown an exception"); } catch (ManagedLedgerException e) { - assertEquals(e.getMessage(), "NullLedgerOffloader"); + assertTrue(e.getMessage().contains("does not support offload")); } assertEquals(ledger.getLedgersInfoAsList().size(), 5); assertEquals(ledger.getLedgersInfoAsList().stream() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c3fc2923fccd6..17e5288b5f179 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2030,7 +2030,8 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T managedLedgerConfig .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } - if (managedLedgerConfig.getLedgerOffloader().isAppendable() + if (managedLedgerConfig.getLedgerOffloader() != null + && managedLedgerConfig.getLedgerOffloader().isAppendable() && (NamespaceService.isSystemServiceNamespace(namespace.toString()) || SystemTopicNames.isSystemTopic(topicName))) { managedLedgerConfig.setLedgerOffloader( From 54488931000a1d8d6ac9f7e8a51407c51c4fa570 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 11 Sep 2024 14:30:39 +0800 Subject: [PATCH 7/7] fix test --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../org/apache/pulsar/broker/admin/AdminApiOffloadTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e3b272babb7bb..bb38114ef7117 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3849,7 +3849,7 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc config.setLedgerOffloader(ledgerOffloader); ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE); - verify(ledgerOffloader, times(1)).getOffloadPolicies(); + verify(ledgerOffloader, times(1)).isAppendable(); } @Test(timeOut = 30000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index eac816bd81089..1ea29c9d431bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -126,6 +126,7 @@ private void testOffload(String topicName, String mlName) throws Exception { CompletableFuture promise = new CompletableFuture<>(); doReturn(promise).when(offloader).offload(any(), any(), any()); + doReturn(true).when(offloader).isAppendable(); MessageId currentId = MessageId.latest; try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {