Skip to content

Commit

Permalink
chore: do not run goroutines in manager.New
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Feb 13, 2025
1 parent 1e9406f commit e66c75c
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 82 deletions.
46 changes: 22 additions & 24 deletions internal/clients/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type AdminAPIClientsManager struct {

dbMode dpconf.DBMode

ctx context.Context
onceNotifyLoopRunning sync.Once
runningChan chan struct{}
isRunning bool
Expand Down Expand Up @@ -119,7 +118,6 @@ func NewAdminAPIClientsManager(
readinessChecker: readinessChecker,
readinessReconciliationTicker: clock.NewTicker(),
discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI),
ctx: ctx,
runningChan: make(chan struct{}),
logger: ctrl.LoggerFrom(ctx),
}
Expand All @@ -138,9 +136,9 @@ func (c *AdminAPIClientsManager) Running() chan struct{} {

// Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints.
// It should only be called when Gateway Discovery is enabled.
func (c *AdminAPIClientsManager) Run() {
func (c *AdminAPIClientsManager) Run(ctx context.Context) {
c.onceNotifyLoopRunning.Do(func() {
go c.gatewayClientsReconciliationLoop()
c.gatewayClientsReconciliationLoop(ctx)

c.lock.Lock()
defer c.lock.Unlock()
Expand All @@ -150,17 +148,17 @@ func (c *AdminAPIClientsManager) Run() {

// Notify receives a list of addresses that KongClient should use from now on as
// a list of Kong Admin API endpoints.
func (c *AdminAPIClientsManager) Notify(discoveredAPIs []adminapi.DiscoveredAdminAPI) {
func (c *AdminAPIClientsManager) Notify(ctx context.Context, discoveredAPIs []adminapi.DiscoveredAdminAPI) {
// Ensure here that we're not done.
select {
case <-c.ctx.Done():
case <-ctx.Done():
return
default:
}

// And here also listen on c.ctx.Done() to allow the notification to be interrupted.
select {
case <-c.ctx.Done():
case <-ctx.Done():
case c.discoveredAdminAPIsNotifyChan <- discoveredAPIs:
}
}
Expand Down Expand Up @@ -207,12 +205,12 @@ func (c *AdminAPIClientsManager) GatewayClientsCount() int {
// GatewayClients call will return an already updated slice of clients.
// It will return `false` as a second result in case the notifications loop is not running (e.g. static clients setup
// is used and no updates are going to happen).
func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan struct{}, bool) {
func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges(ctx context.Context) (<-chan struct{}, bool) {
c.lock.Lock()
defer c.lock.Unlock()

// Context is already done, no subscriptions should be created.
if c.ctx.Err() != nil {
if ctx.Err() != nil {
return nil, false
}

Expand All @@ -229,45 +227,45 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru
// gatewayClientsReconciliationLoop is an inner loop listening on:
// - discoveredAdminAPIsNotifyChan - triggered on every Notify() call.
// - readinessReconciliationTicker - triggered on every readinessReconciliationTicker tick.
func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() {
func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop(ctx context.Context) {
c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval)
defer c.readinessReconciliationTicker.Stop()

close(c.runningChan)
for {
select {
case <-c.ctx.Done():
c.logger.V(logging.InfoLevel).Info("Closing AdminAPIClientsManager", "reason", c.ctx.Err())
case <-ctx.Done():
c.logger.V(logging.InfoLevel).Info("Closing AdminAPIClientsManager", "reason", ctx.Err())
c.closeGatewayClientsSubscribers()
return
case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan:
c.onDiscoveredAdminAPIsNotification(discoveredAdminAPIs)
c.onDiscoveredAdminAPIsNotification(ctx, discoveredAdminAPIs)
case <-c.readinessReconciliationTicker.Channel():
c.onReadinessReconciliationTick()
c.onReadinessReconciliationTick(ctx)
}
}
}

// onDiscoveredAdminAPIsNotification is called when a new notification about Admin API addresses change is received.
// It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has
// changed.
func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) {
func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(ctx context.Context, discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) {
c.logger.V(logging.DebugLevel).Info("Received notification about Admin API addresses change")

clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs)
readinessChanged := c.reconcileGatewayClientsReadiness()
readinessChanged := c.reconcileGatewayClientsReadiness(ctx)
if clientsChanged || readinessChanged {
c.notifyGatewayClientsSubscribers()
c.notifyGatewayClientsSubscribers(ctx)
}
}

// onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness
// of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed.
func (c *AdminAPIClientsManager) onReadinessReconciliationTick() {
func (c *AdminAPIClientsManager) onReadinessReconciliationTick(ctx context.Context) {
c.logger.V(logging.DebugLevel).Info("Reconciling readiness of gateway clients")

if changed := c.reconcileGatewayClientsReadiness(); changed {
c.notifyGatewayClientsSubscribers()
if changed := c.reconcileGatewayClientsReadiness(ctx); changed {
c.notifyGatewayClientsSubscribers(ctx)
}
}

Expand Down Expand Up @@ -328,7 +326,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi
// If any of the clients is not ready anymore, it will be moved to the pendingGatewayClients list. If any of the clients
// is not pending anymore, it will be moved to the readyGatewayClients list. It returns true if any transition has been
// made, false otherwise.
func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness(ctx context.Context) bool {
// Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification).
// It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications.
defer c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval)
Expand All @@ -342,7 +340,7 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
}

readinessCheckResult := c.readinessChecker.CheckReadiness(
c.ctx,
ctx,
lo.MapToSlice(c.readyGatewayClients, func(_ string, cl *adminapi.Client) AlreadyCreatedClient { return cl }),
lo.Values(c.pendingGatewayClients),
)
Expand All @@ -360,11 +358,11 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
}

// notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges.
func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers() {
func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers(ctx context.Context) {
c.logger.V(logging.DebugLevel).Info("Notifying subscribers about gateway clients change")
for _, sub := range c.gatewayClientsChangesSubscribers {
select {
case <-c.ctx.Done():
case <-ctx.Done():
c.logger.V(logging.InfoLevel).Info("Not sending notification to subscribers as the context is done")
return
case sub <- struct{}{}:
Expand Down
62 changes: 31 additions & 31 deletions internal/clients/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.
)
require.NoError(t, err)
require.NotNil(t, manager)
manager.Run()
manager.Run(ctx)
<-manager.Running()

requireClientsMatchEventually := func(t *testing.T, c *clients.AdminAPIClientsManager, addresses []string, args ...any) {
Expand All @@ -119,44 +119,44 @@ func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.
"initially there should be the initial client")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
requireClientsMatchEventually(t, manager, []string{testURL1},
"after notifying about a new address we should get 1 client eventually")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
requireClientsMatchEventually(t, manager, []string{testURL1},
"after notifying the same address there's no update in clients")

manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
requireClientsMatchEventually(t, manager, []string{testURL1},
"after notifying new address set including the old already existing one but new one not yet ready we get just the old one")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
requireClientsMatchEventually(t, manager, []string{testURL1, testURL2},
"after notifying new address set including the old already existing one and new one turning ready we get both the old and the new")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
requireClientsMatchEventually(t, manager, []string{testURL1, testURL2},
"after notifying again with the same set of URLs should not change the existing URLs")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
requireClientsMatchEventually(t, manager, []string{testURL1},
"after notifying the same address set with one turning pending, we get only one client")

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{})
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
requireClientsMatchEventually(t, manager, []string{testURL1},
"notifying again with just one URL should decrease the set of URLs to just this one")

manager.Notify([]adminapi.DiscoveredAdminAPI{})
manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{})
requireClientsMatchEventually(t, manager, []string{})

cancel()
require.NotPanics(t, func() { manager.Notify([]adminapi.DiscoveredAdminAPI{}) }, "notifying about new clients after manager has been shut down shouldn't panic")
require.NotPanics(t, func() { manager.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{}) }, "notifying about new clients after manager has been shut down shouldn't panic")
}

func TestNewAdminAPIClientsManager_NoInitialClientsDisallowed(t *testing.T) {
Expand Down Expand Up @@ -252,20 +252,20 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
require.NoError(t, err)

t.Run("no notify loop running should return false when subscribing", func(t *testing.T) {
ch, ok := m.SubscribeToGatewayClientsChanges()
ch, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.Nil(t, ch)
require.Falsef(t, ok, "expected no subscription to be created because no notification loop is running")
})

m.Run()
m.Run(ctx)

t.Run("when notification loop is running subscription should be created", func(t *testing.T) {
ch, ok := m.SubscribeToGatewayClientsChanges()
ch, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.NotNil(t, ch)
require.True(t, ok)

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1, testURL2)})
m.Notify([]adminapi.DiscoveredAdminAPI{
m.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{
testDiscoveredAdminAPI(testURL1),
testDiscoveredAdminAPI(testURL2),
})
Expand All @@ -279,16 +279,16 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
})

t.Run("when multiple subscriptions are created, each of them should receive notifications", func(t *testing.T) {
sub1, ok := m.SubscribeToGatewayClientsChanges()
sub1, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.NotNil(t, sub1)
require.True(t, ok)

sub2, ok := m.SubscribeToGatewayClientsChanges()
sub2, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.NotNil(t, sub2)
require.True(t, ok)

readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)})
m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
m.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})

select {
case <-sub2:
Expand All @@ -305,7 +305,7 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
})

t.Run("when the context gets cancelled, subscriber channel gets closed", func(t *testing.T) {
ch, ok := m.SubscribeToGatewayClientsChanges()
ch, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.NotNil(t, ch)
require.True(t, ok)

Expand All @@ -319,7 +319,7 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
})

t.Run("when the context is cancelled, subscriptions cannot be created", func(t *testing.T) {
ch, ok := m.SubscribeToGatewayClientsChanges()
ch, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.Nil(t, ch)
require.False(t, ok)
})
Expand All @@ -339,7 +339,7 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {
defer cancel()
m, err := clients.NewAdminAPIClientsManager(ctx, []*adminapi.Client{testClient}, readinessChecker)
require.NoError(t, err)
m.Run()
m.Run(ctx)

// Run a goroutine that will call GatewayClients() every millisecond.
go func() {
Expand All @@ -355,7 +355,7 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {

go func() {
for i := 0; i < 100; i++ {
go m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
go m.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)})
}
}()

Expand All @@ -379,11 +379,11 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
m, err := clients.NewAdminAPIClientsManager(ctx, []*adminapi.Client{testClient}, readinessChecker)
require.NoError(t, err)

m.Run()
m.Run(ctx)
<-m.Running()

var receivedNotificationsCount atomic.Uint32
ch, ok := m.SubscribeToGatewayClientsChanges()
ch, ok := m.SubscribeToGatewayClientsChanges(context.Background())
require.NotNil(t, ch)
require.True(t, ok)

Expand Down Expand Up @@ -417,7 +417,7 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
}

// Notify the first set of clients and make sure that the subscriber doesn't get notified as it was initial state.
m.Notify(firstClientsSet)
m.Notify(context.Background(), firstClientsSet)
notificationsCountEventuallyEquals(0)
require.Equal(t, 1, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients")
requireLastReadinessCheckCall(readinessCheckCall{
Expand All @@ -426,17 +426,17 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
})

// Notify an empty set of clients and make sure that the subscriber get notified.
m.Notify(nil)
m.Notify(context.Background(), nil)
notificationsCountEventuallyEquals(1)
require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set")

// Notify an empty set again and make sure that the subscriber doesn't get notified as the state didn't change.
m.Notify(nil)
m.Notify(context.Background(), nil)
notificationsCountEventuallyEquals(1)
require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set")

// Notify the second set of clients without making the new one ready and make sure that the subscriber gets no notification.
m.Notify(secondClientsSet)
m.Notify(context.Background(), secondClientsSet)
notificationsCountEventuallyEquals(1)
requireLastReadinessCheckCall(readinessCheckCall{
AlreadyCreatedURLs: []string{},
Expand All @@ -446,15 +446,15 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {

// Notify the second set of clients and make sure that the subscriber gets notified after the new one becomes ready.
readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)})
m.Notify(secondClientsSet)
m.Notify(context.Background(), secondClientsSet)
notificationsCountEventuallyEquals(2)
requireLastReadinessCheckCall(readinessCheckCall{
AlreadyCreatedURLs: []string{},
PendingURLs: []string{testURL2},
})
require.Equal(t, 3, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients")

m.Notify([]adminapi.DiscoveredAdminAPI{firstClientsSet[0], secondClientsSet[0]})
m.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{firstClientsSet[0], secondClientsSet[0]})
notificationsCountEventuallyEquals(3)
requireLastReadinessCheckCall(readinessCheckCall{
AlreadyCreatedURLs: []string{testURL2},
Expand All @@ -478,7 +478,7 @@ func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) {
defer cancel()
m, err := clients.NewAdminAPIClientsManager(ctx, []*adminapi.Client{testClient}, readinessChecker, clients.WithReadinessReconciliationTicker(readinessTicker))
require.NoError(t, err)
m.Run()
m.Run(ctx)
<-m.Running()

readinessCheckCallEventuallyMatches := func(expected readinessCheckCall) {
Expand All @@ -505,7 +505,7 @@ func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) {
require.Equal(t, 1, readinessChecker.CallsCount())

// Notify with a new client and check the readiness check call was made as expected.
m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
m.Notify(context.Background(), []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)})
readinessCheckCallEventuallyMatches(readinessCheckCall{
AlreadyCreatedURLs: []string{testURL1},
PendingURLs: []string{testURL2},
Expand Down
Loading

0 comments on commit e66c75c

Please sign in to comment.