Skip to content

Commit

Permalink
[fix][broker] Key_Shared subscription: Reject consumers with incompat…
Browse files Browse the repository at this point in the history
…ible policy (#23449)

(cherry picked from commit 1344167)
  • Loading branch information
pdolif authored and lhotari committed Oct 22, 2024
1 parent 197fb92 commit 3478404
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.util.FutureUtil;

public abstract class AbstractSubscription implements Subscription {
protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
Expand All @@ -39,4 +43,46 @@ private long sumConsumers(ToLongFunction<Consumer> toCounter) {
.map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum())
.orElse(0L);
}

/**
* Checks if the given consumer is compatible with the given dispatcher. They are incompatible if
* <ul>
* <li>the subscription type of the consumer differs from the subscription type of the dispatcher or</li>
* <li>if both the consumer and dispatcher are of {@link CommandSubscribe.SubType#Key_Shared} type but
* their policies differ ({@link org.apache.pulsar.common.api.proto.KeySharedMode#AUTO_SPLIT}/
* {@link org.apache.pulsar.common.api.proto.KeySharedMode#STICKY} or allowOutOfOrderDelivery true/false).</li>
* </ul>
* @param dispatcher The dispatcher of the subscription
* @param consumer New consumer to be added to the subscription
* @return Optional containing failed future with {@link BrokerServiceException.SubscriptionBusyException} if
* consumer and dispatcher are incompatible or empty optional otherwise.
*/
protected Optional<CompletableFuture<Void>> checkForConsumerCompatibilityErrorWithDispatcher(Dispatcher dispatcher,
Consumer consumer) {
if (consumer.subType() != dispatcher.getType()) {
return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException(
String.format("Subscription is of different type. Active subscription type of '%s' is different "
+ "than the connecting consumer's type '%s'.",
dispatcher.getType(), consumer.subType()))));
} else if (dispatcher.getType() == CommandSubscribe.SubType.Key_Shared) {
KeySharedMeta dispatcherKsm = dispatcher.getConsumers().get(0).getKeySharedMeta();
KeySharedMeta consumerKsm = consumer.getKeySharedMeta();
if (dispatcherKsm.getKeySharedMode() != consumerKsm.getKeySharedMode()) {
return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException(
String.format("Subscription is of different type. Active subscription key_shared mode of '%s' "
+ "is different than the connecting consumer's key_shared mode '%s'.",
dispatcherKsm.getKeySharedMode(), consumerKsm.getKeySharedMode()))));
}
if (dispatcherKsm.isAllowOutOfOrderDelivery() != consumerKsm.isAllowOutOfOrderDelivery()) {
return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException(
String.format("Subscription is of different type. %s",
dispatcherKsm.isAllowOutOfOrderDelivery()
? "Active subscription allows out of order delivery while the connecting "
+ "consumer does not allow it." :
"Active subscription does not allow out of order delivery while the connecting "
+ "consumer allows it."))));
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
Expand Down Expand Up @@ -159,8 +158,10 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
});
}
} else {
if (consumer.subType() != dispatcher.getType()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription is of different type"));
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
if (compatibilityError.isPresent()) {
return compatibilityError.get();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,10 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
});
}
} else {
if (consumer.subType() != dispatcher.getType()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException("Subscription is of different type"));
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
if (compatibilityError.isPresent()) {
return compatibilityError.get();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class SubscriptionConsumerCompatibilityTest {

private PulsarTestContext pulsarTestContext;
private ManagedLedger ledgerMock;
private ManagedCursorImpl cursorMock;
private final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
private final String subName = "subscriptionName";

@BeforeMethod
public void setup() throws Exception {
pulsarTestContext = PulsarTestContext.builderForNonStartableContext().build();

ledgerMock = mock(ManagedLedgerImpl.class);
ManagedLedgerConfig managedLedgerConfigMock = mock(ManagedLedgerConfig.class);
doReturn(managedLedgerConfigMock).when(ledgerMock).getConfig();

cursorMock = mock(ManagedCursorImpl.class);
doReturn("mockCursor").when(cursorMock).getName();
}

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
if (pulsarTestContext != null) {
pulsarTestContext.close();
pulsarTestContext = null;
}
}

@DataProvider(name = "incompatibleKeySharedPolicies")
public Object[][] incompatibleKeySharedPolicies() {
KeySharedMeta ksmSticky = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
ksmSticky.addHashRange().setStart(0).setEnd(2);

KeySharedMeta ksmStickyAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
.setAllowOutOfOrderDelivery(true);
ksmStickyAllowOutOfOrder.addHashRange().setStart(3).setEnd(5);

KeySharedMeta ksmAutoSplit = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT);
KeySharedMeta ksmAutoSplitAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)
.setAllowOutOfOrderDelivery(true);

String errorMessagePrefix = "Subscription is of different type. ";
String errorMessageSubscriptionModeSticky = errorMessagePrefix + "Active subscription key_shared mode of "
+ "'STICKY' is different than the connecting consumer's key_shared mode 'AUTO_SPLIT'.";
String errorMessageSubscriptionModeAutoSplit = errorMessagePrefix + "Active subscription key_shared mode of "
+ "'AUTO_SPLIT' is different than the connecting consumer's key_shared mode 'STICKY'.";
String errorMessageOutOfOrderNotAllowed = errorMessagePrefix + "Active subscription does not allow out of "
+ "order delivery while the connecting consumer allows it.";
String errorMessageOutOfOrderAllowed = errorMessagePrefix + "Active subscription allows out of order delivery "
+ "while the connecting consumer does not allow it.";

return new Object[][] {
{ ksmAutoSplit, ksmSticky, errorMessageSubscriptionModeAutoSplit },
{ ksmAutoSplit, ksmStickyAllowOutOfOrder, errorMessageSubscriptionModeAutoSplit },
{ ksmAutoSplit, ksmAutoSplitAllowOutOfOrder, errorMessageOutOfOrderNotAllowed },

{ ksmAutoSplitAllowOutOfOrder, ksmSticky, errorMessageSubscriptionModeAutoSplit },
{ ksmAutoSplitAllowOutOfOrder, ksmStickyAllowOutOfOrder, errorMessageSubscriptionModeAutoSplit },
{ ksmAutoSplitAllowOutOfOrder, ksmAutoSplit, errorMessageOutOfOrderAllowed },

{ ksmSticky, ksmStickyAllowOutOfOrder, errorMessageOutOfOrderNotAllowed },
{ ksmSticky, ksmAutoSplit, errorMessageSubscriptionModeSticky },
{ ksmSticky, ksmAutoSplitAllowOutOfOrder, errorMessageSubscriptionModeSticky },

{ ksmStickyAllowOutOfOrder, ksmSticky, errorMessageOutOfOrderAllowed },
{ ksmStickyAllowOutOfOrder, ksmAutoSplit, errorMessageSubscriptionModeSticky },
{ ksmStickyAllowOutOfOrder, ksmAutoSplitAllowOutOfOrder, errorMessageSubscriptionModeSticky }
};
}

