Skip to content

Commit

Permalink
[AMQ-9554] Allow session transacted queue browser to view messages th…
Browse files Browse the repository at this point in the history
…at have been dlq'd due to max redeliveries
  • Loading branch information
mattrpav committed Aug 23, 2024
1 parent e45ee4a commit a7df4ca
Showing 1 changed file with 187 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
Expand All @@ -35,9 +37,16 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.qpid.proton.InterruptException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -68,6 +77,10 @@ public void startBroker() throws Exception {

connectUri = connector.getConnectUri();
factory = new ActiveMQConnectionFactory(connectUri);
factory.setWatchTopicAdvisories(false);
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(0l);
}

public BrokerService createBroker() throws IOException {
Expand Down Expand Up @@ -217,4 +230,177 @@ public void testMemoryLimit() throws Exception {
browser.close();
assertTrue("got at least maxPageSize", received >= maxPageSize);
}

@Test // https://issues.apache.org/jira/browse/AMQ-9554
public void testBrowseRedeliveryMaxTransacted() throws Exception {

IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
individualDeadLetterStrategy.setQueuePrefix("");
individualDeadLetterStrategy.setQueueSuffix(".dlq");
individualDeadLetterStrategy.setUseQueueForQueueMessages(true);
broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy);
broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true);

String messageId = null;

String queueName = "browse.redeliverd.tx";
String dlqQueueName = "browse.redeliverd.tx.dlq";
String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq";

ActiveMQQueue queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName);

broker.getAdminView().addQueue(queueName);
broker.getAdminView().addQueue(dlqQueueName);

DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName);
DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName);

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(0l, 0l, 0l, queueView);

Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);

Message sendMessage = session.createTextMessage("Hello world!");
producer.send(sendMessage);
messageId = sendMessage.getJMSMessageID();
session.commit();
producer.close();

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(1l, 0l, 1l, queueView);

// Redeliver message to DLQ
Message message = null;
MessageConsumer consumer = session.createConsumer(queue);
int rollbackCount = 0;
do {
message = consumer.receive(2000l);
if(message != null) {
session.rollback();
rollbackCount++;
}
} while (message != null);

assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount));
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
verifyQueueStats(1l, 1l, 0l, queueView);

session.commit();
consumer.close();

// Increment redelivery counter on the message in the DLQ
// Close the consumer to force broker to dispatch
Message messageDLQ = null;
MessageConsumer consumerDLQ = session.createConsumer(queueDLQ);
int dlqRollbackCount = 0;
int dlqRollbackCountLimit = 5;
do {
messageDLQ = consumerDLQ.receive(2000l);
if(messageDLQ != null) {
session.rollback();
session.close();
consumerDLQ.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumerDLQ = session.createConsumer(queueDLQ);
dlqRollbackCount++;
}
} while (messageDLQ != null && dlqRollbackCount < dlqRollbackCountLimit);
session.commit();
consumerDLQ.close();

// Browse in tx mode works when we are at the edge of maxRedeliveries
// aka browse does not increment redeliverCounter as expected
Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ);

for(int i=0; i<16; i++) {
QueueBrowser browser = session.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
ActiveMQMessage activemqMessage = null;
int received = 0;
while (enumeration.hasMoreElements()) {
activemqMessage = (ActiveMQMessage)enumeration.nextElement();
received++;
}
browser.close();
assertEquals(Integer.valueOf(1), Integer.valueOf(received));
assertEquals(Integer.valueOf(6), Integer.valueOf(activemqMessage.getRedeliveryCounter()));

// Confirm broker-side redeliveryCounter
QueueMessageReference queueMessageReference = brokerQueueDLQ.getMessage(messageId);
assertEquals(Integer.valueOf(6), Integer.valueOf(queueMessageReference.getRedeliveryCounter()));
}

session.close();
connection.close();

// Change redelivery max and the browser will fail
factory.getRedeliveryPolicy().setMaximumRedeliveries(3);
final Connection browseConnection = factory.createConnection();
browseConnection.start();

final AtomicInteger browseCounter = new AtomicInteger(0);
final AtomicInteger jmsExceptionCounter = new AtomicInteger(0);

final Session browseSession = browseConnection.createSession(true, Session.SESSION_TRANSACTED);

Thread browseThread = new Thread() {
public void run() {

QueueBrowser browser = null;
try {
browser = browseSession.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message message = (Message)enumeration.nextElement();
if(message != null) {
browseCounter.incrementAndGet();
}
}
} catch (JMSException e) {
jmsExceptionCounter.incrementAndGet();
} catch (InterruptException ie) {
if(browser != null) { try { browser.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseSession != null) { try { browseSession.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseConnection != null) { try { browseConnection.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
Thread.currentThread().interrupt();
}
}
};
browseThread.start();
Thread.sleep(2000l);
browseThread.interrupt();

assertEquals(Integer.valueOf(0), Integer.valueOf(browseCounter.get()));
assertEquals(Integer.valueOf(0), Integer.valueOf(jmsExceptionCounter.get()));

// ActiveMQConsumer sends a poison ack, messages gets moved to .dlq.dlq AND remains on the .dlq
DestinationView dlqDlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqDlqQueueName);
verifyQueueStats(1l, 1l, 0l, queueView);
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
verifyQueueStats(1l, 0l, 1l, dlqDlqQueueView);
}

protected static void verifyQueueStats(long enqueueCount, long dequeueCount, long queueSize, DestinationView queueView) {
assertEquals(Long.valueOf(enqueueCount), Long.valueOf(queueView.getEnqueueCount()));
assertEquals(Long.valueOf(dequeueCount), Long.valueOf(queueView.getDequeueCount()));
assertEquals(Long.valueOf(queueSize), Long.valueOf(queueView.getQueueSize()));
}

protected static Queue resolveQueue(BrokerService brokerService, ActiveMQQueue activemqQueue) throws Exception {
Set<Destination> destinations = brokerService.getBroker().getDestinations(activemqQueue);
if(destinations == null || destinations.isEmpty()) {
return null;
}

if(destinations.size() > 1) {
fail("Expected one-and-only one queue for: " + activemqQueue);
}

return (Queue)destinations.iterator().next();
}
}

0 comments on commit a7df4ca

Please sign in to comment.