diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 3ebcd1c20ca87..5893fc4924413 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -267,7 +267,7 @@ public void cleanOwnerships() { @Override public synchronized boolean started() { - return validateChannelState(LeaderElectionServiceStarted, false); + return validateChannelState(Started, true); } public synchronized void start() throws PulsarServerException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aee6532716cd8..e843222c8501b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3601,6 +3601,9 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t public @Nonnull CompletableFuture isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { requireNonNull(tpName); // Policies priority: topic level -> namespace level -> broker level + if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) { + return CompletableFuture.completedFuture(true); + } return pulsar.getTopicPoliciesService() .getTopicPoliciesAsync(tpName, TopicPoliciesService.GetType.LOCAL_ONLY) .thenCompose(optionalTopicPolicies -> {