diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 7d34ce33e0a7f..3bfbaf48b117b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import lombok.Cleanup; import lombok.Data; import org.apache.avro.reflect.Nullable; @@ -261,7 +262,12 @@ public void testDeadLetterTopicWithProducerName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final String subscription = "my-subscription"; final String consumerName = "my-consumer"; - String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, subscription, consumerName); + Pattern deadLetterProducerNamePattern = + Pattern.compile("^persistent://my-property/my-ns/dead-letter-topic" + + "-my-subscription" + + "-my-consumer" + + "-[a-zA-Z0-9]{5}" + + "-DLQ$"); final int maxRedeliveryCount = 1; @@ -308,8 +314,9 @@ public void testDeadLetterTopicWithProducerName() throws Exception { int totalInDeadLetter = 0; do { Message message = deadLetterConsumer.receive(); - assertEquals(message.getProducerName(), deadLetterProducerName); - log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + assertTrue(deadLetterProducerNamePattern.matcher(message.getProducerName()).matches()); + log.info("dead letter consumer received message : {} {}, dead letter producer name : {}", + message.getMessageId(), new String(message.getData()), message.getProducerName()); deadLetterConsumer.acknowledge(message); totalInDeadLetter++; } while (totalInDeadLetter < sendMessages); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 004adab56f529..15c6c018b37c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -97,6 +97,7 @@ import org.apache.pulsar.client.impl.metrics.UpDownCounter; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -2263,8 +2264,8 @@ private void initDeadLetterProducerIfNeeded() { ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription, - this.consumerName)) + .producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, + this.consumerName, ConsumerName.generateRandomName())) .blockIfQueueFull(false) .enableBatching(false) .enableChunking(true)