Skip to content

Commit

Permalink
feat: reference upstream consumer-groups relationships (#57)
Browse files Browse the repository at this point in the history
This commit allows consumer-groups -> consumers relationships
to be pulled from upstream via the `default_lookup_tags` option.
This allows to possibly handle consumer-groups within decK and
consumers outside of decK.
  • Loading branch information
GGabriele authored Feb 8, 2024
1 parent 6618ec2 commit 656b2d7
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 8 deletions.
113 changes: 106 additions & 7 deletions pkg/file/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ func (b *stateBuilder) consumerGroups() {
if current != nil {
cgo.ConsumerGroup.CreatedAt = current.CreatedAt
}

for _, consumer := range cg.Consumers {
if consumer != nil {
c, err := b.ingestConsumerGroupConsumer(cg.ID, &FConsumer{
Consumer: *consumer,
})
if err != nil {
b.err = err
return
}
cgo.Consumers = append(cgo.Consumers, c)
}
}
b.rawState.ConsumerGroups = append(b.rawState.ConsumerGroups, &cgo)
}
}
Expand Down Expand Up @@ -295,6 +308,74 @@ func (b *stateBuilder) caCertificates() {
}
}

func (b *stateBuilder) ingestConsumerGroupConsumer(cgID *string, c *FConsumer) (*kong.Consumer, error) {
var (
consumer *state.Consumer
err error
)

// if the consumer is already present in the target state because it is pulled from
// upstream via the lookup tags, we don't want to create a new consumer.
for _, tc := range b.targetContent.Consumers {
stringTCTags := make([]string, len(tc.Tags))
for i, tag := range tc.Tags {
if tag != nil {
stringTCTags[i] = *tag
}
}
sort.Strings(stringTCTags)
if reflect.DeepEqual(stringTCTags, b.lookupTagsConsumers) && !utils.Empty(tc.ID) {
if (tc.Username != nil && c.Username != nil && *tc.Username == *c.Username) ||
(tc.CustomID != nil && c.CustomID != nil && *tc.CustomID == *c.CustomID) {
return &kong.Consumer{
ID: tc.ID,
Username: tc.Username,
CustomID: tc.CustomID,
Tags: tc.Tags,
}, nil
}
}
}

if c.Username != nil {
consumer, err = b.currentState.Consumers.GetByIDOrUsername(*c.Username)
}
if errors.Is(err, state.ErrNotFound) || consumer == nil {
if c.CustomID != nil {
consumer, err = b.currentState.Consumers.GetByCustomID(*c.CustomID)
}
}
if utils.Empty(c.ID) {
if errors.Is(err, state.ErrNotFound) {
c.ID = uuid()
} else if err != nil {
return nil, err
} else {
c.ID = kong.String(*consumer.ID)
}
}
utils.MustMergeTags(&c.Consumer, b.selectTags)
if consumer != nil {
c.Consumer.CreatedAt = consumer.CreatedAt
}

b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})
if err != nil {
return nil, err
}
err = b.intermediate.ConsumerGroupConsumers.Add(state.ConsumerGroupConsumer{
ConsumerGroupConsumer: kong.ConsumerGroupConsumer{
ConsumerGroup: &kong.ConsumerGroup{ID: cgID},
Consumer: &c.Consumer,
},
})
if err != nil {
return nil, err
}
return &c.Consumer, nil
}

