diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bbfb5498e2657..5b6d551ac8bca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -2679,13 +2680,13 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - Optional compactedTopicContext = getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } + futures.add(getCompactedTopicContextAsync().thenAccept(v -> { + if (v != null) { + info.ledgerId = v.getLedger().getId(); + info.entries = v.getLedger().getLastAddConfirmed() + 1; + info.size = v.getLedger().getLength(); + } + })); stats.compactedLedger = info; @@ -2804,12 +2805,24 @@ public Optional getCompactedTopicContext() { if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) { return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext(); } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { log.warn("[{}]Fail to get ledger information for compacted topic.", topic); } return Optional.empty(); } + public CompletableFuture getCompactedTopicContextAsync() { + if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) { + CompletableFuture res = + pulsarCompactedService.getCompactedTopic().getCompactedTopicContextFuture(); + if (res == null) { + return CompletableFuture.completedFuture(null); + } + return res; + } + return CompletableFuture.completedFuture(null); + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index dfafbc41cb45c..149d4691c8809 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; @@ -304,8 +306,10 @@ static CompletableFuture> readEntries(LedgerHandle lh, long from, lo * Getter for CompactedTopicContext. * @return CompactedTopicContext */ - public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException { - return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get()); + public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException, + TimeoutException { + return compactedTopicContext == null ? Optional.empty() : + Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS)); } @Override