@Test(dataProvider = "incompatibleKeySharedPolicies")
public void testIncompatibleKeySharedPoliciesNotAllowedInNonPersistentSub(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm,
String expectedErrorMessage) throws Exception {
NonPersistentTopic topic = new NonPersistentTopic(successTopicName, pulsarTestContext.getBrokerService());
NonPersistentSubscription sub = new NonPersistentSubscription(topic, subName, Map.of());

// two consumers with incompatible key_shared policies
Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm);
Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm);

// first consumer defines key_shared mode of subscription and whether out of order delivery is allowed
sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS);

try {
// add second consumer with incompatible key_shared policy
sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS);
fail(BrokerServiceException.SubscriptionBusyException.class.getSimpleName() + " not thrown");
} catch (Exception e) {
// subscription throws exception when consumer with incompatible key_shared policy is added
Throwable cause = e.getCause();
assertTrue(cause instanceof BrokerServiceException.SubscriptionBusyException);
assertEquals(cause.getMessage(), expectedErrorMessage);
}
}

@Test(dataProvider = "incompatibleKeySharedPolicies")
public void testIncompatibleKeySharedPoliciesNotAllowedInPersistentSub(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm,
String expectedErrorMessage) throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, pulsarTestContext.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic, subName, cursorMock, false);

// two consumers with incompatible key_shared policies
Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm);
Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm);

// first consumer defines key_shared mode of subscription and whether out of order delivery is allowed
sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS);

try {
// add second consumer with incompatible key_shared policy
sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS);
fail(BrokerServiceException.SubscriptionBusyException.class.getSimpleName() + " not thrown");
} catch (Exception e) {
// subscription throws exception when consumer with incompatible key_shared policy is added
Throwable cause = e.getCause();
assertTrue(cause instanceof BrokerServiceException.SubscriptionBusyException);
assertEquals(cause.getMessage(), expectedErrorMessage);
}
}

protected Consumer createKeySharedMockConsumer(String name, KeySharedMeta ksm) {
Consumer consumer = BrokerTestUtil.createMockConsumer(name);
doReturn(CommandSubscribe.SubType.Key_Shared).when(consumer).subType();
doReturn(ksm).when(consumer).getKeySharedMeta();
doReturn(mock(PendingAcksMap.class)).when(consumer).getPendingAcks();
return consumer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -35,7 +34,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -60,8 +58,6 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -83,11 +79,6 @@ public class PersistentSubscriptionTest {
final TxnID txnID1 = new TxnID(1,1);
final TxnID txnID2 = new TxnID(1,2);

private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

private OrderedExecutor executor;
private EventLoopGroup eventLoopGroup;

@BeforeMethod
public void setup() throws Exception {
pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
Expand Down

0 comments on commit 3478404

Please sign in to comment.