Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] upgrade bk to 4.16.0 for test #12

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/ci-go-functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ concurrency:
cancel-in-progress: true

env:
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000

jobs:
preconditions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-maven-cache-update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ on:
- cron: '30 */12 * * *'

env:
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000

jobs:
update-maven-dependencies-cache:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-owasp-dependency-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ on:
workflow_dispatch:

env:
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
MAVEN_OPTS: -Xss1500k -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000

jobs:
run-owasp-dependency-check:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-ci-flaky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ concurrency:
cancel-in-progress: true

env:
MAVEN_OPTS: -Xss1500k -Xmx1024m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
MAVEN_OPTS: -Xss1500k -Xmx1024m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000
# defines the retention period for the intermediate build artifacts needed for rerunning a failed build job
# it's possible to rerun individual failed jobs when the build artifacts are available
# if the artifacts have already been expired, the complete workflow can be rerun by closing and reopening the PR or by rebasing the PR
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ concurrency:
cancel-in-progress: true

env:
MAVEN_OPTS: -Xss1500k -Xmx1024m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
MAVEN_OPTS: -Xss1500k -Xmx1024m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000
# defines the retention period for the intermediate build artifacts needed for rerunning a failed build job
# it's possible to rerun individual failed jobs when the build artifacts are available
# if the artifacts have already been expired, the complete workflow can be rerun by closing and reopening the PR or by rebasing the PR
Expand Down
54 changes: 28 additions & 26 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,35 +343,37 @@ The Apache Software License, Version 2.0
- org.apache.logging.log4j-log4j-slf4j-impl-2.18.0.jar
- org.apache.logging.log4j-log4j-web-2.18.0.jar
* Java Native Access JNA
- net.java.dev.jna-jna-5.12.1.jar
- net.java.dev.jna-jna-jpms-5.12.1.jar
- net.java.dev.jna-jna-platform-jpms-5.12.1.jar
* BookKeeper
- org.apache.bookkeeper-bookkeeper-common-4.15.4.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.15.4.jar
- org.apache.bookkeeper-bookkeeper-proto-4.15.4.jar
- org.apache.bookkeeper-bookkeeper-server-4.15.4.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.15.4.jar
- org.apache.bookkeeper-circe-checksum-4.15.4.jar
- org.apache.bookkeeper-cpu-affinity-4.15.4.jar
- org.apache.bookkeeper-statelib-4.15.4.jar
- org.apache.bookkeeper-stream-storage-api-4.15.4.jar
- org.apache.bookkeeper-stream-storage-common-4.15.4.jar
- org.apache.bookkeeper-stream-storage-java-client-4.15.4.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.15.4.jar
- org.apache.bookkeeper-stream-storage-proto-4.15.4.jar
- org.apache.bookkeeper-stream-storage-server-4.15.4.jar
- org.apache.bookkeeper-stream-storage-service-api-4.15.4.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.15.4.jar
- org.apache.bookkeeper.http-http-server-4.15.4.jar
- org.apache.bookkeeper.http-vertx-http-server-4.15.4.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.15.4.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.15.4.jar
- org.apache.distributedlog-distributedlog-common-4.15.4.jar
- org.apache.distributedlog-distributedlog-core-4.15.4-tests.jar
- org.apache.distributedlog-distributedlog-core-4.15.4.jar
- org.apache.distributedlog-distributedlog-protocol-4.15.4.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.15.4.jar
- org.apache.bookkeeper-bookkeeper-common-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-proto-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-server-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.16.0.jar
- org.apache.bookkeeper-circe-checksum-4.16.0.jar
- org.apache.bookkeeper-cpu-affinity-4.16.0.jar
- org.apache.bookkeeper-statelib-4.16.0.jar
- org.apache.bookkeeper-stream-storage-api-4.16.0.jar
- org.apache.bookkeeper-stream-storage-common-4.16.0.jar
- org.apache.bookkeeper-stream-storage-java-client-4.16.0.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.16.0.jar
- org.apache.bookkeeper-stream-storage-proto-4.16.0.jar
- org.apache.bookkeeper-stream-storage-server-4.16.0.jar
- org.apache.bookkeeper-stream-storage-service-api-4.16.0.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.16.0.jar
- org.apache.bookkeeper.http-http-server-4.16.0.jar
- org.apache.bookkeeper.http-vertx-http-server-4.16.0.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.0.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.16.0.jar
- org.apache.distributedlog-distributedlog-common-4.16.0.jar
- org.apache.distributedlog-distributedlog-core-4.16.0-tests.jar
- org.apache.distributedlog-distributedlog-core-4.16.0.jar
- org.apache.distributedlog-distributedlog-protocol-4.16.0.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-slogger-api-4.16.0.jar
- org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.16.0.jar
- org.apache.bookkeeper-native-io-4.16.0.jar
* Apache HTTP Client
- org.apache.httpcomponents-httpclient-4.5.13.jar
- org.apache.httpcomponents-httpcore-4.4.15.jar
Expand Down
6 changes: 3 additions & 3 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,9 @@ The Apache Software License, Version 2.0
- log4j-web-2.18.0.jar

