From 8309f204cbb970dd3c1cd6c3abf09ec37f45e604 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 23 Feb 2024 18:08:42 +0800 Subject: [PATCH 1/7] [fix] Fix Reader HasNext() enters infinite loop --- pulsar/consumer_partition.go | 30 ++++++++++++++----- pulsar/reader.go | 1 + pulsar/reader_impl.go | 13 +++------ pulsar/reader_test.go | 56 ++++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 16 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1cb..940362bda8 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -568,15 +568,31 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") - return nil, errors.New("failed to redeliver closing or closed consumer") + pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") + return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - req := &getLastMsgIDRequest{doneCh: make(chan struct{})} - pc.eventsCh <- req + backoff := &internal.DefaultBackoff{} + request := func() (*trackingMessageID, error) { + req := &getLastMsgIDRequest{doneCh: make(chan struct{})} + pc.eventsCh <- req - // wait for the request to complete - <-req.doneCh - return req.msgID, req.err + // wait for the request to complete + <-req.doneCh + return req.msgID, req.err + } + for { + msgID, err := request() + if err == nil { + return msgID, nil + } + pc.log.Info("RETRY !!!") + nextDelay := backoff.Next() + if backoff.IsMaxBackoffReached() { + return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) + } + pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) + time.Sleep(nextDelay) + } } func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { diff --git a/pulsar/reader.go b/pulsar/reader.go index 5e1a73b988..7bbe88dd43 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -113,6 +113,7 @@ type Reader interface { Next(context.Context) (Message, error) // HasNext checks if there is any message available to read from the current position + // If there is any errors, it will return false HasNext() bool // Close the reader and stop the broker to push more messages diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 7b260b88db..5bca3e7313 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -173,16 +173,11 @@ func (r *reader) HasNext() bool { return true } - for { - lastMsgID, err := r.pc.getLastMessageID() - if err != nil { - r.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - r.lastMessageInBroker = lastMsgID - break - } + lastMsgID, err := r.pc.getLastMessageID() + if err != nil { + return false } + r.lastMessageInBroker = lastMsgID return r.hasMoreMessages() } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c8228a7ca9..24bb711ad2 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -943,3 +944,58 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +func TestReaderHasNextFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + r.(*reader).pc.state.Store(consumerClosing) + assert.False(t, r.HasNext()) +} + +func TestReaderHasNextRetryFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + // Retry the connection 10 seconds after it's disconnected. + BackoffPolicy: newTestBackoffPolicy(10*time.Second, 10*time.Second), + }) + assert.Nil(t, err) + + // Disconnect the connection + r.(*reader).pc.conn.Load().(internal.Connection).Close() + minTimer := time.NewTimer(10 * time.Second) // Timer to check if r.HasNext() blocked for at least 10s + maxTimer := time.NewTimer(20 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 20s + + done := make(chan bool) + go func() { + assert.False(t, r.HasNext()) + done <- true + }() + + select { + case <-maxTimer.C: + t.Fatal("r.HasNext() blocked for more than 20s") + case <-done: + + if minTimer.Stop() { + t.Fatal("r.HasNext() did not block for at least 10s") + } + if !maxTimer.Stop() { + t.Fatal("r.HasNext() blocked for more than 20s") + } + } + +} From db32e4387b5474920505e93922efe2bc95c8eb0f Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 15:22:34 +0800 Subject: [PATCH 2/7] Reduce backoff time in `TestReaderHasNextRetryFailed` --- pulsar/reader_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ccb88c029e..c6aa5541b3 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1049,15 +1049,15 @@ func TestReaderHasNextRetryFailed(t *testing.T) { r, err := client.CreateReader(ReaderOptions{ Topic: topic, StartMessageID: EarliestMessageID(), - // Retry the connection 10 seconds after it's disconnected. - BackoffPolicy: newTestBackoffPolicy(10*time.Second, 10*time.Second), + // Retry the connection 1 second after it's disconnected. + BackoffPolicy: newTestBackoffPolicy(1*time.Second, 1*time.Second), }) assert.Nil(t, err) // Disconnect the connection r.(*reader).c.consumers[0].conn.Load().(internal.Connection).Close() - minTimer := time.NewTimer(10 * time.Second) // Timer to check if r.HasNext() blocked for at least 10s - maxTimer := time.NewTimer(20 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 20s + minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s + maxTimer := time.NewTimer(2 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 2s done := make(chan bool) go func() { From 753f7c64bb0e75729cc174228379e287d78e69bc Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 26 Feb 2024 15:27:24 +0800 Subject: [PATCH 3/7] Fix lint --- pulsar/reader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c6aa5541b3..3f1080bfbb 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -874,6 +874,7 @@ func TestReaderWithSchema(t *testing.T) { assert.Equal(t, *res, value) } +//nolint:unparam func newTestBackoffPolicy(minBackoff, maxBackoff time.Duration) *testBackoffPolicy { return &testBackoffPolicy{ curBackoff: 0, From 49b7fb895409a6c7da799219c43eb346cee2c2b8 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Feb 2024 17:34:39 +0800 Subject: [PATCH 4/7] Refactor backoff logic for `getLastMessageID` --- pulsar/client_impl.go | 24 +++++++++++++----------- pulsar/consumer_partition.go | 16 +++++++++++++--- pulsar/reader_test.go | 26 +++++++++++++++++--------- 3 files changed, 43 insertions(+), 23 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7daf6f62ab..65aed3b963 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -40,14 +40,15 @@ const ( ) type client struct { - cnxPool internal.ConnectionPool - rpcClient internal.RPCClient - handlers internal.ClientHandlers - lookupService internal.LookupService - metrics *internal.Metrics - tcClient *transactionCoordinatorClient - memLimit internal.MemoryLimitController - closeOnce sync.Once + cnxPool internal.ConnectionPool + rpcClient internal.RPCClient + handlers internal.ClientHandlers + lookupService internal.LookupService + metrics *internal.Metrics + tcClient *transactionCoordinatorClient + memLimit internal.MemoryLimitController + closeOnce sync.Once + operationTimeout time.Duration log log.Logger } @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), - log: logger, - metrics: metrics, - memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + log: logger, + metrics: metrics, + memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + operationTimeout: operationTimeout, } serviceNameResolver := internal.NewPulsarServiceNameResolver(url) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1a50665bde..8abcc85441 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -573,7 +573,13 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - backoff := &internal.DefaultBackoff{} + remainTime := pc.client.operationTimeout + var backoff internal.BackoffPolicy + if pc.options.backoffPolicy != nil { + backoff = pc.options.backoffPolicy + } else { + backoff = &internal.DefaultBackoff{} + } request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -587,10 +593,14 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if err == nil { return msgID, nil } - nextDelay := backoff.Next() - if backoff.IsMaxBackoffReached() { + if remainTime <= 0 { return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) } + nextDelay := backoff.Next() + if nextDelay > remainTime { + nextDelay = remainTime + } + remainTime -= nextDelay pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) time.Sleep(nextDelay) } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 3f1080bfbb..4ad2ba1fbb 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,11 +24,11 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" - "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -1043,20 +1043,28 @@ func TestReaderHasNextFailed(t *testing.T) { func TestReaderHasNextRetryFailed(t *testing.T) { client, err := NewClient(ClientOptions{ - URL: serviceURL, + URL: serviceURL, + OperationTimeout: 1 * time.Second, }) assert.Nil(t, err) topic := newTopicName() r, err := client.CreateReader(ReaderOptions{ Topic: topic, StartMessageID: EarliestMessageID(), - // Retry the connection 1 second after it's disconnected. - BackoffPolicy: newTestBackoffPolicy(1*time.Second, 1*time.Second), }) assert.Nil(t, err) - // Disconnect the connection - r.(*reader).c.consumers[0].conn.Load().(internal.Connection).Close() + c := make(chan interface{}) + defer close(c) + r.(*reader).c.consumers[0].eventsCh = c + go func() { + for e := range c { + req, ok := e.(*getLastMsgIDRequest) + assert.True(t, ok, "unexpected event type") + req.err = errors.New("expected error") + close(req.doneCh) + } + }() minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s maxTimer := time.NewTimer(2 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 2s @@ -1068,14 +1076,14 @@ func TestReaderHasNextRetryFailed(t *testing.T) { select { case <-maxTimer.C: - t.Fatal("r.HasNext() blocked for more than 20s") + t.Fatal("r.HasNext() blocked for more than 2s") case <-done: if minTimer.Stop() { - t.Fatal("r.HasNext() did not block for at least 10s") + t.Fatal("r.HasNext() did not block for at least 1s") } if !maxTimer.Stop() { - t.Fatal("r.HasNext() blocked for more than 20s") + t.Fatal("r.HasNext() blocked for more than 2s") } } From eaa9bed62016299f543b10ee3545ecc32c660ea4 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Feb 2024 17:40:03 +0800 Subject: [PATCH 5/7] Update --- pulsar/reader_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 4ad2ba1fbb..63ce56c8c0 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -874,7 +874,6 @@ func TestReaderWithSchema(t *testing.T) { assert.Equal(t, *res, value) } -//nolint:unparam func newTestBackoffPolicy(minBackoff, maxBackoff time.Duration) *testBackoffPolicy { return &testBackoffPolicy{ curBackoff: 0, From a7a079b82e7265fe5b1b8b3130a40b4f6fac9d71 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Feb 2024 17:59:35 +0800 Subject: [PATCH 6/7] Refine the test --- pulsar/reader_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 63ce56c8c0..78c222dac7 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1043,7 +1043,7 @@ func TestReaderHasNextFailed(t *testing.T) { func TestReaderHasNextRetryFailed(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, - OperationTimeout: 1 * time.Second, + OperationTimeout: 2 * time.Second, }) assert.Nil(t, err) topic := newTopicName() @@ -1055,7 +1055,13 @@ func TestReaderHasNextRetryFailed(t *testing.T) { c := make(chan interface{}) defer close(c) - r.(*reader).c.consumers[0].eventsCh = c + + // Close the consumer events loop and assign a mock eventsCh + pc := r.(*reader).c.consumers[0] + pc.Close() + pc.state.Store(consumerReady) + pc.eventsCh = c + go func() { for e := range c { req, ok := e.(*getLastMsgIDRequest) @@ -1065,8 +1071,7 @@ func TestReaderHasNextRetryFailed(t *testing.T) { } }() minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s - maxTimer := time.NewTimer(2 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 2s - + maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s done := make(chan bool) go func() { assert.False(t, r.HasNext()) @@ -1075,15 +1080,10 @@ func TestReaderHasNextRetryFailed(t *testing.T) { select { case <-maxTimer.C: - t.Fatal("r.HasNext() blocked for more than 2s") + t.Fatal("r.HasNext() blocked for more than 3s") case <-done: - - if minTimer.Stop() { - t.Fatal("r.HasNext() did not block for at least 1s") - } - if !maxTimer.Stop() { - t.Fatal("r.HasNext() blocked for more than 2s") - } + assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s") + assert.True(t, maxTimer.Stop()) } } From 35ae05157b702804037446e1190a8f4e7557fdbf Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 28 Feb 2024 18:01:29 +0800 Subject: [PATCH 7/7] Add log for getLastMessageId --- pulsar/consumer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 8abcc85441..d9c05f3497 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -594,6 +594,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { return msgID, nil } if remainTime <= 0 { + pc.log.WithError(err).Error("Failed to getLastMessageID") return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) } nextDelay := backoff.Next()