From 4b3b273c1c57741f9f9da2118eb4ec5dfeee2220 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Sep 2024 22:33:32 +0800 Subject: [PATCH] [improve][broker] PIP-376: Make topic policies service pluggable (#23319) --- .../pulsar/broker/ServiceConfiguration.java | 12 +- .../apache/pulsar/broker/PulsarService.java | 25 +- .../pulsar/broker/admin/AdminResource.java | 25 +- .../admin/impl/PersistentTopicsBase.java | 34 +-- .../broker/loadbalance/LoadManager.java | 4 + .../extensions/ExtensibleLoadManagerImpl.java | 4 +- .../ExtensibleLoadManagerWrapper.java | 4 + .../channel/ServiceUnitStateChannel.java | 5 + .../channel/ServiceUnitStateChannelImpl.java | 5 + .../pulsar/broker/service/AbstractTopic.java | 24 +- .../pulsar/broker/service/BrokerService.java | 123 ++-------- .../service/BrokerServiceException.java | 6 - .../SystemTopicBasedTopicPoliciesService.java | 231 ++++++------------ .../broker/service/TopicPoliciesService.java | 193 +++------------ .../broker/service/TopicPolicyListener.java | 11 +- .../nonpersistent/NonPersistentTopic.java | 2 +- .../service/persistent/PersistentTopic.java | 22 +- .../pulsar/broker/admin/NamespacesTest.java | 20 +- .../broker/admin/TopicPoliciesTest.java | 77 +++--- .../service/InmemoryTopicPoliciesService.java | 81 ++++++ ...memoryTopicPoliciesServiceServiceTest.java | 91 +++++++ .../broker/service/OneWayReplicatorTest.java | 3 +- .../service/OneWayReplicatorTestBase.java | 4 +- .../PersistentTopicInitializeDelayTest.java | 4 +- .../broker/service/PersistentTopicTest.java | 3 +- .../service/ReplicatorTopicPoliciesTest.java | 13 +- ...temTopicBasedTopicPoliciesServiceTest.java | 130 +++------- .../broker/service/TopicPolicyTestUtils.java | 74 ++++++ .../persistent/PersistentTopicTest.java | 3 +- .../systopic/PartitionedSystemTopicTest.java | 4 +- .../broker/transaction/TransactionTest.java | 8 +- .../client/api/OrphanPersistentTopicTest.java | 5 +- 32 files changed, 571 insertions(+), 679 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesServiceServiceTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 42dc959426692..cdd27412e3052 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1554,6 +1554,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + "please enable the system topic first.") private boolean topicLevelPoliciesEnabled = true; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The class name of the topic policies service. The default config only takes affect when the " + + "systemTopicEnable config is true" + ) + private String topicPoliciesServiceClassName = + "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService"; + @FieldContext( category = CATEGORY_SERVER, doc = "List of interceptors for entry metadata.") @@ -3793,10 +3801,6 @@ public int getTopicOrderedExecutorThreadNum() { ? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum; } - public boolean isSystemTopicAndTopicLevelPoliciesEnabled() { - return topicLevelPoliciesEnabled && systemTopicEnabled; - } - public Map lookupProperties() { final var map = new HashMap(); properties.forEach((key, value) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 87196d3f3a9a6..a2f6fb9e9773b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -989,11 +989,8 @@ public void start() throws PulsarServerException { this.nsService.initialize(); // Start topic level policies service - if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) { - this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this); - } - - this.topicPoliciesService.start(); + this.topicPoliciesService = initTopicPoliciesService(); + this.topicPoliciesService.start(this); // Register heartbeat and bootstrap namespaces. this.nsService.registerBootstrapNamespaces(); @@ -2137,4 +2134,22 @@ public void initConfigMetadataSynchronizerIfNeeded() { mutex.unlock(); } } + + private TopicPoliciesService initTopicPoliciesService() throws Exception { + if (!config.isTopicLevelPoliciesEnabled()) { + return TopicPoliciesService.DISABLED; + } + final var className = Optional.ofNullable(config.getTopicPoliciesServiceClassName()) + .orElse(SystemTopicBasedTopicPoliciesService.class.getName()); + if (className.equals(SystemTopicBasedTopicPoliciesService.class.getName())) { + if (config.isSystemTopicEnabled()) { + return new SystemTopicBasedTopicPoliciesService(this); + } else { + LOG.warn("System topic is disabled while the topic policies service is {}, disable it", className); + return TopicPoliciesService.DISABLED; + } + } + return (TopicPoliciesService) Reflections.createInstance(className, + Thread.currentThread().getContextClassLoader()); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index d42dff39a8a0d..3268f07b13d88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -45,6 +45,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.ClusterResources; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; @@ -365,14 +366,8 @@ protected CompletableFuture> getTopicPoliciesAsyncWithRe protected CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName, boolean isGlobal) { - try { - checkTopicLevelPolicyEnable(); - return pulsar().getTopicPoliciesService() - .getTopicPoliciesAsyncWithRetry(topicName, null, pulsar().getExecutor(), isGlobal); - } catch (Exception e) { - log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e); - return FutureUtil.failedFuture(e); - } + final var type = isGlobal ? TopicPoliciesService.GetType.GLOBAL_ONLY : TopicPoliciesService.GetType.LOCAL_ONLY; + return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName, type); } protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) { @@ -396,13 +391,6 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent return true; } - protected void checkTopicLevelPolicyEnable() { - if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) { - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Topic level policies is disabled, to enable the topic level policy and retry."); - } - } - protected DispatchRateImpl dispatchRate() { return DispatchRateImpl.builder() .dispatchThrottlingRateInMsg(config().getDispatchThrottlingRatePerTopicInMsg()) @@ -784,11 +772,8 @@ protected CompletableFuture getSchemaCompatibilityS } protected CompletableFuture getSchemaCompatibilityStrategyAsyncWithoutAuth() { - CompletableFuture future = CompletableFuture.completedFuture(null); - if (config().isSystemTopicAndTopicLevelPoliciesEnabled()) { - future = getTopicPoliciesAsyncWithRetry(topicName) - .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null)); - } + CompletableFuture future = getTopicPoliciesAsyncWithRetry(topicName) + .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null)); return future.thenCompose((topicSchemaCompatibilityStrategy) -> { if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4d04dfeda7a74..bdbd70afbaeac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -84,6 +84,7 @@ import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -447,20 +448,9 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean return CompletableFuture.completedFuture(null); } // update remote cluster - return namespaceResources().getPoliciesAsync(namespaceName) - .thenCompose(policies -> { - if (!policies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - // Combine namespace level policies and topic level policies. - Set replicationClusters = policies.get().replication_clusters; - TopicPolicies topicPolicies = - pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName); - if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) { - replicationClusters = topicPolicies.getReplicationClustersSet(); - } - // Do check replicated clusters. - if (replicationClusters.size() == 0) { + return getReplicationClusters() + .thenCompose(replicationClusters -> { + if (replicationClusters == null || replicationClusters.isEmpty()) { return CompletableFuture.completedFuture(null); } boolean containsCurrentCluster = @@ -495,6 +485,20 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean }); } + private CompletableFuture> getReplicationClusters() { + return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(optionalPolicies -> { + if (optionalPolicies.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + // Query the topic-level policies only if the namespace-level policies exist + final var namespacePolicies = optionalPolicies.get(); + return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName, + TopicPoliciesService.GetType.DEFAULT + ).thenApply(optionalTopicPolicies -> optionalTopicPolicies.map(TopicPolicies::getReplicationClustersSet) + .orElse(namespacePolicies.replication_clusters)); + }); + } + protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { if (metadata != null && metadata.partitions > 0) { @@ -3655,7 +3659,7 @@ protected CompletableFuture internalSetReplicatorDispatchRate(DispatchRate } protected CompletableFuture preValidation(boolean authoritative) { - if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) { + if (!config().isTopicLevelPoliciesEnabled()) { return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED, "Topic level policies is disabled, to enable the topic level policy and retry.")); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 0dd5d948480ab..db2fb2ffd0fa6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -53,6 +53,10 @@ public interface LoadManager { void start() throws PulsarServerException; + default boolean started() { + return true; + } + /** * Is centralized decision making to assign a new bundle. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 8e34f2f697fb1..f22bcc836f6e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -182,7 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private SplitManager splitManager; - private volatile boolean started = false; + volatile boolean started = false; private boolean configuredSystemTopics = false; @@ -320,7 +320,7 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer private static boolean configureSystemTopics(PulsarService pulsar) { try { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar) - && pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) { + && pulsar.getConfiguration().isTopicLevelPoliciesEnabled()) { Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) { pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 25eb27bc58d27..6a48607977ba9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -50,6 +50,10 @@ public void start() throws PulsarServerException { loadManager.start(); } + public boolean started() { + return loadManager.started && loadManager.getServiceUnitStateChannel().started(); + } + @Override public void initialize(PulsarService pulsar) { loadManager.initialize(pulsar); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 9be76e1b0f44d..6319fc332a678 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable { */ void start() throws PulsarServerException; + /** + * Whether the channel started. + */ + boolean started(); + /** * Closes the ServiceUnitStateChannel. * @throws PulsarServerException if it fails to close the channel. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1063f8124ece8..3ebcd1c20ca87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -265,6 +265,11 @@ public void cleanOwnerships() { doCleanup(brokerId); } + @Override + public synchronized boolean started() { + return validateChannelState(LeaderElectionServiceStarted, false); + } + public synchronized void start() throws PulsarServerException { if (!validateChannelState(LeaderElectionServiceStarted, false)) { throw new IllegalStateException("Invalid channel state:" + channelState.name()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9e5d6ef7191d1..3fdfeeee6e152 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -87,7 +87,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractTopic implements Topic, TopicPolicyListener { +public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; @@ -509,17 +509,13 @@ protected boolean isProducersExceeded(boolean isRemote) { } protected void registerTopicPolicyListener() { - if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { - brokerService.getPulsar().getTopicPoliciesService() - .registerListener(TopicName.getPartitionedTopicName(topic), this); - } + brokerService.getPulsar().getTopicPoliciesService() + .registerListener(TopicName.getPartitionedTopicName(topic), this); } protected void unregisterTopicPolicyListener() { - if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { - brokerService.getPulsar().getTopicPoliciesService() - .unregisterListener(TopicName.getPartitionedTopicName(topic), this); - } + brokerService.getPulsar().getTopicPoliciesService() + .unregisterListener(TopicName.getPartitionedTopicName(topic), this); } protected boolean isSameAddressProducersExceeded(Producer producer) { @@ -1253,16 +1249,8 @@ public InactiveTopicPolicies getInactiveTopicPolicies() { return topicPolicies.getInactiveTopicPolicies().get(); } - /** - * Get {@link TopicPolicies} for this topic. - * @return TopicPolicies, if they exist. Otherwise, the value will not be present. - */ - public Optional getTopicPolicies() { - return brokerService.getTopicPolicies(TopicName.get(topic)); - } - public CompletableFuture deleteTopicPolicies() { - return brokerService.deleteTopicPolicies(TopicName.get(topic)); + return brokerService.pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topic)); } protected int getWaitingProducersCount() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cb5e0853d53f3..aee6532716cd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -63,7 +63,6 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; @@ -171,6 +170,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Metrics; @@ -1177,15 +1177,8 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } private CompletableFuture> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) { - Objects.requireNonNull(topicName); - final ServiceConfiguration serviceConfiguration = pulsar.getConfiguration(); - if (serviceConfiguration.isSystemTopicAndTopicLevelPoliciesEnabled() - && !NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) - && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) { - return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName); - } else { - return CompletableFuture.completedFuture(Optional.empty()); - } + return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, + TopicPoliciesService.GetType.DEFAULT); } public CompletableFuture deleteTopic(String topic, boolean forceDelete) { @@ -1239,13 +1232,7 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); deleteTopicAuthenticationFuture .thenCompose(__ -> deleteSchema(tn)) - .thenCompose(__ -> { - if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) - && getPulsar().getConfiguration().isSystemTopicEnabled()) { - return deleteTopicPolicies(tn); - } - return CompletableFuture.completedFuture(null); - }).whenComplete((v, ex) -> { + .thenCompose(__ -> pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn)).whenComplete((v, ex) -> { if (ex != null) { future.completeExceptionally(ex); return; @@ -3611,71 +3598,25 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } - /** - * @deprecated Avoid using the deprecated method - * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking - * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. - */ - @Deprecated - public boolean isAllowAutoSubscriptionCreation(final String topic) { - TopicName topicName = TopicName.get(topic); - return isAllowAutoSubscriptionCreation(topicName); - } - - /** - * @deprecated Avoid using the deprecated method - * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking - * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. - */ - @Deprecated - public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { - AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = - getAutoSubscriptionCreationOverride(topicName); - if (autoSubscriptionCreationOverride != null) { - return autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); - } else { - return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); - } - } - - /** - * @deprecated Avoid using the deprecated method - * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking - * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. - */ - @Deprecated - private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { - Optional topicPolicies = getTopicPolicies(topicName); - if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { - return topicPolicies.get().getAutoSubscriptionCreationOverride(); - } - - Optional policies = - pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); - // If namespace policies have the field set, it will override the broker-level setting - if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { - return policies.get().autoSubscriptionCreationOverride; - } - log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName); - return null; - } - - public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { + public @Nonnull CompletableFuture isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { requireNonNull(tpName); - // topic level policies - final var topicPolicies = getTopicPolicies(tpName); - if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { - return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride() - .isAllowAutoSubscriptionCreation()); - } - // namespace level policies - return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) - .thenApply(policies -> { - if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { - return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + // Policies priority: topic level -> namespace level -> broker level + return pulsar.getTopicPoliciesService() + .getTopicPoliciesAsync(tpName, TopicPoliciesService.GetType.LOCAL_ONLY) + .thenCompose(optionalTopicPolicies -> { + Boolean allowed = optionalTopicPolicies.map(TopicPolicies::getAutoSubscriptionCreationOverride) + .map(AutoSubscriptionCreationOverrideImpl::isAllowAutoSubscriptionCreation) + .orElse(null); + if (allowed != null) { + return CompletableFuture.completedFuture(allowed); } - // broker level policies - return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync( + tpName.getNamespaceObject() + ).thenApply(optionalPolicies -> optionalPolicies.map(__ -> __.autoSubscriptionCreationOverride) + .map(AutoSubscriptionCreationOverride::isAllowAutoSubscriptionCreation) + // broker level policies + .orElse(pulsar.getConfiguration().isAllowAutoSubscriptionCreation())); }); } @@ -3688,28 +3629,6 @@ public boolean isSystemTopic(TopicName topicName) { || SystemTopicNames.isSystemTopic(topicName); } - /** - * Get {@link TopicPolicies} for the parameterized topic. - * @param topicName - * @return TopicPolicies, if they exist. Otherwise, the value will not be present. - */ - public Optional getTopicPolicies(TopicName topicName) { - if (!pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { - return Optional.empty(); - } - return Optional.ofNullable(pulsar.getTopicPoliciesService() - .getTopicPoliciesIfExists(topicName)); - } - - public CompletableFuture deleteTopicPolicies(TopicName topicName) { - final PulsarService pulsarService = pulsar(); - if (!pulsarService.getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { - return CompletableFuture.completedFuture(null); - } - return pulsar.getTopicPoliciesService() - .deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName())); - } - public CompletableFuture deleteSchema(TopicName topicName) { // delete schema at the upper level when deleting the partitioned topic. if (topicName.isPartitioned()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 6abe40f811d1d..d30dfc319e098 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -218,12 +218,6 @@ public ConsumerAssignException(String msg) { } } - public static class TopicPoliciesCacheNotInitException extends BrokerServiceException { - public TopicPoliciesCacheNotInitException() { - super("Topic policies cache have not init."); - } - } - public static class TopicBacklogQuotaExceededException extends BrokerServiceException { @Getter private final BacklogQuota.RetentionPolicy retentionPolicy; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 5156246bb5efb..18b4c610a5c9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -37,12 +37,10 @@ import javax.annotation.Nonnull; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.LazyInitializer; -import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; @@ -56,10 +54,10 @@ import org.apache.pulsar.common.events.TopicPoliciesEvent; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +99,7 @@ protected NamespaceEventsSystemTopicFactory initialize() { final Map> policyCacheInitMap = new ConcurrentHashMap<>(); @VisibleForTesting - final Map>> listeners = new ConcurrentHashMap<>(); + final Map> listeners = new ConcurrentHashMap<>(); private final AsyncLoadingCache> writerCaches; @@ -132,7 +130,7 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { @Override public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { - if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) || isSelf(topicName)) { return CompletableFuture.completedFuture(null); } return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); @@ -216,7 +214,7 @@ private void notifyListener(Message msg) { if (msg.getValue() == null) { TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()); if (listeners.get(topicName) != null) { - for (TopicPolicyListener listener : listeners.get(topicName)) { + for (TopicPolicyListener listener : listeners.get(topicName)) { try { listener.onUpdate(null); } catch (Throwable error) { @@ -235,7 +233,7 @@ private void notifyListener(Message msg) { event.getNamespace(), event.getTopic()); if (listeners.get(topicName) != null) { TopicPolicies policies = event.getPolicies(); - for (TopicPolicyListener listener : listeners.get(topicName)) { + for (TopicPolicyListener listener : listeners.get(topicName)) { try { listener.onUpdate(policies); } catch (Throwable error) { @@ -246,115 +244,76 @@ private void notifyListener(Message msg) { } @Override - public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { - return getTopicPolicies(topicName, false); - } - - @Override - public TopicPolicies getTopicPolicies(TopicName topicName, - boolean isGlobal) throws TopicPoliciesCacheNotInitException { - if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { - return null; + public CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type) { + requireNonNull(topicName); + final var namespace = topicName.getNamespaceObject(); + if (NamespaceService.isHeartbeatNamespace(namespace) || isSelf(topicName)) { + return CompletableFuture.completedFuture(Optional.empty()); } - if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { - NamespaceName namespace = topicName.getNamespaceObject(); - prepareInitPoliciesCacheAsync(namespace); + // When the extensible load manager initializes its channel topic, it will trigger the topic policies + // initialization by calling this method. At the moment, the load manager does not start so the lookup + // for "__change_events" will fail. In this case, just return an empty policies to avoid deadlock. + final var loadManager = pulsarService.getLoadManager().get(); + if (loadManager == null || !loadManager.started()) { + return CompletableFuture.completedFuture(Optional.empty()); } - - MutablePair result = new MutablePair<>(); - policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> { - if (initialized == null || !initialized.isDone()) { - result.setLeft(new TopicPoliciesCacheNotInitException()); + final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); + final var resultFuture = new CompletableFuture>(); + preparedFuture.thenAccept(inserted -> policyCacheInitMap.compute(namespace, (___, existingFuture) -> { + if (!inserted || existingFuture != null) { + final var partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + final var policies = Optional.ofNullable(switch (type) { + case DEFAULT -> Optional.ofNullable(policiesCache.get(partitionedTopicName)) + .orElseGet(() -> globalPoliciesCache.get(partitionedTopicName)); + case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName); + case LOCAL_ONLY -> policiesCache.get(partitionedTopicName); + }); + resultFuture.complete(policies); } else { - TopicPolicies topicPolicies = - isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())) - : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); - result.setRight(topicPolicies); - } - return initialized; - }); - - if (result.getLeft() != null) { - throw result.getLeft(); - } else { - return result.getRight(); - } - } - - @NotNull - @Override - public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName, - boolean isGlobal) { - requireNonNull(topicName); - final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); - return preparedFuture.thenApply(__ -> { - final TopicPolicies candidatePolicies = isGlobal - ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())) - : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); - return Optional.ofNullable(candidatePolicies); - }); - } - - @NotNull - @Override - public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName) { - requireNonNull(topicName); - final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); - return preparedFuture.thenApply(__ -> { - final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); - if (localPolicies != null) { - return Optional.of(localPolicies); + CompletableFuture.runAsync(() -> { + log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace); + // Call it in another thread to avoid recursive update because getTopicPoliciesAsync() could call + // policyCacheInitMap.computeIfAbsent() + getTopicPoliciesAsync(topicName, type).whenComplete((result, e) -> { + if (e == null) { + resultFuture.complete(result); + } else { + resultFuture.completeExceptionally(e); + } + }); + }); } - return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))); + return existingFuture; + })).exceptionally(e -> { + resultFuture.completeExceptionally(e); + return null; }); + return resultFuture; } - @Override - public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { - return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); - } - - @Override - public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName) { - CompletableFuture result = new CompletableFuture<>(); - try { - createSystemTopicFactoryIfNeeded(); - } catch (PulsarServerException e) { - result.complete(null); - return result; - } - SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() - .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); - systemTopicClient.newReaderAsync().thenAccept(r -> - fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result)); - return result; - } - - @Override - public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.isHeartbeatNamespace(namespace)) { - return CompletableFuture.completedFuture(null); + return; } synchronized (this) { if (readerCaches.get(namespace) != null) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); - return CompletableFuture.completedFuture(null); } else { - return prepareInitPoliciesCacheAsync(namespace); + prepareInitPoliciesCacheAsync(namespace); } } } @VisibleForTesting - @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { + @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenCompose(namespacePolicies -> { if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { log.info("[{}] skip prepare init policies cache since the namespace is deleted", namespace); - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(false); } return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { @@ -384,7 +343,7 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name }); // let caller know we've got an exception. return initFuture; - }); + }).thenApply(__ -> true); }); } @@ -404,22 +363,20 @@ protected CompletableFuture> createSystemT return systemTopicClient.newReaderAsync(); } - @Override - public CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { + private void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.checkHeartbeatNamespace(namespace) != null || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { - return CompletableFuture.completedFuture(null); + return; } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { cleanCacheAndCloseReader(namespace, true, true); } - return CompletableFuture.completedFuture(null); } @Override - public void start() { + public void start(PulsarService pulsarService) { pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener( new NamespaceBundleOwnershipListener() { @@ -478,7 +435,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp // replay policy message policiesCache.forEach(((topicName, topicPolicies) -> { if (listeners.get(topicName) != null) { - for (TopicPolicyListener listener : listeners.get(topicName)) { + for (TopicPolicyListener listener : listeners.get(topicName)) { try { listener.onUpdate(topicPolicies); } catch (Throwable error) { @@ -525,7 +482,7 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean * This is an async method for the background reader to continue syncing new messages. * * Note: You should not do any blocking call here. because it will affect - * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic. + * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync} method to block loading topic. */ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) { if (closed.get()) { @@ -638,7 +595,8 @@ private void createSystemTopicFactoryIfNeeded() throws PulsarServerException { } } - private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { + @VisibleForTesting + NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { try { return namespaceEventsSystemTopicFactoryLazyInitializer.get(); } catch (Exception e) { @@ -647,58 +605,6 @@ private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() } } - private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader reader, - TopicName topicName, TopicPolicies policies, - CompletableFuture future) { - if (closed.get()) { - future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed.")); - reader.closeAsync().whenComplete((v, e) -> { - if (e != null) { - log.error("[{}] Close reader error.", topicName, e); - } - }); - return; - } - reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { - if (ex != null) { - future.completeExceptionally(ex); - } - if (hasMore != null && hasMore) { - reader.readNextAsync().whenComplete((msg, e) -> { - if (e != null) { - future.completeExceptionally(e); - } - if (msg.getValue() != null - && EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { - TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent(); - if (topicName.equals(TopicName.get( - topicPoliciesEvent.getDomain(), - topicPoliciesEvent.getTenant(), - topicPoliciesEvent.getNamespace(), - topicPoliciesEvent.getTopic())) - ) { - fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, - topicPoliciesEvent.getPolicies(), future); - } else { - fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, policies, future); - } - } else { - future.complete(null); - } - }); - } else { - if (!future.isDone()) { - future.complete(policies); - } - reader.closeAsync().whenComplete((v, e) -> { - if (e != null) { - log.error("[{}] Close reader error.", topicName, e); - } - }); - } - }); - } - public static String getEventKey(PulsarEvent event) { return TopicName.get(event.getTopicPoliciesEvent().getDomain(), event.getTopicPoliciesEvent().getTenant(), @@ -718,11 +624,6 @@ long getPoliciesCacheSize() { return policiesCache.size(); } - @VisibleForTesting - long getReaderCacheCount() { - return readerCaches.size(); - } - @VisibleForTesting boolean checkReaderIsCached(NamespaceName namespaceName) { return readerCaches.get(namespaceName) != null; @@ -734,7 +635,7 @@ public CompletableFuture getPoliciesCacheInit(NamespaceName namespaceName) } @Override - public void registerListener(TopicName topicName, TopicPolicyListener listener) { + public boolean registerListener(TopicName topicName, TopicPolicyListener listener) { listeners.compute(topicName, (k, topicListeners) -> { if (topicListeners == null) { topicListeners = new CopyOnWriteArrayList<>(); @@ -742,10 +643,11 @@ public void registerListener(TopicName topicName, TopicPolicyListener listener) { + public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { listeners.compute(topicName, (k, topicListeners) -> { if (topicListeners != null){ topicListeners.remove(listener); @@ -763,7 +665,7 @@ protected Map getPoliciesCache() { } @VisibleForTesting - protected Map>> getListeners() { + protected Map> getListeners() { return listeners; } @@ -792,4 +694,13 @@ public void close() throws Exception { readerCaches.clear(); } } + + private static boolean isSelf(TopicName topicName) { + final var localName = topicName.getLocalName(); + if (!topicName.isPartitioned()) { + return localName.equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } + final var index = localName.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX); + return localName.substring(0, index).equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index eca31ec230a8e..9b5d9a28ac216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -20,38 +20,31 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; -import org.apache.pulsar.client.util.RetryUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; -import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; -import org.jetbrains.annotations.NotNull; /** * Topic policies service. */ -@InterfaceStability.Evolving +@InterfaceStability.Stable +@InterfaceAudience.LimitedPrivate public interface TopicPoliciesService extends AutoCloseable { TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); - long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; /** - * Delete policies for a topic async. + * Delete policies for a topic asynchronously. * * @param topicName topic name */ CompletableFuture deleteTopicPoliciesAsync(TopicName topicName); /** - * Update policies for a topic async. + * Update policies for a topic asynchronously. * * @param topicName topic name * @param policies policies for the topic name @@ -59,119 +52,56 @@ public interface TopicPoliciesService extends AutoCloseable { CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies); /** - * Get policies for a topic async. - * @param topicName topic name - * @return future of the topic policies - */ - TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; - - /** - * Get policies from current cache. - * @param topicName topic name - * @return the topic policies - */ - TopicPolicies getTopicPoliciesIfExists(TopicName topicName); - - /** - * Get global policies for a topic async. - * @param topicName topic name - * @return future of the topic policies - */ - TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; - - /** - * When getting TopicPolicies, if the initialization has not been completed, - * we will go back off and try again until time out. - * @param topicName topic name - * @param backoff back off policy - * @param isGlobal is global policies - * @return CompletableFuture<Optional<TopicPolicies>> + * It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}. */ - default CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName, - final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) { - CompletableFuture> response = new CompletableFuture<>(); - Backoff usedBackoff = backoff == null ? new BackoffBuilder() - .setInitialTime(500, TimeUnit.MILLISECONDS) - .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) - .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) - .create() : backoff; - try { - RetryUtil.retryAsynchronously(() -> { - CompletableFuture> future = new CompletableFuture<>(); - try { - future.complete(Optional.ofNullable(getTopicPolicies(topicName, isGlobal))); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException exception) { - future.completeExceptionally(exception); - } - return future; - }, usedBackoff, scheduledExecutorService, response); - } catch (Exception e) { - response.completeExceptionally(e); - } - return response; + enum GetType { + DEFAULT, // try getting the local topic policies, if not present, then get the global policies + GLOBAL_ONLY, // only get the global policies + LOCAL_ONLY, // only get the local policies } /** - * Asynchronously retrieves topic policies. - * This triggers the Pulsar broker's internal client to load policies from the - * system topic `persistent://tenant/namespace/__change_event`. - * - * @param topicName The name of the topic. - * @param isGlobal Indicates if the policies are global. - * @return A CompletableFuture containing an Optional of TopicPolicies. - * @throws NullPointerException If the topicName is null. + * Retrieve the topic policies. */ - @Nonnull - CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); + CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); /** - * Asynchronously retrieves topic policies. - * This triggers the Pulsar broker's internal client to load policies from the - * system topic `persistent://tenant/namespace/__change_event`. - * - * NOTE: If local policies are not available, it will fallback to using topic global policies. - * @param topicName The name of the topic. - * @return A CompletableFuture containing an Optional of TopicPolicies. - * @throws NullPointerException If the topicName is null. + * Start the topic policy service. */ - @Nonnull - CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName); + default void start(PulsarService pulsar) { + } /** - * Get policies for a topic without cache async. - * @param topicName topic name - * @return future of the topic policies + * Close the resources if necessary. */ - CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName); + default void close() throws Exception { + } /** - * Add owned namespace bundle async. + * Registers a listener for topic policies updates. * - * @param namespaceBundle namespace bundle - */ - CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle); - - /** - * Remove owned namespace bundle async. + *

+ * The listener will receive the latest topic policies when they are updated. If the policies are removed, the + * listener will receive a null value. Note that not every update is guaranteed to trigger the listener. For + * instance, if the policies change from A -> B -> null -> C in quick succession, only the final state (C) is + * guaranteed to be received by the listener. + * In summary, the listener is guaranteed to receive only the latest value. + *

* - * @param namespaceBundle namespace bundle + * @return true if the listener is registered successfully */ - CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle); + boolean registerListener(TopicName topicName, TopicPolicyListener listener); /** - * Start the topic policy service. + * Unregister the topic policies listener. */ - void start(); - - void registerListener(TopicName topicName, TopicPolicyListener listener); - - void unregisterListener(TopicName topicName, TopicPolicyListener listener); + void unregisterListener(TopicName topicName, TopicPolicyListener listener); class TopicPoliciesServiceDisabled implements TopicPoliciesService { @Override public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { - return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled.")); + return CompletableFuture.completedFuture(null); } @Override @@ -180,68 +110,17 @@ public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, Top } @Override - public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { - return null; - } - - @Override - public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) - throws TopicPoliciesCacheNotInitException { - return null; - } - - @NotNull - @Override - public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName, - boolean isGlobal) { - return CompletableFuture.completedFuture(Optional.empty()); - } - - @NotNull - @Override - public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName) { + public CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type) { return CompletableFuture.completedFuture(Optional.empty()); } @Override - public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { - return null; - } - - @Override - public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicName topicName) { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { - //No-op - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { - //No-op - return CompletableFuture.completedFuture(null); - } - - @Override - public void start() { - //No-op - } - - @Override - public void registerListener(TopicName topicName, TopicPolicyListener listener) { - //No-op - } - - @Override - public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { - //No-op + public boolean registerListener(TopicName topicName, TopicPolicyListener listener) { + return false; } @Override - public void close() { + public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { //No-op } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java index 7f7fd154ab035..a597e2ef9aedf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPolicyListener.java @@ -18,6 +18,13 @@ */ package org.apache.pulsar.broker.service; -public interface TopicPolicyListener { - void onUpdate(T data); +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.common.policies.data.TopicPolicies; + +@InterfaceStability.Stable +@InterfaceAudience.LimitedPrivate +public interface TopicPolicyListener { + + void onUpdate(TopicPolicies data); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1b98ee2f8306d..2abd505d527cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -102,7 +102,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener { +public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener { // Subscriptions to this topic private final ConcurrentOpenHashMap subscriptions; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fc47889c60aac..e951ffab1e230 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -125,6 +125,7 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.SubscriptionOption; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; @@ -1512,14 +1513,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema()) - .thenCompose(ignore -> { - if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) - && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) { - return deleteTopicPolicies(); - } else { - return CompletableFuture.completedFuture(null); - } - }) + .thenCompose(ignore -> deleteTopicPolicies()) .thenCompose(ignore -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { if (ex != null) { @@ -4327,12 +4321,12 @@ private void updateSubscriptionsDispatcherRateLimiter() { } protected CompletableFuture initTopicPolicy() { - if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { - brokerService.getPulsar().getTopicPoliciesService() - .registerListener(TopicName.getPartitionedTopicName(topic), this); - return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate( - brokerService.getPulsar().getTopicPoliciesService() - .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))), + final var topicPoliciesService = brokerService.pulsar().getTopicPoliciesService(); + final var partitionedTopicName = TopicName.getPartitionedTopicName(topic); + if (topicPoliciesService.registerListener(partitionedTopicName, this)) { + return topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, + TopicPoliciesService.GetType.DEFAULT + ).thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate), brokerService.getTopicOrderedExecutor()); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 1050d9f33b465..f294866095250 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -79,6 +79,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.service.AbstractTopic; +import org.apache.pulsar.broker.service.TopicPolicyTestUtils; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; @@ -113,7 +114,6 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -2103,17 +2103,17 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(systemTopic).create(); admin.topicPolicies().setMaxConsumers(systemTopic, 5); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + final var policies = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), + TopicName.get(systemTopic)); + Assert.assertTrue(policies.isPresent()); + Assert.assertEquals(policies.get().getMaxConsumerPerTopic(), 5); + }); - Integer maxConsumerPerTopic = pulsar - .getTopicPoliciesService() - .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get() - .getMaxConsumerPerTopic(); - - assertEquals(maxConsumerPerTopic, 5); admin.topics().delete(systemTopic, true); - TopicPolicies topicPolicies = pulsar.getTopicPoliciesService() - .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS); - assertNull(topicPolicies); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( + TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), TopicName.get(systemTopic)) + .isEmpty())); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 9f56acfb57f23..1351c41e4279e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; +import org.apache.pulsar.broker.service.TopicPolicyTestUtils; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -2092,7 +2093,7 @@ public void testTopicMaxMessageSizeApi() throws Exception{ assertNull(admin.topicPolicies().getMaxMessageSize(persistenceTopic)); admin.topicPolicies().setMaxMessageSize(persistenceTopic,10); Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null); + -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(persistenceTopic)) != null); assertEquals(admin.topicPolicies().getMaxMessageSize(persistenceTopic).intValue(),10); admin.topicPolicies().removeMaxMessageSize(persistenceTopic); @@ -2138,7 +2139,7 @@ public void testTopicMaxMessageSize(TopicDomain topicDomain, boolean isPartition assertNull(admin.topicPolicies().getMaxMessageSize(topic)); // set msg size admin.topicPolicies().setMaxMessageSize(topic, 10); - Awaitility.await().until(() -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + Awaitility.await().until(() -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)) != null); if (isPartitioned) { for (int i = 0; i <3; i++) { String partitionName = TopicName.get(topic).getPartition(i).toString(); @@ -2255,7 +2256,7 @@ public void testMaxSubscriptionsPerTopicApi() throws Exception { // set max subscriptions admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, 10); Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)) != null); assertEquals(admin.topicPolicies().getMaxSubscriptionsPerTopic(topic).intValue(), 10); // remove max subscriptions admin.topicPolicies().removeMaxSubscriptionsPerTopic(topic); @@ -2278,7 +2279,7 @@ public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception { final int topicLevelMaxSubNum = 2; admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)) != null); List> consumerList = new ArrayList<>(); String subName = "my-sub-"; for (int i = 0; i < topicLevelMaxSubNum; i++) { @@ -2410,7 +2411,7 @@ public void testMaxSubscriptionsPerTopic() throws Exception { final int topicLevelMaxSubNum = 2; admin.topicPolicies().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)) != null); List> consumerList = new ArrayList<>(); for (int i = 0; i < topicLevelMaxSubNum; i++) { @@ -2613,7 +2614,7 @@ public void testSubscriptionTypesEnabled() throws Exception { admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet); Awaitility.await().until(() - -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); + -> TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)) != null); waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> { assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().get() .contains(CommandSubscribe.SubType.Failover)); @@ -2836,7 +2837,7 @@ public void testPolicyIsDeleteTogetherManually() throws Exception { pulsarClient.newProducer().topic(topic).create().close(); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNull()); int maxConsumersPerSubscription = 10; @@ -2845,7 +2846,7 @@ public void testPolicyIsDeleteTogetherManually() throws Exception { Awaitility.await().untilAsserted(() -> Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue()); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNotNull()); admin.topics().delete(topic); @@ -2853,7 +2854,7 @@ public void testPolicyIsDeleteTogetherManually() throws Exception { Awaitility.await().untilAsserted(() -> Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse()); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNull()); } @@ -2865,8 +2866,8 @@ public void testPoliciesCanBeDeletedWithTopic() throws Exception { pulsarClient.newProducer().topic(topic2).create().close(); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic2))).isNull(); }); // Init Topic Policies. Send 4 messages in a row, there should be only 2 messages left after compression admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); @@ -2874,8 +2875,8 @@ public void testPoliciesCanBeDeletedWithTopic() throws Exception { admin.topicPolicies().setMaxConsumersPerSubscription(topic, 3); admin.topicPolicies().setMaxConsumersPerSubscription(topic2, 4); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNotNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic2))).isNotNull(); }); String topicPoliciesTopic = "persistent://" + myNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; PersistentTopic persistentTopic = @@ -2908,7 +2909,7 @@ public void testPoliciesCanBeDeletedWithTopic() throws Exception { admin.topics().delete(topic, true); Awaitility.await().untilAsserted(() -> - assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))); + assertNull(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic)))); persistentTopic.triggerCompaction(); field = PersistentTopic.class.getDeclaredField("currentCompaction"); field.setAccessible(true); @@ -2940,7 +2941,7 @@ public void testPolicyIsDeleteTogetherAutomatically() throws Exception { pulsarClient.newProducer().topic(topic).create().close(); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNull()); int maxConsumersPerSubscription = 10; @@ -2949,7 +2950,7 @@ public void testPolicyIsDeleteTogetherAutomatically() throws Exception { Awaitility.await().untilAsserted(() -> Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isTrue()); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNotNull()); InactiveTopicPolicies inactiveTopicPolicies = @@ -2963,7 +2964,7 @@ public void testPolicyIsDeleteTogetherAutomatically() throws Exception { Awaitility.await().untilAsserted(() -> Assertions.assertThat(pulsar.getBrokerService().getTopic(topic, false).get().isPresent()).isFalse()); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNull()); } @@ -3009,17 +3010,17 @@ public void testLoopCreateAndDeleteTopicPolicies() throws Exception { n++; pulsarClient.newProducer().topic(topic).create().close(); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNull(); }); admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNotNull(); }); admin.topics().delete(topic); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))).isNull(); }); } } @@ -3030,42 +3031,43 @@ public void testGlobalTopicPolicies() throws Exception { pulsarClient.newProducer().topic(topic).create().close(); Awaitility.await().untilAsserted(() -> - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))) + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(topic))) .isNull()); admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1, 2)); SystemTopicBasedTopicPoliciesService topicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); // check global topic policies can be added correctly. - Awaitility.await().untilAsserted(() -> assertNotNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic), true))); - TopicPolicies topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true); - assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic))); + Awaitility.await().untilAsserted(() -> assertNotNull( + TopicPolicyTestUtils.getGlobalTopicPolicies(topicPoliciesService, TopicName.get(topic)))); + TopicPolicies topicPolicies = TopicPolicyTestUtils.getGlobalTopicPolicies(topicPoliciesService, TopicName.get(topic)); + assertNull(TopicPolicyTestUtils.getLocalTopicPolicies(topicPoliciesService, TopicName.get(topic))); assertEquals(topicPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 1); assertEquals(topicPolicies.getRetentionPolicies().getRetentionSizeInMB(), 2); // check global topic policies can be updated correctly. admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(3, 4)); Awaitility.await().untilAsserted(() -> { - TopicPolicies tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true); - assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic))); + TopicPolicies tempPolicies = TopicPolicyTestUtils.getGlobalTopicPolicies(topicPoliciesService, TopicName.get(topic)); + assertNull(TopicPolicyTestUtils.getLocalTopicPolicies(topicPoliciesService, (TopicName.get(topic)))); assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 3); assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 4); }); //Local topic policies and global topic policies can exist together. admin.topicPolicies().setRetention(topic, new RetentionPolicies(10, 20)); - Awaitility.await().untilAsserted(() -> assertNotNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic)))); - TopicPolicies tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic), true); + Awaitility.await().untilAsserted(() -> assertNotNull(TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, (TopicName.get(topic))))); + TopicPolicies tempPolicies = TopicPolicyTestUtils.getGlobalTopicPolicies(topicPoliciesService, TopicName.get(topic)); assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 3); assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 4); - tempPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic)); + tempPolicies = TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, (TopicName.get(topic))); assertEquals(tempPolicies.getRetentionPolicies().getRetentionTimeInMinutes(), 10); assertEquals(tempPolicies.getRetentionPolicies().getRetentionSizeInMB(), 20); // check remove global topic policies can be removed correctly. admin.topicPolicies(true).removeRetention(topic); - Awaitility.await().untilAsserted(() -> - assertNull(topicPoliciesService.getTopicPolicies(TopicName.get(topic), true).getRetentionPolicies())); + Awaitility.await().untilAsserted(() -> assertNull(TopicPolicyTestUtils.getGlobalTopicPolicies(topicPoliciesService, + TopicName.get(topic)).getRetentionPolicies())); } @@ -3109,7 +3111,7 @@ public void testShadowTopics() throws Exception { pulsarClient.newProducer().topic(sourceTopic).create().close(); Awaitility.await().untilAsserted(() -> - Assert.assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(sourceTopic)))); + Assert.assertNull(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(sourceTopic)))); //shadow topic must exist Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, ()-> @@ -3139,16 +3141,13 @@ public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception { admin.topics().createNonPartitionedTopic(persistenceTopic); admin.topicPolicies().setMaxConsumers(persistenceTopic, 5); - Integer maxConsumerPerTopic = pulsar - .getTopicPoliciesService() - .getTopicPoliciesBypassCacheAsync(TopicName.get(persistenceTopic)).get() - .getMaxConsumerPerTopic(); + Integer maxConsumerPerTopic = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), + TopicName.get(persistenceTopic)).orElseThrow().getMaxConsumerPerTopic(); assertEquals(maxConsumerPerTopic, 5); admin.topics().delete(persistenceTopic, true); - TopicPolicies topicPolicies =pulsar.getTopicPoliciesService() - .getTopicPoliciesBypassCacheAsync(TopicName.get(persistenceTopic)).get(5, TimeUnit.SECONDS); - assertNull(topicPolicies); + assertTrue(TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), + TopicName.get(persistenceTopic)).isEmpty()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java new file mode 100644 index 0000000000000..88a75fe8f0387 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; + +public class InmemoryTopicPoliciesService implements TopicPoliciesService { + + private final Map cache = new HashMap<>(); + private final Map> listeners = new HashMap<>(); + + @Override + public synchronized CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { + cache.remove(topicName); + return CompletableFuture.completedFuture(null); + } + + @Override + public synchronized CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + final var existingPolicies = cache.get(topicName); + if (existingPolicies != policies) { + cache.put(topicName, policies); + CompletableFuture.runAsync(() -> { + final TopicPolicies latestPolicies; + final List listeners; + synchronized (InmemoryTopicPoliciesService.this) { + latestPolicies = cache.get(topicName); + listeners = this.listeners.getOrDefault(topicName, List.of()); + } + for (var listener : listeners) { + listener.onUpdate(latestPolicies); + } + }); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public synchronized CompletableFuture> getTopicPoliciesAsync( + TopicName topicName, GetType type) { + return CompletableFuture.completedFuture(Optional.ofNullable(cache.get(topicName))); + } + + @Override + public synchronized boolean registerListener(TopicName topicName, TopicPolicyListener listener) { + listeners.computeIfAbsent(topicName, __ -> new ArrayList<>()).add(listener); + return true; + } + + @Override + public synchronized void unregisterListener(TopicName topicName, TopicPolicyListener listener) { + listeners.get(topicName).remove(listener); + } + + synchronized boolean containsKey(TopicName topicName) { + return cache.containsKey(topicName); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesServiceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesServiceServiceTest.java new file mode 100644 index 0000000000000..9ec16405ba853 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesServiceServiceTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InmemoryTopicPoliciesServiceServiceTest extends MockedPulsarServiceBaseTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setTopicPoliciesServiceClassName(InmemoryTopicPoliciesService.class.getName()); + conf.setSystemTopicEnabled(false); // verify topic policies don't rely on system topics + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + // Shadow replicator is created by the topic policies update, this test verifies the listener can be triggered + @Test + public void testShadowReplicator() throws Exception { + final var sourceTopic = TopicName.get("test-shadow-replicator").toString(); + final var shadowTopic = sourceTopic + "-shadow"; + + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic) + .subscriptionName("sub").subscribe(); + producer.send("msg"); + final var msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(msg.getValue(), "msg"); + + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopic).get() + .orElseThrow(); + Assert.assertEquals(TopicPolicyTestUtils.getTopicPolicies(persistentTopic).getShadowTopics(), List.of(shadowTopic)); + Assert.assertEquals(persistentTopic.getShadowReplicators().size(), 1); + } + + @Test + public void testTopicPoliciesAdmin() throws Exception { + final var topic = "test-topic-policies-admin"; + admin.topics().createNonPartitionedTopic(topic); + + Assert.assertNull(admin.topicPolicies().getCompactionThreshold(topic)); + admin.topicPolicies().setCompactionThreshold(topic, 1000); + Assert.assertEquals(admin.topicPolicies().getCompactionThreshold(topic).intValue(), 1000); + // Sleep here because "Directory not empty error" might occur if deleting the topic immediately + Thread.sleep(1000); + final var topicPoliciesService = (InmemoryTopicPoliciesService) pulsar.getTopicPoliciesService(); + Assert.assertTrue(topicPoliciesService.containsKey(TopicName.get(topic))); + admin.topics().delete(topic); + Assert.assertFalse(topicPoliciesService.containsKey(TopicName.get(topic))); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 440e90da2b694..d684b4af7c251 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1110,7 +1110,8 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro Awaitility.await().untilAsserted(() -> { assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2); // Trigger system topic __change_event's initialize. - pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1")); + pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"), + TopicPoliciesService.GetType.DEFAULT); }); // Create non-partitioned topic. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index f3076ebdec6c9..200c8dd3b3d9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -409,7 +409,7 @@ protected void setTopicLevelClusters(String topic, List clusters, Pulsar int partitions = ensurePartitionsAreSame(topic); admin.topics().setReplicationClusters(topic, clusters); Awaitility.await().untilAsserted(() -> { - TopicPolicies policies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); + TopicPolicies policies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), topicName); assertEquals(new HashSet<>(policies.getReplicationClusters()), expected); if (partitions == 0) { checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar.getBrokerService()); @@ -434,7 +434,7 @@ protected void checkNonPartitionedTopicLevelClusters(String topic, List } PersistentTopic persistentTopic = (PersistentTopic) optional.get(); Set expected = new HashSet<>(clusters); - Set act = new HashSet<>(persistentTopic.getTopicPolicies().get().getReplicationClusters()); + Set act = new HashSet<>(TopicPolicyTestUtils.getTopicPolicies(persistentTopic).getReplicationClusters()); assertEquals(act, expected); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java index ab8d4dbe5cc01..a563077e012da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicInitializeDelayTest.java @@ -108,7 +108,9 @@ public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService broke SystemTopicBasedTopicPoliciesService topicPoliciesService = (SystemTopicBasedTopicPoliciesService) brokerService.getPulsar().getTopicPoliciesService(); if (topicPoliciesService.getListeners().containsKey(TopicName.get(topic)) ) { - this.onUpdate(brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topic))); + brokerService.getPulsar().getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topic), + TopicPoliciesService.GetType.DEFAULT + ).thenAccept(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index f9171e883613b..b975041d04ee4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -2279,7 +2279,8 @@ public void testGetReplicationClusters() throws MetadataStoreException { topicPolicies.setReplicationClusters(topicClusters); Optional optionalTopicPolicies = Optional.of(topicPolicies); topicPoliciesFuture.complete(optionalTopicPolicies); - when(topicPoliciesService.getTopicPoliciesIfExists(any())).thenReturn(topicPolicies); + when(topicPoliciesService.getTopicPoliciesAsync(any(), any())) + .thenReturn(CompletableFuture.completedFuture(Optional.of(topicPolicies))); topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); topic.initialize().join(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index c0281f073cfd4..f89ca2bdebb91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -30,11 +30,9 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; @@ -792,8 +790,7 @@ public void testReplicateAutoSubscriptionCreation() throws Exception { assertNull(admin3.topicPolicies(true).getAutoSubscriptionCreation(topic, false))); } - private void init(String namespace, String topic) - throws PulsarAdminException, PulsarClientException, PulsarServerException { + private void init(String namespace, String topic) throws Exception { final String cluster2 = pulsar2.getConfig().getClusterName(); final String cluster1 = pulsar1.getConfig().getClusterName(); final String cluster3 = pulsar3.getConfig().getClusterName(); @@ -817,11 +814,9 @@ private void init(String namespace, String topic) pulsar3.getClient().newProducer().topic(topic).create().close(); //init topic policies server - Awaitility.await().ignoreExceptions().untilAsserted(() -> { - assertNull(pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); - assertNull(pulsar2.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); - assertNull(pulsar3.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); - }); + TopicPolicyTestUtils.getTopicPolicies(pulsar1.getTopicPoliciesService(), TopicName.get(topic)); + TopicPolicyTestUtils.getTopicPolicies(pulsar2.getTopicPoliciesService(), TopicName.get(topic)); + TopicPolicyTestUtils.getTopicPolicies(pulsar3.getTopicPoliciesService(), TopicName.get(topic)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 9caee00cb6134..7e3f4e14daa6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -22,12 +22,9 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; -import static org.testng.AssertJUnit.assertTrue; -import java.lang.reflect.Field; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -35,18 +32,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -98,7 +90,7 @@ protected void cleanup() throws Exception { @Test public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException { TopicName topicName = TopicName.get("test"); - class TopicPolicyListenerImpl implements TopicPolicyListener { + class TopicPolicyListenerImpl implements TopicPolicyListener { @Override public void onUpdate(TopicPolicies data) { @@ -108,7 +100,7 @@ public void onUpdate(TopicPolicies data) { CompletableFuture f = CompletableFuture.completedFuture(null).thenRunAsync(() -> { for (int i = 0; i < 100; i++) { - TopicPolicyListener listener = new TopicPolicyListenerImpl(); + TopicPolicyListener listener = new TopicPolicyListenerImpl(); systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); @@ -117,7 +109,7 @@ public void onUpdate(TopicPolicies data) { }); for (int i = 0; i < 100; i++) { - TopicPolicyListener listener = new TopicPolicyListenerImpl(); + TopicPolicyListener listener = new TopicPolicyListenerImpl(); systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); @@ -130,7 +122,7 @@ public void onUpdate(TopicPolicies data) { } @Test - public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException { + public void testGetPolicy() throws Exception { // Init topic policies TopicPolicies initPolicy = TopicPolicies.builder() @@ -145,7 +137,7 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top // Assert broker is cache all topic policies Awaitility.await().untilAsserted(() -> - Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1) + Assert.assertEquals(TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC1) .getMaxConsumerPerTopic().intValue(), 10)); // Update policy for TOPIC1 @@ -185,12 +177,12 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); Awaitility.await().untilAsserted(() -> { - TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); - TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); - TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); - TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); - TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); + TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC1); + TopicPolicies policiesGet2 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC2); + TopicPolicies policiesGet3 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC3); + TopicPolicies policiesGet4 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC4); + TopicPolicies policiesGet5 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC5); + TopicPolicies policiesGet6 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC6); Assert.assertEquals(policiesGet1, policies1); Assert.assertEquals(policiesGet2, policies2); @@ -223,8 +215,8 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top // reader for NAMESPACE1 will back fill the reader cache Awaitility.await().untilAsserted(() -> { - TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC1); + TopicPolicies policiesGet2 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC2); Assert.assertEquals(policies1, policiesGet1); Assert.assertEquals(policies2, policiesGet2); }); @@ -235,7 +227,8 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); // Check get without cache - TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); + TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPoliciesBypassCache(systemTopicBasedTopicPoliciesService, + TOPIC1).orElseThrow(); Assert.assertEquals(policies1, policiesGet1); } @@ -249,7 +242,7 @@ public void testCacheCleanup() throws Exception { Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic))); Map map = systemTopicBasedTopicPoliciesService.getPoliciesCache(); - Map>> listMap = + Map> listMap = systemTopicBasedTopicPoliciesService.getListeners(); assertNotNull(map.get(topicName)); assertEquals(map.get(topicName).getMaxConsumerPerTopic().intValue(), 1000); @@ -268,7 +261,7 @@ public void testListenerCleanupByPartition() throws Exception { admin.topics().createPartitionedTopic(topic, 3); pulsarClient.newProducer().topic(topic).create().close(); - Map>> listMap = + Map> listMap = systemTopicBasedTopicPoliciesService.getListeners(); Awaitility.await().untilAsserted(() -> { // all 3 topic partition have registered the topic policy listeners. @@ -301,64 +294,6 @@ private void prepareData() throws PulsarAdminException { systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); } - @Test - public void testGetPolicyTimeout() throws Exception { - SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); - Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone())); - service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>()); - long start = System.currentTimeMillis(); - Backoff backoff = new BackoffBuilder() - .setInitialTime(500, TimeUnit.MILLISECONDS) - .setMandatoryStop(5000, TimeUnit.MILLISECONDS) - .setMax(1000, TimeUnit.MILLISECONDS) - .create(); - try { - service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor(), false).get(); - } catch (Exception e) { - assertTrue(e.getCause() instanceof TopicPoliciesCacheNotInitException); - } - long cost = System.currentTimeMillis() - start; - assertTrue("actual:" + cost, cost >= 5000 - 1000); - } - - @Test - public void testGetTopicPoliciesWithRetry() throws Exception { - Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap"); - initMapField.setAccessible(true); - Map initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService); - initMap.remove(NamespaceName.get(NAMESPACE1)); - Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches"); - readerCaches.setAccessible(true); - Map>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService); - readers.remove(NamespaceName.get(NAMESPACE1)); - Backoff backoff = new BackoffBuilder() - .setInitialTime(500, TimeUnit.MILLISECONDS) - .setMandatoryStop(5000, TimeUnit.MILLISECONDS) - .setMax(1000, TimeUnit.MILLISECONDS) - .create(); - TopicPolicies initPolicy = TopicPolicies.builder() - .maxConsumerPerTopic(10) - .build(); - @Cleanup("shutdownNow") - ScheduledExecutorService executors = Executors.newScheduledThreadPool(1); - executors.schedule(new Runnable() { - @Override - public void run() { - try { - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); - } catch (Exception ignore) {} - } - }, 2000, TimeUnit.MILLISECONDS); - Awaitility.await().untilAsserted(() -> { - Optional topicPolicies = systemTopicBasedTopicPoliciesService - .getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor(), false).get(); - Assert.assertTrue(topicPolicies.isPresent()); - if (topicPolicies.isPresent()) { - Assert.assertEquals(topicPolicies.get(), initPolicy); - } - }); - } - @Test public void testHandleNamespaceBeingDeleted() throws Exception { SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); @@ -381,13 +316,13 @@ public void testGetTopicPoliciesWithCleanCache() throws Exception { ConcurrentHashMap spyPoliciesCache = spy(new ConcurrentHashMap()); FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true); - Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull(); - }); + Awaitility.await().untilAsserted(() -> Assertions.assertThat( + TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, TopicName.get(topic))).isNull()); admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); Awaitility.await().untilAsserted(() -> { - Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + Assertions.assertThat(TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), + TopicName.get(topic))).isNotNull(); }); Map>> readers = @@ -401,21 +336,18 @@ public void testGetTopicPoliciesWithCleanCache() throws Exception { CompletableFuture result = new CompletableFuture<>(); Thread thread = new Thread(() -> { - TopicPolicies topicPolicies; - for (int i = 0; i < 10; i++) { - try { - topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic)); - Assert.assertNotNull(topicPolicies); - Thread.sleep(500); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.warn("topic policies cache not init, retry..."); - } catch (Throwable e) { - log.error("ops: ", e); - result.completeExceptionally(e); - return; + try { + for (int i = 0; i < 10; i++) { + final var policies = TopicPolicyTestUtils.getTopicPolicies(topicPoliciesService, + TopicName.get(topic)); + if (policies == null) { + throw new Exception("null policies for " + i + "th get"); + } } + result.complete(null); + } catch (Exception e) { + result.completeExceptionally(e); } - result.complete(null); }); Thread thread2 = new Thread(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java new file mode 100644 index 0000000000000..9cf688d62edc6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import lombok.Cleanup; +import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; + +public class TopicPolicyTestUtils { + + public static TopicPolicies getTopicPolicies(AbstractTopic topic) { + final TopicPolicies topicPolicies; + try { + topicPolicies = getTopicPolicies(topic.brokerService.getPulsar().getTopicPoliciesService(), + TopicName.get(topic.topic)); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + if (topicPolicies == null) { + throw new RuntimeException("No topic policies for " + topic); + } + return topicPolicies; + } + + public static TopicPolicies getTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName) + throws ExecutionException, InterruptedException { + return topicPoliciesService.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.DEFAULT).get() + .orElse(null); + } + + public static TopicPolicies getLocalTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName) + throws ExecutionException, InterruptedException { + return topicPoliciesService.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY).get() + .orElse(null); + } + + public static TopicPolicies getGlobalTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName) + throws ExecutionException, InterruptedException { + return topicPoliciesService.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.GLOBAL_ONLY).get() + .orElse(null); + } + + public static Optional getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService, + TopicName topicName) throws Exception { + @Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService) topicPoliciesService) + .getNamespaceEventsSystemTopicFactory() + .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()) + .newReader(); + PulsarEvent event = null; + while (reader.hasMoreEvents()) { + event = reader.readNext().getValue(); + } + return Optional.ofNullable(event).map(e -> e.getTopicPoliciesEvent().getPolicies()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 070f7193874c3..903443d37bb07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -715,7 +715,8 @@ public void testCheckPersistencePolicies() throws Exception { doReturn(policiesService).when(pulsar).getTopicPoliciesService(); TopicPolicies policies = new TopicPolicies(); policies.setRetentionPolicies(retentionPolicies); - doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic)); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService) + .getTopicPoliciesAsync(TopicName.get(topic), TopicPoliciesService.GetType.DEFAULT); persistentTopic.onUpdate(policies); verify(persistentTopic, times(1)).checkPersistencePolicies(); Awaitility.await().untilAsserted(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index a2401ebe19a06..e7bfa3278e36d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.admin.impl.BrokersBase; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.TopicPolicyTestUtils; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.SchemaRegistry; @@ -359,7 +360,8 @@ public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exceptio PersistentTopic persistentTopic = (PersistentTopic) topic.join().get(); persistentTopic.close(); admin.topics().delete(topicName); - TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName)); + TopicPolicies topicPolicies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), + TopicName.get(topicName)); assertNull(topicPolicies); String base = TopicName.get(topicName).getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index d3e0391443f0f..cc09fa212198d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -92,6 +92,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService; import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; +import org.apache.pulsar.broker.service.TopicPolicyTestUtils; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -142,7 +143,6 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -550,11 +550,7 @@ public void testSubscriptionRecreateTopic() .getSubscription(subName); subscription.getPendingAckManageLedger().thenAccept(managedLedger -> { long retentionSize = managedLedger.getConfig().getRetentionSizeInMB(); - if (!originPersistentTopic.getTopicPolicies().isPresent()) { - log.error("Failed to getTopicPolicies of :" + originPersistentTopic); - Assert.fail(); - } - TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get(); + TopicPolicyTestUtils.getTopicPolicies(originPersistentTopic); // verify the topic policies exist Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize); MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider(); CompletableFuture future = mlPendingAckStoreProvider.newPendingAckStore(subscription); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index f60aeb78387ad..9396a80cf2557 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -46,7 +46,6 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -110,7 +109,7 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { // Assert only one PersistentTopic was not closed. TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); - Map>> listeners = + Map> listeners = WhiteboxImpl.getInternalState(topicPoliciesService, "listeners"); assertEquals(listeners.get(TopicName.get(tpName)).size(), 1); @@ -217,7 +216,7 @@ public void testNoOrphanTopicIfInitFailed() throws Exception { // Assert only one PersistentTopic was not closed. TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); - Map>> listeners = + Map> listeners = WhiteboxImpl.getInternalState(topicPoliciesService, "listeners"); assertEquals(listeners.get(TopicName.get(tpName)).size(), 1);