Skip to content

Commit

Permalink
More robust system Tier creation / update (#6696) (#6723)
Browse files Browse the repository at this point in the history
System Tier initialization should ideally wait for the Tier informer's
cache to sync before using the lister.  Otherwise we may think that the
system Tiers have not been created yet when in fact it is just the
informer that has not completed the initial List operation.

Additionally, we improve the logic in system Tier initialization:
* After every failed API call, we should probably check the state of the
  informer cache again (via the lister) and decide which step is needed
  next (creation / update / nothing). In case of a failed create or
  update, this gives us the opportunity to check again if the Tier
  actually exists, and if yes, what its priority is.
* AlreadyExists is no longer treated differently for create. The reason
  is that if the Tier already exists and for some reason it was not
  returned by the Lister, it will probably be the validation webhook
  that fails (overlapping priority), and the error won't match
  AlreadyExists.

Related to #6694

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Oct 9, 2024
1 parent 36b0191 commit d5a6e01
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 97 deletions.
9 changes: 8 additions & 1 deletion pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -320,7 +321,13 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) {

// Install a post start hook to initialize Tiers on start-up
s.AddPostStartHook("initialize-tiers", func(context genericapiserver.PostStartHookContext) error {
go c.networkPolicyController.InitializeTiers()
ctx := wait.ContextForChannel(context.StopCh)
go func() {
// context gets cancelled when the server stops.
if err := c.networkPolicyController.InitializeTiers(ctx); err != nil {
klog.ErrorS(err, "Failed to initialize system Tiers")
}
}()
return nil
})
}
Expand Down
140 changes: 74 additions & 66 deletions pkg/controller/networkpolicy/tier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package networkpolicy

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
Expand Down Expand Up @@ -118,83 +120,89 @@ var (
// will first attempt to retrieve the Tier by it's name from K8s and if missing,
// create the CR. InitializeTiers will be called as part of a Post-Start hook
// of antrea-controller's APIServer.
func (n *NetworkPolicyController) InitializeTiers() {
func (n *NetworkPolicyController) InitializeTiers(ctx context.Context) error {
if !cache.WaitForCacheSync(ctx.Done(), n.tierListerSynced) {
// This happens when Done is closed because we are shutting down.
return fmt.Errorf("caches not synced for system Tier initialization")
}
for _, t := range systemGeneratedTiers {
// Check if Tier is already present.
oldTier, err := n.tierLister.Get(t.Name)
if err == nil {
// Tier is already present.
klog.V(2).Infof("%s Tier already created", t.Name)
// Update Tier Priority if it is not set to desired Priority.
expPrio := priorityMap[t.Name]
if oldTier.Spec.Priority != expPrio {
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
n.updateTier(tToUpdate)
}
continue
if err := n.initializeTier(ctx, t); err != nil {
return err
}
n.initTier(t)
}
return nil
}

// initTier attempts to create system Tiers until they are created using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) initTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
func (n *NetworkPolicyController) initializeTier(ctx context.Context, t *secv1beta1.Tier) error {
// Tier creation or update may fail because antrea APIService is not yet ready to accept
// requests for validation. We will keep retrying until it succeeds, using an exponential
// backoff (not exceeding 8s), unless the context is cancelled.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2.0,
Jitter: 0.0,
Steps: 3, // max duration of 8s
}
retryAttempt := 1
for {
klog.V(2).InfoS("Creating system Tier", "tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Create(context.TODO(), t, metav1.CreateOptions{})
// Attempt to recreate Tier after a backoff only if it does not exist.
if err != nil {
if errors.IsAlreadyExists(err) {
klog.InfoS("System Tier already exists", "tier", t.Name)
return
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if success := func() bool {
// Check if Tier is already present.
if oldTier, err := n.tierLister.Get(t.Name); err == nil {
// Tier is already present.
klog.V(2).InfoS("Tier already exists", "tier", klog.KObj(t))
// Update Tier Priority if it is not set to desired Priority.
expPrio := t.Spec.Priority
if oldTier.Spec.Priority == expPrio {
return true
}
tToUpdate := oldTier.DeepCopy()
tToUpdate.Spec.Priority = expPrio
if err := n.updateTier(ctx, tToUpdate); err != nil {
klog.InfoS("Failed to update system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
return true
}
klog.InfoS("Failed to create system Tier on init, will retry", "tier", t.Name, "attempts", retryAttempt, "err", err)
// Tier creation may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
if err := n.createTier(ctx, t); err != nil {
// Error may be that the Tier already exists, in this case, we will
// call tierLister.Get again and compare priorities.
klog.InfoS("Failed to create system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err)
return false
}
retryAttempt += 1
continue
return true
}(); success {
break
}
retryAttempt += 1
waitBeforeRetry := backoff.Step()
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(waitBeforeRetry):
}
klog.InfoS("Created system Tier", "tier", t.Name)
return
}
return nil
}

// updateTier attempts to update Tiers using an
// exponential backoff period from 1 to max of 8secs.
func (n *NetworkPolicyController) updateTier(t *secv1beta1.Tier) {
var err error
const maxBackoffTime = 8 * time.Second
backoff := 1 * time.Second
retryAttempt := 1
for {
klog.V(2).Infof("Updating %s Tier", t.Name)
_, err = n.crdClient.CrdV1beta1().Tiers().Update(context.TODO(), t, metav1.UpdateOptions{})
// Attempt to update Tier after a backoff.
if err != nil {
klog.Warningf("Failed to update %s Tier on init: %v. Retry attempt: %d", t.Name, err, retryAttempt)
// Tier update may fail because antrea APIService is not yet ready
// to accept requests for validation. Retry fixed number of times
// not exceeding 8s.
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoffTime {
backoff = maxBackoffTime
}
retryAttempt += 1
continue
}
return
func (n *NetworkPolicyController) createTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Creating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Create(ctx, t, metav1.CreateOptions{}); err != nil {
return err
}
klog.InfoS("Created system Tier", "tier", klog.KObj(t))
return nil
}

func (n *NetworkPolicyController) updateTier(ctx context.Context, t *secv1beta1.Tier) error {
klog.V(2).InfoS("Updating system Tier", "tier", klog.KObj(t))
if _, err := n.crdClient.CrdV1beta1().Tiers().Update(ctx, t, metav1.UpdateOptions{}); err != nil {
return err
}
klog.InfoS("Updated system Tier", "tier", klog.KObj(t))
return nil
}
126 changes: 96 additions & 30 deletions pkg/controller/networkpolicy/tier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package networkpolicy

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,61 +30,124 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned/fake"
)

func TestInitTier(t *testing.T) {
testTier := &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: 10,
},
func TestInitializeTier(t *testing.T) {
makeTestTier := func(priority int32) *secv1beta1.Tier {
return &secv1beta1.Tier{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: secv1beta1.TierSpec{
Priority: priority,
},
}
}
testTier := makeTestTier(10)

tests := []struct {
name string
reactor k8stesting.ReactionFunc
expectedCalled int
name string
createReactor k8stesting.ReactionFunc
updateReactor k8stesting.ReactionFunc
existingTier *secv1beta1.Tier
createExpectedCalls int
updateExpectedCalls int
}{
{
name: "create successfully",
expectedCalled: 1,
name: "create successful",
createExpectedCalls: 1,
},
{
name: "tier already exists",
reactor: func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewAlreadyExists(action.GetResource().GroupResource(), testTier.Name)
},
expectedCalled: 1,
name: "create error",
createReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
createExpectedCalls: 2,
},
{
name: "transient error",
reactor: func() k8stesting.ReactionFunc {
name: "update successful",
existingTier: makeTestTier(5),
updateExpectedCalls: 1,
},
{
name: "update error",
updateReactor: func() k8stesting.ReactionFunc {
curFailureCount := 0
maxFailureCount := 1
return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
if curFailureCount < maxFailureCount {
if curFailureCount < 1 {
curFailureCount += 1
return true, nil, errors.NewServiceUnavailable("unknown reason")
}
return false, nil, nil
}
}(),
expectedCalled: 2,
existingTier: makeTestTier(5),
updateExpectedCalls: 2,
},
{
name: "no change needed",
existingTier: makeTestTier(10),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, c := newController(nil, nil)
if tc.reactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor)
ctx := context.Background()
crdObjects := []runtime.Object{}
if tc.existingTier != nil {
crdObjects = append(crdObjects, tc.existingTier)
}
_, c := newController(nil, crdObjects)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

if tc.createReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.createReactor)
}
if tc.updateReactor != nil {
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", tc.updateReactor)
}
createCalled := 0
createCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
createCalled += 1
createCalls += 1
return false, nil, nil
})
c.initTier(testTier)
assert.Equal(t, tc.expectedCalled, createCalled)
updateCalls := 0
c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) {
updateCalls += 1
return false, nil, nil
})
// Prevent test from hanging in case of issue.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
require.NoError(t, c.initializeTier(ctx, testTier))
assert.Equal(t, tc.createExpectedCalls, createCalls)
assert.Equal(t, tc.updateExpectedCalls, updateCalls)
})
}

}

func TestInitializeTiers(t *testing.T) {
ctx := context.Background()

_, c := newController(nil, nil)
stopCh := make(chan struct{})
defer close(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

// All system Tiers should be created on the first try, so we can use a small timeout.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
require.NoError(t, c.InitializeTiers(ctx))
tiers, err := c.crdClient.CrdV1beta1().Tiers().List(ctx, metav1.ListOptions{})
require.NoError(t, err)
assert.Len(t, tiers.Items, len(systemGeneratedTiers))
}

0 comments on commit d5a6e01

Please sign in to comment.