Skip to content

Commit

Permalink
fix: improve lookup of consumer-groups' consumers
Browse files Browse the repository at this point in the history
This commit makes lookups for consumer-group's consumers
more performant by adding indexes in the in-memory db,
instead of relying on "manual" looping which is very
expensive when several thousands of consumers exist.
  • Loading branch information
GGabriele committed Jul 3, 2024
1 parent 1047bbb commit 7a0d9f5
Showing 1 changed file with 46 additions and 29 deletions.
75 changes: 46 additions & 29 deletions pkg/state/consumer_group_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
const (
consumerGroupConsumerTableName = "consumerGroupConsumer"
consumerByGroupID = "consumerByGroupID"
consumerByConsumerID = "consumerByConsumerID"
consumerByUsername = "consumerByUsername"
consumerByCustomID = "consumerByCustomID"
)

var errInvalidConsumerGroup = fmt.Errorf("consumer_group.ID is required in consumer group consumers")
Expand Down Expand Up @@ -60,6 +63,39 @@ var consumerGroupConsumerTableSchema = &memdb.TableSchema{
},
},
},
consumerByConsumerID: {
Name: consumerByConsumerID,
Indexer: &indexers.SubFieldIndexer{
Fields: []indexers.Field{
{
Struct: "Consumer",
Sub: "ID",
},
},
},
},
consumerByUsername: {
Name: consumerByUsername,
Indexer: &indexers.SubFieldIndexer{
Fields: []indexers.Field{
{
Struct: "Consumer",
Sub: "Username",
},
},
},
},
consumerByCustomID: {
Name: consumerByCustomID,
Indexer: &indexers.SubFieldIndexer{
Fields: []indexers.Field{
{
Struct: "Consumer",
Sub: "CustomID",
},
},
},
},
},
}

Expand Down Expand Up @@ -110,39 +146,20 @@ func (k *ConsumerGroupConsumersCollection) Add(consumer ConsumerGroupConsumer) e
return nil
}

func getAllByConsumerGroupID(txn *memdb.Txn, consumerGroupID string) ([]*ConsumerGroupConsumer, error) {
iter, err := txn.Get(consumerGroupConsumerTableName, consumerByGroupID, consumerGroupID)
if err != nil {
return nil, err
}

var consumers []*ConsumerGroupConsumer
for el := iter.Next(); el != nil; el = iter.Next() {
t, ok := el.(*ConsumerGroupConsumer)
if !ok {
panic(unexpectedType)
}
consumers = append(consumers, &ConsumerGroupConsumer{ConsumerGroupConsumer: *t.DeepCopy()})
}
return consumers, nil
}

func getConsumerGroupConsumer(txn *memdb.Txn, consumerGroupID string, IDs ...string) (*ConsumerGroupConsumer, error) {
consumers, err := getAllByConsumerGroupID(txn, consumerGroupID)
if err != nil {
return nil, err
}
indexes := []string{consumerByConsumerID, consumerByUsername, consumerByCustomID}

for _, id := range IDs {
for _, consumer := range consumers {
var username string
if consumer.Consumer.Username != nil {
username = *consumer.Consumer.Username
} else {
username = *consumer.Consumer.CustomID
for _, index := range indexes {
res, err := txn.First(consumerGroupConsumerTableName, index, id)
if err != nil {
return nil, err
}
if id == *consumer.Consumer.ID || id == username {
return &ConsumerGroupConsumer{ConsumerGroupConsumer: *consumer.DeepCopy()}, nil
if res != nil {
consumer := res.(*ConsumerGroupConsumer)
if *consumer.ConsumerGroup.ID == consumerGroupID {
return consumer, nil
}
}
}
}
Expand Down

0 comments on commit 7a0d9f5

Please sign in to comment.