From 3f9b72cd9b60782c2d732cc73cef246fba029e22 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sat, 31 Aug 2024 18:33:14 +0800 Subject: [PATCH] fix #1276 --- pulsar/consumer_partition.go | 5 +- pulsar/consumer_zero_queue_test.go | 83 ++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc122..0c6176766e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1494,7 +1494,10 @@ func (pc *partitionConsumer) dispatcher() { messages[0] = nil messages = messages[1:] - pc.availablePermits.inc() + // for the zeroQueueConsumer, the permits controlled by itself + if pc.options.receiverQueueSize > 0 { + pc.availablePermits.inc() + } if pc.options.autoReceiverQueueSize { pc.incomingMessages.Dec() diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index d8c8f7c9a1..6f5a8dccb0 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) { err = consumer.Unsubscribe() assert.Nil(t, err) } + +func TestMultipleConsumer(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + log.Fatal(err) + } + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create consumer1 + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + EnableZeroQueueConsumer: true, + }) + assert.Nil(t, err) + _, ok := consumer1.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer1.Close() + + // create consumer2 + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + EnableZeroQueueConsumer: true, + }) + assert.Nil(t, err) + _, ok = consumer2.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer2.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + sendNum := 10 + // send 10 messages + for i := 0; i < sendNum; i++ { + msg, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + log.Printf("send message: %s", msg.String()) + } + + // receive messages + for i := 0; i < sendNum/2; i++ { + msg, err := consumer1.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload()) + // ack message + consumer1.Ack(msg) + } + + // receive messages + for i := 0; i < sendNum/2; i++ { + msg, err := consumer2.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload()) + // ack message + consumer2.Ack(msg) + } + +} + func TestPartitionZeroQueueConsumer(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,