diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index fc92319645..d2e3895ec5 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -150,12 +150,18 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { - c.metrics.RPCRequestCount.Inc() cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr) if err != nil { return nil, err } + return c.RequestOnCnx(cnx, requestID, cmdType, message) +} + +func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, + message proto.Message) (*RPCResult, error) { + c.metrics.RPCRequestCount.Inc() + ch := make(chan result, 1) cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { @@ -171,7 +177,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request case res := <-ch: // Ignoring producer not ready response. // Continue to wait for the producer to create successfully - if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS { + if res.error == nil && res.Response != nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS { if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() { timeoutCh = nil break @@ -184,28 +190,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request } } -func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { - c.metrics.RPCRequestCount.Inc() - - ch := make(chan result, 1) - - cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { - ch <- result{&RPCResult{ - Cnx: cnx, - Response: response, - }, err} - close(ch) - }) - - select { - case res := <-ch: - return res.RPCResult, res.error - case <-time.After(c.requestTimeout): - return nil, ErrRequestTimeOut - } -} - func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error { c.metrics.RPCRequestCount.Inc() return cnx.SendRequestNoWait(baseCommand(cmdType, message)) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f3749bc644..e0981303f3 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -284,20 +284,24 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr) // registering the producer first in case broker sends commands in the middle - if err == nil { - p._setConn(cnx) - err = p._getConn().RegisterListener(p.producerID, p) - if err != nil { - p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID) - } + if err != nil { + p.log.Error("Failed to get connection") + return err + } + + p._setConn(cnx) + err = p._getConn().RegisterListener(p.producerID, p) + if err != nil { + p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID) } - res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) + res, err := p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { + p._getConn().UnregisterListener(p.producerID) p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") if errors.Is(err, internal.ErrRequestTimeOut) { id := p.client.rpcClient.NewRequestID() - _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER, + _, _ = p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{ ProducerId: &p.producerID, RequestId: &id, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c4ff89752..a24ee4573c 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" @@ -2474,3 +2476,33 @@ func TestDisableReplication(t *testing.T) { assert.NoError(t, err) assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo()) } + +func TestProducerWithMaxConnectionsPerBroker(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + MaxConnectionsPerBroker: 8, + }) + require.NoError(t, err) + defer client.Close() + + for i := 0; i < 10; i++ { + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + Schema: NewBytesSchema(nil), + }) + require.NoError(t, err) + require.NotNil(t, testProducer) + + var ok int32 + testProducer.SendAsync(context.Background(), &ProducerMessage{Value: []byte("hello")}, + func(id MessageID, producerMessage *ProducerMessage, err error) { + if err == nil { + atomic.StoreInt32(&ok, 1) + } + }) + require.Eventually(t, func() bool { + return atomic.LoadInt32(&ok) == 1 + }, 3*time.Second, time.Millisecond*100) + testProducer.Close() + } +}