Skip to content

Commit

Permalink
[fix][client] Fix DLQ producer name conflicts when there are same nam…
Browse files Browse the repository at this point in the history
…e consumers
  • Loading branch information
geniusjoe committed Nov 11, 2024
1 parent 137df29 commit 419d195
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
Expand Down Expand Up @@ -261,7 +262,12 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
final String consumerName = "my-consumer";
String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, subscription, consumerName);
Pattern deadLetterProducerNamePattern =
Pattern.compile("^persistent://my-property/my-ns/dead-letter-topic"
+ "-my-subscription"
+ "-my-consumer"
+ "-[a-zA-Z0-9]{5}"
+ "-DLQ$");

final int maxRedeliveryCount = 1;

Expand Down Expand Up @@ -308,8 +314,9 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
assertEquals(message.getProducerName(), deadLetterProducerName);
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
assertTrue(deadLetterProducerNamePattern.matcher(message.getProducerName()).matches());
log.info("dead letter consumer received message : {} {}, dead letter producer name : {}",
message.getMessageId(), new String(message.getData()), message.getProducerName());
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -2263,8 +2264,8 @@ private void initDeadLetterProducerIfNeeded() {
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName))
.producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName, ConsumerName.generateRandomName()))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)
Expand Down

0 comments on commit 419d195

Please sign in to comment.