Skip to content

Commit

Permalink
fix: use least loaded broker to refresh metadata
Browse files Browse the repository at this point in the history
Seed brokers never change after client initialization. If the first seed
broker became stale (still online, but moved to other Kafka cluster),
Sarama client may use this stale broker to get the wrong metadata. To
avoid using the stale broker to do metadata refresh, we will choose the
least loaded broker in the cached broker list which is the similar to
how the Java client implementation works:

https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736

Contributes-to: #2637

Signed-off-by: Hao Sun <[email protected]>
  • Loading branch information
HaoSunUber committed Sep 12, 2023
1 parent 4b55bb3 commit 98ec384
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 137 deletions.
12 changes: 8 additions & 4 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker1.BrokerID(), b.ID())
}

metadataResponse := NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID())
seedBroker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
"MetadataRequest": metadataResponse,
})
seedBroker2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": metadataResponse,
})

if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
Expand Down
69 changes: 40 additions & 29 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
leader.Returns(metadataResponse) // tell it to go to broker 2 again

// Then: a produced message goes through the new broker connection.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
Expand Down Expand Up @@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down Expand Up @@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodSuccess)

expectResults(t, producer, 1, 0)
Expand Down Expand Up @@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
leaderLock.Lock()
defer leaderLock.Unlock()
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})
}

// The seed broker only handles Metadata request in bootstrap
seedBroker.setHandler(metadataRequestHandlerFunc)

var emptyValues int32 = 0

Expand All @@ -770,14 +771,27 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
}
}

leader.setHandler(func(req *request) (res encoderWithHeader) {
failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

time.Sleep(50 * time.Millisecond)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
return prodSuccess
}

succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
}

leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": failedProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

config := NewTestConfig()
Expand Down Expand Up @@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
leaderLock.Lock()
leader = NewMockBroker(t, 2)
leaderLock.Unlock()
leader.setHandler(func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": succeededProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

wg.Wait()
Expand Down Expand Up @@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)

// succeed this time
prodSuccess = new(ProduceResponse)
Expand Down Expand Up @@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {

time.Sleep(50 * time.Millisecond)

leader.SetHandlerByMap(map[string]MockResponse{
"ProduceRequest": NewMockProduceResponse(t).
SetVersion(0).
SetError("my_topic", 0, ErrNoError),
})

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

// succeed this time
expectResults(t, producer, 5, 0)
Expand All @@ -1010,6 +1018,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectResults(t, producer, 5, 0)

// shutdown
Expand Down Expand Up @@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)
leader.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down
58 changes: 21 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
request := &InitProducerIDRequest{}

if client.conf.Version.IsAtLeast(V2_7_0_0) {
Expand Down Expand Up @@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) {
}
}

// deregisterBroker removes a broker from the seedsBroker list, and if it's
// not the seedbroker, removes it from brokers map completely.
// deregisterBroker removes a broker from the broker list, and if it's
// not in the broker list, removes it from seedBrokers.
func (client *client) deregisterBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

_, ok := client.brokers[broker.ID()]
if ok {
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
return
}
if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
client.deadSeeds = append(client.deadSeeds, broker)
client.seedBrokers = client.seedBrokers[1:]
} else {
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}

Expand All @@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() {
client.deadSeeds = nil
}

func (client *client) anyBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

// not guaranteed to be random *or* deterministic
for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}

return nil
}

// LeastLoadedBroker returns the broker with the least pending requests.
// Firstly, choose the broker from cached broker list. If the broker list is empty, choose from seed brokers.
func (client *client) LeastLoadedBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

var leastLoadedBroker *Broker
pendingRequests := math.MaxInt
for _, broker := range client.brokers {
Expand All @@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker {
leastLoadedBroker = broker
}
}

if leastLoadedBroker != nil {
_ = leastLoadedBroker.Open(client.conf)
return leastLoadedBroker
}

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

return leastLoadedBroker
}

Expand Down Expand Up @@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
return err
}

broker := client.anyBroker()
broker := client.LeastLoadedBroker()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() {
for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
Expand Down Expand Up @@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
}

brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())

request := new(FindCoordinatorRequest)
Expand Down
Loading

0 comments on commit 98ec384

Please sign in to comment.