* BookKeeper
- bookkeeper-common-allocator-4.15.4.jar
- cpu-affinity-4.15.4.jar
- circe-checksum-4.15.4.jar
- bookkeeper-common-allocator-4.16.0.jar
- cpu-affinity-4.16.0.jar
- circe-checksum-4.16.0.jar
* AirCompressor
- aircompressor-0.20.jar
* AsyncHttpClient
Expand Down
12 changes: 10 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ flexible messaging model and an intuitive client API.</description>
<!-- apache commons -->
<commons-compress.version>1.21</commons-compress.version>

<bookkeeper.version>4.15.4</bookkeeper.version>
<bookkeeper.version>4.16.0</bookkeeper.version>
<zookeeper.version>3.8.1</zookeeper.version>
<commons-cli.version>1.5.0</commons-cli.version>
<commons-text.version>1.10.0</commons-text.version>
Expand All @@ -149,7 +149,7 @@ flexible messaging model and an intuitive client API.</description>
<bouncycastle.bcpkix-fips.version>1.0.6</bouncycastle.bcpkix-fips.version>
<bouncycastle.bc-fips.version>1.0.2.3</bouncycastle.bc-fips.version>
<jackson.version>2.13.4.20221013</jackson.version>
<reflections.version>0.9.11</reflections.version>
<reflections.version>0.10.2</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<docker-maven.version>0.40.2</docker-maven.version>
Expand Down Expand Up @@ -2455,5 +2455,13 @@ flexible messaging model and an intuitive client API.</description>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>bk-staging</id>
<name>bk-staging</name>
<url>https://repository.apache.org/content/repositories/orgapachebookkeeper-1082/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.Beta;
import java.util.NavigableSet;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
Expand Down Expand Up @@ -81,8 +82,10 @@ public interface DelayedDeliveryTracker extends AutoCloseable {

/**
* Clear all delayed messages from the tracker.
*
* @return CompletableFuture<Void>
*/
void clear();
CompletableFuture<Void> clear();

/**
* Close the subscription tracker and release all resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -147,8 +148,9 @@ public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
}

@Override
public void clear() {
public CompletableFuture<Void> clear() {
this.priorityQueue.clear();
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;

@Slf4j
Expand All @@ -47,6 +48,9 @@ abstract class Bucket {
protected final String dispatcherName;

protected final ManagedCursor cursor;

protected final FutureUtil.Sequencer<Void> sequencer;

protected final BucketSnapshotStorage bucketSnapshotStorage;

long startLedgerId;
Expand All @@ -67,9 +71,10 @@ abstract class Bucket {
private volatile CompletableFuture<Long> snapshotCreateFuture;


Bucket(String dispatcherName, ManagedCursor cursor,
Bucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer<Void> sequencer,
BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
this(dispatcherName, cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null);
this(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0,
null, null);
}

boolean containsMessage(long ledgerId, long entryId) {
Expand Down Expand Up @@ -154,12 +159,16 @@ CompletableFuture<Long> asyncSaveBucketSnapshot(

private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long bucketId) {
Objects.requireNonNull(bucketId);
return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)),
ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
return sequencer.sequential(() -> {
return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)),
ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
});
}

protected CompletableFuture<Void> removeBucketCursorProperty(String bucketKey) {
return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey),
ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
return sequencer.sequential(() -> {
return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey),
ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
});
}
}
Loading