diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642cf6..a097d41539f48 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -33,7 +33,7 @@ import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.commons.collections4.MapUtils; -import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; +import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet; /** * Configuration class for a ManagedLedger. @@ -281,7 +281,7 @@ public ManagedLedgerConfig setPassword(String password) { } /** - * should use {@link ConcurrentOpenLongPairRangeSet} to store unacked ranges. + * should use {@link OpenLongPairRangeSet} to store unacked ranges. * @return */ public boolean isUnackedRangesOpenCacheSetEnabled() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 353ac0fb7e26b..5d9e8da37b269 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1099,7 +1099,12 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() { @Override public int getTotalNonContiguousDeletedMessagesRange() { - return individualDeletedMessages.size(); + lock.readLock().lock(); + try { + return individualDeletedMessages.size(); + } finally { + lock.readLock().unlock(); + } } @Override @@ -2377,8 +2382,9 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb callback.deleteFailed(getManagedLedgerException(e), ctx); return; } finally { + boolean empty = individualDeletedMessages.isEmpty(); lock.writeLock().unlock(); - if (individualDeletedMessages.isEmpty()) { + if (empty) { callback.deleteComplete(ctx); } } @@ -2656,10 +2662,15 @@ public void operationFailed(MetaStoreException e) { } private boolean shouldPersistUnackRangesToLedger() { - return cursorLedger != null - && !isCursorLedgerReadOnly - && getConfig().getMaxUnackedRangesToPersist() > 0 - && individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore(); + lock.readLock().lock(); + try { + return cursorLedger != null + && !isCursorLedgerReadOnly + && getConfig().getMaxUnackedRangesToPersist() > 0 + && individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore(); + } finally { + lock.readLock().unlock(); + } } private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map properties, @@ -3019,7 +3030,7 @@ private static List buildStringPropertiesMap(Map } private List buildIndividualDeletedMessageRanges() { - lock.readLock().lock(); + lock.writeLock().lock(); try { if (individualDeletedMessages.isEmpty()) { this.individualDeletedMessagesSerializedSize = 0; @@ -3061,7 +3072,7 @@ private List buildIndividualDeletedMessageRanges() { individualDeletedMessages.resetDirtyKeys(); return rangeList; } finally { - lock.readLock().unlock(); + lock.writeLock().unlock(); } } @@ -3423,8 +3434,13 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { public boolean isMessageDeleted(Position position) { checkArgument(position instanceof PositionImpl); - return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); + lock.readLock().lock(); + try { + return ((PositionImpl) position).compareTo(markDeletePosition) <= 0 + || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()); + } finally { + lock.readLock().unlock(); + } } //this method will return a copy of the position's ack set @@ -3453,13 +3469,19 @@ public long[] getBatchPositionAckSet(Position position) { * @return next available position */ public PositionImpl getNextAvailablePosition(PositionImpl position) { - Range range = individualDeletedMessages.rangeContaining(position.getLedgerId(), - position.getEntryId()); - if (range != null) { - PositionImpl nextPosition = range.upperEndpoint().getNext(); - return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext(); + lock.readLock().lock(); + try { + Range range = individualDeletedMessages.rangeContaining(position.getLedgerId(), + position.getEntryId()); + if (range != null) { + PositionImpl nextPosition = range.upperEndpoint().getNext(); + return (nextPosition != null && nextPosition.compareTo(position) > 0) + ? nextPosition : position.getNext(); + } + return position.getNext(); + } finally { + lock.readLock().unlock(); } - return position.getNext(); } public Position getNextLedgerPosition(long currentLedgerId) { @@ -3510,7 +3532,12 @@ public ManagedLedger getManagedLedger() { @Override public Range getLastIndividualDeletedRange() { - return individualDeletedMessages.lastRange(); + lock.readLock().lock(); + try { + return individualDeletedMessages.lastRange(); + } finally { + lock.readLock().unlock(); + } } @Override @@ -3640,15 +3667,20 @@ public ManagedLedgerConfig getConfig() { public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException { NonDurableCursorImpl newNonDurableCursor = (NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName); - if (individualDeletedMessages != null) { - this.individualDeletedMessages.forEach(range -> { - newNonDurableCursor.individualDeletedMessages.addOpenClosed( - range.lowerEndpoint().getLedgerId(), - range.lowerEndpoint().getEntryId(), - range.upperEndpoint().getLedgerId(), - range.upperEndpoint().getEntryId()); - return true; - }); + lock.readLock().lock(); + try { + if (individualDeletedMessages != null) { + this.individualDeletedMessages.forEach(range -> { + newNonDurableCursor.individualDeletedMessages.addOpenClosed( + range.lowerEndpoint().getLedgerId(), + range.lowerEndpoint().getEntryId(), + range.upperEndpoint().getLedgerId(), + range.upperEndpoint().getEntryId()); + return true; + }); + } + } finally { + lock.readLock().unlock(); } if (batchDeletedIndexes != null) { for (Map.Entry entry : this.batchDeletedIndexes.entrySet()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index f235ffc63ace5..299fd3dc74cb4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -25,8 +25,8 @@ import java.util.Collection; import java.util.List; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; import org.apache.pulsar.common.util.collections.LongPairRangeSet; +import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet; /** * Wraps other Range classes, and adds LRU, marking dirty data and other features on this basis. @@ -55,7 +55,7 @@ public RangeSetWrapper(LongPairConsumer rangeConverter, this.config = managedCursor.getManagedLedger().getConfig(); this.rangeConverter = rangeConverter; this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled() - ? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter) + ? new OpenLongPairRangeSet<>(4096, rangeConverter) : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer); this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled(); } @@ -148,16 +148,16 @@ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upper @VisibleForTesting void add(Range range) { - if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) { + if (!(rangeSet instanceof OpenLongPairRangeSet)) { throw new UnsupportedOperationException("Only ConcurrentOpenLongPairRangeSet support this method"); } - ((ConcurrentOpenLongPairRangeSet) rangeSet).add(range); + ((OpenLongPairRangeSet) rangeSet).add(range); } @VisibleForTesting void remove(Range range) { - if (rangeSet instanceof ConcurrentOpenLongPairRangeSet) { - ((ConcurrentOpenLongPairRangeSet) rangeSet).remove((Range) range); + if (rangeSet instanceof OpenLongPairRangeSet) { + ((OpenLongPairRangeSet) rangeSet).remove((Range) range); } else { ((DefaultRangeSet) rangeSet).remove(range); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java similarity index 97% rename from pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java index 72215d7296cc3..c053c106be206 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java @@ -28,6 +28,7 @@ import java.util.NavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; import org.apache.commons.lang.mutable.MutableInt; /** @@ -41,7 +42,8 @@ * So, this rangeSet is not suitable for large number of unique keys. * */ -public class ConcurrentOpenLongPairRangeSet> implements LongPairRangeSet { +@NotThreadSafe +public class OpenLongPairRangeSet> implements LongPairRangeSet { protected final NavigableMap rangeBitSetMap = new ConcurrentSkipListMap<>(); private boolean threadSafe = true; @@ -54,15 +56,15 @@ public class ConcurrentOpenLongPairRangeSet> implements private volatile boolean updatedAfterCachedForSize = true; private volatile boolean updatedAfterCachedForToString = true; - public ConcurrentOpenLongPairRangeSet(LongPairConsumer consumer) { + public OpenLongPairRangeSet(LongPairConsumer consumer) { this(1024, true, consumer); } - public ConcurrentOpenLongPairRangeSet(int size, LongPairConsumer consumer) { + public OpenLongPairRangeSet(int size, LongPairConsumer consumer) { this(size, true, consumer); } - public ConcurrentOpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer consumer) { + public OpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer consumer) { this.threadSafe = threadSafe; this.bitSetSize = size; this.consumer = consumer; diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java index f6103061a420c..730f4b4ceca22 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java @@ -34,8 +34,8 @@ public class DefaultRangeSetTest { public void testBehavior() { LongPairRangeSet.DefaultRangeSet set = new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer); - ConcurrentOpenLongPairRangeSet rangeSet = - new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet rangeSet = + new OpenLongPairRangeSet<>(consumer); assertNull(set.firstRange()); assertNull(set.lastRange()); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSetTest.java similarity index 92% rename from pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSetTest.java index 40bb337935742..4dd0f5551f1f9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSetTest.java @@ -37,14 +37,14 @@ import com.google.common.collect.Range; import com.google.common.collect.TreeRangeSet; -public class ConcurrentOpenLongPairRangeSetTest { +public class OpenLongPairRangeSetTest { static final LongPairConsumer consumer = LongPair::new; static final RangeBoundConsumer reverseConsumer = pair -> pair; @Test public void testIsEmpty() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); assertTrue(set.isEmpty()); // lowerValueOpen and upperValue are both -1 so that an empty set will be added set.addOpenClosed(0, -1, 0, -1); @@ -55,7 +55,7 @@ public void testIsEmpty() { @Test public void testAddForSameKey() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); // add 0 to 5 set.add(Range.closed(new LongPair(0, 0), new LongPair(0, 5))); // add 8,9,10 @@ -76,7 +76,7 @@ public void testAddForSameKey() { @Test public void testAddForDifferentKey() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); // [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)] set.addOpenClosed(0, 98, 0, 99); set.addOpenClosed(0, 100, 1, 5); @@ -93,7 +93,7 @@ public void testAddForDifferentKey() { @Test public void testAddCompareCompareWithGuava() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); com.google.common.collect.RangeSet gSet = TreeRangeSet.create(); // add 10K values for key 0 @@ -132,14 +132,14 @@ public void testAddCompareCompareWithGuava() { @Test public void testNPE() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); assertNull(set.span()); } @Test public void testDeleteCompareWithGuava() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); com.google.common.collect.RangeSet gSet = TreeRangeSet.create(); // add 10K values for key 0 @@ -193,7 +193,7 @@ public void testDeleteCompareWithGuava() { @Test public void testRemoveRangeInSameKey() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.addOpenClosed(0, 1, 0, 50); set.addOpenClosed(0, 97, 0, 99); set.addOpenClosed(0, 99, 1, 5); @@ -217,7 +217,7 @@ public void testRemoveRangeInSameKey() { @Test public void testSpanWithGuava() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); com.google.common.collect.RangeSet gSet = TreeRangeSet.create(); set.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99))); gSet.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99))); @@ -242,7 +242,7 @@ public void testSpanWithGuava() { @Test public void testFirstRange() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); assertNull(set.firstRange()); Range range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); set.add(range); @@ -260,7 +260,7 @@ public void testFirstRange() { @Test public void testLastRange() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); assertNull(set.lastRange()); Range range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); set.add(range); @@ -282,7 +282,7 @@ public void testLastRange() { @Test public void testToString() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); Range range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)); set.add(range); assertEquals(set.toString(), "[(0:97..0:99]]"); @@ -296,7 +296,7 @@ public void testToString() { @Test public void testDeleteForDifferentKey() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.addOpenClosed(0, 97, 0, 99); set.addOpenClosed(0, 99, 1, 5); set.addOpenClosed(1, 9, 1, 15); @@ -327,7 +327,7 @@ public void testDeleteForDifferentKey() { @Test public void testDeleteWithAtMost() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99))); set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5))); set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15))); @@ -353,7 +353,7 @@ public void testDeleteWithAtMost() { @Test public void testDeleteWithLeastMost() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99))); set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5))); set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15))); @@ -382,7 +382,7 @@ public void testDeleteWithLeastMost() { @Test public void testRangeContaining() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99))); set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5))); com.google.common.collect.RangeSet gSet = TreeRangeSet.create(); @@ -423,7 +423,7 @@ public void testRangeContaining() { */ @Test public void testCacheFlagConflict() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); set.add(Range.openClosed(new LongPair(0, 1), new LongPair(0, 2))); set.add(Range.openClosed(new LongPair(0, 3), new LongPair(0, 4))); assertEquals(set.toString(), "[(0:1..0:2],(0:3..0:4]]"); @@ -466,7 +466,7 @@ private List> getConnectedRange(Set> gRanges) { @Test public void testCardinality() { - ConcurrentOpenLongPairRangeSet set = new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = new OpenLongPairRangeSet<>(consumer); int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE); assertEquals(v, 0 ); set.addOpenClosed(1, 0, 1, 20); @@ -486,8 +486,8 @@ public void testCardinality() { @Test public void testForEachResultTheSameAsForEachWithRangeBoundMapper() { - ConcurrentOpenLongPairRangeSet set = - new ConcurrentOpenLongPairRangeSet<>(consumer); + OpenLongPairRangeSet set = + new OpenLongPairRangeSet<>(consumer); LongPairRangeSet.DefaultRangeSet defaultRangeSet = new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);