func (b *stateBuilder) consumers() {
if b.err != nil {
return
Expand Down Expand Up @@ -345,17 +426,35 @@ func (b *stateBuilder) consumers() {
if consumer != nil {
c.Consumer.CreatedAt = consumer.CreatedAt
}
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})

// check if consumer was already added in the consumer groups section.
// if it was, we don't want to add it again.
consumerAlreadyAdded := false
consumerGroupConsumers, err := b.intermediate.ConsumerGroupConsumers.GetAll()
if err != nil {
b.err = err
return
}

// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
for _, cgc := range consumerGroupConsumers {
if cgc.Consumer != nil && (c.Username != nil && cgc.Consumer.Username != nil && *cgc.Consumer.Username == *c.Username ||
c.CustomID != nil && cgc.Consumer.CustomID != nil && *cgc.Consumer.CustomID == *c.CustomID) {
c.ID = cgc.Consumer.ID
consumerAlreadyAdded = true
break
}
}
if !consumerAlreadyAdded {
b.rawState.Consumers = append(b.rawState.Consumers, &c.Consumer)
err = b.intermediate.Consumers.Add(state.Consumer{Consumer: c.Consumer})
if err != nil {
b.err = err
return
}
// ingest consumer into consumer group
if err := b.ingestIntoConsumerGroup(c); err != nil {
b.err = err
return
}
}

// plugins for the Consumer
Expand Down
52 changes: 52 additions & 0 deletions tests/integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5131,3 +5131,55 @@ func Test_Sync_DeDupPluginsScopedToConsumerGroups(t *testing.T) {
require.NoError(t, sync("testdata/sync/030-plugin-dedup-consumer-groups/kong.yaml"))
testKongState(t, client, false, expectedState, nil)
}

// test scope:
// - 3.5.0+
// - konnect
func Test_Sync_ConsumerGroupConsumerFromUpstream(t *testing.T) {
t.Setenv("DECK_KONNECT_CONTROL_PLANE_NAME", "default")
runWhenEnterpriseOrKonnect(t, ">=3.4.0")
setup(t)

client, err := getTestClient()
if err != nil {
t.Fatalf(err.Error())
}

expectedState := utils.KongRawState{
ConsumerGroups: []*kong.ConsumerGroupObject{
{
ConsumerGroup: &kong.ConsumerGroup{
ID: kong.String("c0f6c818-470c-4df7-8515-c8e904765fcc"),
Name: kong.String("group-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:deck"),
},
Consumers: []*kong.Consumer{
{
ID: kong.String("97cab250-1b0a-4119-aa2e-0756e8931034"),
Username: kong.String("consumer-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:the-background-process"),
},
},
},
},
Consumers: []*kong.Consumer{
{
ID: kong.String("97cab250-1b0a-4119-aa2e-0756e8931034"),
Username: kong.String("consumer-1"),
Tags: kong.StringSlice("project:the-project", "managed-by:the-background-process"),
},
},
}

// simulate the following scenario:
// - a consumer-group defined with a set of tags, ideally managed by decK
// - a consumer defined with another set of tags, ideally managed by an external process
// - the consumer -> consumer-group relationship, ideally managed by an external process
require.NoError(t, sync("testdata/sync/031-consumer-group-consumers-from-upstream/initial.yaml"))
testKongState(t, client, false, expectedState, nil)

// referencing the relationship in a file without the consumer would still work
// if default_lookup_tags are defined to pull consumers from upstream.
require.NoError(t, sync("testdata/sync/031-consumer-group-consumers-from-upstream/consumer-groups.yaml"))
testKongState(t, client, false, expectedState, nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ consumer_groups:
name: silver
consumers:
- username: bar
- username: baz
plugins:
- name: rate-limiting-advanced
config:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
_format_version: "3.0"
_info:
defaults: {}
select_tags:
- project:the-project
- managed-by:deck
default_lookup_tags:
consumers:
- managed-by:the-background-process
- project:the-project
consumer_groups:
- name: group-1
consumers:
- username: consumer-1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
_format_version: "3.0"
consumer_groups:
- id: c0f6c818-470c-4df7-8515-c8e904765fcc
name: group-1
tags:
- project:the-project
- managed-by:deck
consumers:
- id: 97cab250-1b0a-4119-aa2e-0756e8931034
username: consumer-1
groups:
- id: c0f6c818-470c-4df7-8515-c8e904765fcc
name: group-1
tags:
- project:the-project
- managed-by:the-background-process

0 comments on commit 656b2d7

Please sign in to comment.