Skip to content

Commit

Permalink
[improve][broker] PIP-376: Make topic policies service pluggable (#23319
Browse files Browse the repository at this point in the history
)
  • Loading branch information
BewareMyPower committed Sep 19, 2024
1 parent 03330b3 commit 4b3b273
Show file tree
Hide file tree
Showing 32 changed files with 571 additions and 679 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ "please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the topic policies service. The default config only takes affect when the "
+ "systemTopicEnable config is true"
)
private String topicPoliciesServiceClassName =
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for entry metadata.")
Expand Down Expand Up @@ -3793,10 +3801,6 @@ public int getTopicOrderedExecutorThreadNum() {
? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum;
}

public boolean isSystemTopicAndTopicLevelPoliciesEnabled() {
return topicLevelPoliciesEnabled && systemTopicEnabled;
}

public Map<String, String> lookupProperties() {
final var map = new HashMap<String, String>();
properties.forEach((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,11 +989,8 @@ public void start() throws PulsarServerException {
this.nsService.initialize();

// Start topic level policies service
if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) {
this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
}

this.topicPoliciesService.start();
this.topicPoliciesService = initTopicPoliciesService();
this.topicPoliciesService.start(this);

// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();
Expand Down Expand Up @@ -2137,4 +2134,22 @@ public void initConfigMetadataSynchronizerIfNeeded() {
mutex.unlock();
}
}

private TopicPoliciesService initTopicPoliciesService() throws Exception {
if (!config.isTopicLevelPoliciesEnabled()) {
return TopicPoliciesService.DISABLED;
}
final var className = Optional.ofNullable(config.getTopicPoliciesServiceClassName())
.orElse(SystemTopicBasedTopicPoliciesService.class.getName());
if (className.equals(SystemTopicBasedTopicPoliciesService.class.getName())) {
if (config.isSystemTopicEnabled()) {
return new SystemTopicBasedTopicPoliciesService(this);
} else {
LOG.warn("System topic is disabled while the topic policies service is {}, disable it", className);
return TopicPoliciesService.DISABLED;
}
}
return (TopicPoliciesService) Reflections.createInstance(className,
Thread.currentThread().getContextClassLoader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -365,14 +366,8 @@ protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRe

protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName,
boolean isGlobal) {
try {
checkTopicLevelPolicyEnable();
return pulsar().getTopicPoliciesService()
.getTopicPoliciesAsyncWithRetry(topicName, null, pulsar().getExecutor(), isGlobal);
} catch (Exception e) {
log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e);
return FutureUtil.failedFuture(e);
}
final var type = isGlobal ? TopicPoliciesService.GetType.GLOBAL_ONLY : TopicPoliciesService.GetType.LOCAL_ONLY;
return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName, type);
}

protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
Expand All @@ -396,13 +391,6 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent
return true;
}

protected void checkTopicLevelPolicyEnable() {
if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Topic level policies is disabled, to enable the topic level policy and retry.");
}
}

protected DispatchRateImpl dispatchRate() {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config().getDispatchThrottlingRatePerTopicInMsg())
Expand Down Expand Up @@ -784,11 +772,8 @@ protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityS
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
if (config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
}
CompletableFuture<SchemaCompatibilityStrategy> future = getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));

return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -447,20 +448,9 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
return CompletableFuture.completedFuture(null);
}
// update remote cluster
return namespaceResources().getPoliciesAsync(namespaceName)
.thenCompose(policies -> {
if (!policies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
// Combine namespace level policies and topic level policies.
Set<String> replicationClusters = policies.get().replication_clusters;
TopicPolicies topicPolicies =
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) {
replicationClusters = topicPolicies.getReplicationClustersSet();
}
// Do check replicated clusters.
if (replicationClusters.size() == 0) {
return getReplicationClusters()
.thenCompose(replicationClusters -> {
if (replicationClusters == null || replicationClusters.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
boolean containsCurrentCluster =
Expand Down Expand Up @@ -495,6 +485,20 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
});
}

private CompletableFuture<Set<String>> getReplicationClusters() {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(optionalPolicies -> {
if (optionalPolicies.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
// Query the topic-level policies only if the namespace-level policies exist
final var namespacePolicies = optionalPolicies.get();
return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.DEFAULT
).thenApply(optionalTopicPolicies -> optionalTopicPolicies.map(TopicPolicies::getReplicationClustersSet)
.orElse(namespacePolicies.replication_clusters));
});
}

protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null && metadata.partitions > 0) {
Expand Down Expand Up @@ -3655,7 +3659,7 @@ protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate
}

protected CompletableFuture<Void> preValidation(boolean authoritative) {
if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
if (!config().isTopicLevelPoliciesEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Topic level policies is disabled, to enable the topic level policy and retry."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public interface LoadManager {

void start() throws PulsarServerException;

default boolean started() {
return true;
}

/**
* Is centralized decision making to assign a new bundle.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private SplitManager splitManager;

private volatile boolean started = false;
volatile boolean started = false;

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -320,7 +320,7 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
private static boolean configureSystemTopics(PulsarService pulsar) {
try {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) {
&& pulsar.getConfiguration().isTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void start() throws PulsarServerException {
loadManager.start();
}

public boolean started() {
return loadManager.started && loadManager.getServiceUnitStateChannel().started();
}

@Override
public void initialize(PulsarService pulsar) {
loadManager.initialize(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
void start() throws PulsarServerException;

/**
* Whether the channel started.
*/
boolean started();

/**
* Closes the ServiceUnitStateChannel.
* @throws PulsarServerException if it fails to close the channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ public void cleanOwnerships() {
doCleanup(brokerId);
}

@Override
public synchronized boolean started() {
return validateChannelState(LeaderElectionServiceStarted, false);
}

public synchronized void start() throws PulsarServerException {
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {
public abstract class AbstractTopic implements Topic, TopicPolicyListener {

protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

Expand Down Expand Up @@ -509,17 +509,13 @@ protected boolean isProducersExceeded(boolean isRemote) {
}

protected void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
}
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
}

protected void unregisterTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.unregisterListener(TopicName.getPartitionedTopicName(topic), this);
}
brokerService.getPulsar().getTopicPoliciesService()
.unregisterListener(TopicName.getPartitionedTopicName(topic), this);
}

protected boolean isSameAddressProducersExceeded(Producer producer) {
Expand Down Expand Up @@ -1253,16 +1249,8 @@ public InactiveTopicPolicies getInactiveTopicPolicies() {
return topicPolicies.getInactiveTopicPolicies().get();
}

/**
* Get {@link TopicPolicies} for this topic.
* @return TopicPolicies, if they exist. Otherwise, the value will not be present.
*/
public Optional<TopicPolicies> getTopicPolicies() {
return brokerService.getTopicPolicies(TopicName.get(topic));
}

public CompletableFuture<Void> deleteTopicPolicies() {
return brokerService.deleteTopicPolicies(TopicName.get(topic));
return brokerService.pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topic));
}

protected int getWaitingProducersCount() {
Expand Down
Loading

0 comments on commit 4b3b273

Please sign in to comment.