Skip to content

Commit

Permalink
fix: fix producer connection (#1243)
Browse files Browse the repository at this point in the history
* fix: fix producer connection

* Fix test

* Fix nil pointer

* Fix GetConnection err

* Fix cnx

(cherry picked from commit 29f2779)
  • Loading branch information
nodece authored and RobertIndie committed Jul 15, 2024
1 parent 50dce7e commit 81eca35
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
32 changes: 8 additions & 24 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,18 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
c.metrics.RPCRequestCount.Inc()
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
if err != nil {
return nil, err
}

return c.RequestOnCnx(cnx, requestID, cmdType, message)
}

func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
c.metrics.RPCRequestCount.Inc()

ch := make(chan result, 1)

cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
Expand All @@ -171,7 +177,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
case res := <-ch:
// Ignoring producer not ready response.
// Continue to wait for the producer to create successfully
if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
if res.error == nil && res.Response != nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
timeoutCh = nil
break
Expand All @@ -184,28 +190,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
}
}

func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
c.metrics.RPCRequestCount.Inc()

ch := make(chan result, 1)

cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- result{&RPCResult{
Cnx: cnx,
Response: response,
}, err}
close(ch)
})

select {
case res := <-ch:
return res.RPCResult, res.error
case <-time.After(c.requestTimeout):
return nil, ErrRequestTimeOut
}
}

func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
c.metrics.RPCRequestCount.Inc()
return cnx.SendRequestNoWait(baseCommand(cmdType, message))
Expand Down
20 changes: 12 additions & 8 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,24 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {

cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr)
// registering the producer first in case broker sends commands in the middle
if err == nil {
p._setConn(cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID)
}
if err != nil {
p.log.Error("Failed to get connection")
return err
}

p._setConn(cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID)
}

res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
res, err := p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p._getConn().UnregisterListener(p.producerID)
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if errors.Is(err, internal.ErrRequestTimeOut) {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
_, _ = p.client.rpcClient.RequestOnCnx(cnx, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
Expand Down
32 changes: 32 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -2474,3 +2476,33 @@ func TestDisableReplication(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo())
}

func TestProducerWithMaxConnectionsPerBroker(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 8,
})
require.NoError(t, err)
defer client.Close()

for i := 0; i < 10; i++ {
testProducer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
Schema: NewBytesSchema(nil),
})
require.NoError(t, err)
require.NotNil(t, testProducer)

var ok int32
testProducer.SendAsync(context.Background(), &ProducerMessage{Value: []byte("hello")},
func(id MessageID, producerMessage *ProducerMessage, err error) {
if err == nil {
atomic.StoreInt32(&ok, 1)
}
})
require.Eventually(t, func() bool {
return atomic.LoadInt32(&ok) == 1
}, 3*time.Second, time.Millisecond*100)
testProducer.Close()
}
}

0 comments on commit 81eca35

Please sign in to comment.