diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7daf6f62a..65aed3b96 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -40,14 +40,15 @@ const ( ) type client struct { - cnxPool internal.ConnectionPool - rpcClient internal.RPCClient - handlers internal.ClientHandlers - lookupService internal.LookupService - metrics *internal.Metrics - tcClient *transactionCoordinatorClient - memLimit internal.MemoryLimitController - closeOnce sync.Once + cnxPool internal.ConnectionPool + rpcClient internal.RPCClient + handlers internal.ClientHandlers + lookupService internal.LookupService + metrics *internal.Metrics + tcClient *transactionCoordinatorClient + memLimit internal.MemoryLimitController + closeOnce sync.Once + operationTimeout time.Duration log log.Logger } @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), - log: logger, - metrics: metrics, - memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + log: logger, + metrics: metrics, + memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + operationTimeout: operationTimeout, } serviceNameResolver := internal.NewPulsarServiceNameResolver(url) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3572a5226..162565b2a 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") - return nil, errors.New("failed to 
redeliver closing or closed consumer") + pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") + return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - req := &getLastMsgIDRequest{doneCh: make(chan struct{})} - pc.eventsCh <- req + remainTime := pc.client.operationTimeout + var backoff internal.BackoffPolicy + if pc.options.backoffPolicy != nil { + backoff = pc.options.backoffPolicy + } else { + backoff = &internal.DefaultBackoff{} + } + request := func() (*trackingMessageID, error) { + req := &getLastMsgIDRequest{doneCh: make(chan struct{})} + pc.eventsCh <- req - // wait for the request to complete - <-req.doneCh - return req.msgID, req.err + // wait for the request to complete + <-req.doneCh + return req.msgID, req.err + } + for { + msgID, err := request() + if err == nil { + return msgID, nil + } + if remainTime <= 0 { + pc.log.WithError(err).Error("Failed to getLastMessageID") + return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) + } + nextDelay := backoff.Next() + if nextDelay > remainTime { + nextDelay = remainTime + } + remainTime -= nextDelay + pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) + time.Sleep(nextDelay) + } } func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { @@ -1987,16 +2013,11 @@ func (pc *partitionConsumer) hasNext() bool { return true } - for { - lastMsgID, err := pc.getLastMessageID() - if err != nil { - pc.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - pc.lastMessageInBroker = lastMsgID - break - } + lastMsgID, err := pc.getLastMessageID() + if err != nil { + return false } + pc.lastMessageInBroker = lastMsgID return pc.hasMoreMessages() } diff --git a/pulsar/reader.go b/pulsar/reader.go index 1c5235d42..4daa88906 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -113,6 +113,7 @@ type Reader 
interface { Next(context.Context) (Message, error) // HasNext checks if there is any message available to read from the current position + // If there is any error, it will return false HasNext() bool // Close the reader and stop the broker to push more messages diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ccf52875b..78c222dac 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -1023,3 +1024,66 @@ func createPartitionedTopic(topic string, n int) error { } return nil } + +func TestReaderHasNextFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + r.(*reader).c.consumers[0].state.Store(consumerClosing) + assert.False(t, r.HasNext()) +} + +func TestReaderHasNextRetryFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + OperationTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + topic := newTopicName() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + + c := make(chan interface{}) + defer close(c) + + // Close the consumer events loop and assign a mock eventsCh + pc := r.(*reader).c.consumers[0] + pc.Close() + pc.state.Store(consumerReady) + pc.eventsCh = c + + go func() { + for e := range c { + req, ok := e.(*getLastMsgIDRequest) + assert.True(t, ok, "unexpected event type") + req.err = errors.New("expected error") + close(req.doneCh) + } + }() + minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s + maxTimer := time.NewTimer(3 * 
time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s + done := make(chan bool) + go func() { + assert.False(t, r.HasNext()) + done <- true + }() + + select { + case <-maxTimer.C: + t.Fatal("r.HasNext() blocked for more than 3s") + case <-done: + assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s") + assert.True(t, maxTimer.Stop()) + } + +}