From 6f4b17048c92acad8dc83342158022f67cebba93 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Tue, 8 Oct 2024 14:19:59 -0700 Subject: [PATCH] More robust system Tier creation / update (#6696) System Tier initialization should ideally wait for the Tier informer's cache to sync before using the lister. Otherwise we may think that the system Tiers have not been created yet when in fact it is just the informer that has not completed the initial List operation. Additionally, we improve the logic in system Tier initialization: * After every failed API call, we should probably check the state of the informer cache again (via the lister) and decide which step is needed next (creation / update / nothing). In case of a failed create or update, this gives us the opportunity to check again if the Tier actually exists, and if yes, what its priority is. * AlreadyExists is no longer treated differently for create. The reason is that if the Tier already exists and for some reason it was not returned by the Lister, it will probably be the validation webhook that fails (overlapping priority), and the error won't match AlreadyExists. Related to #6694 Signed-off-by: Antonin Bas --- pkg/apiserver/apiserver.go | 9 +- pkg/controller/networkpolicy/tier.go | 140 ++++++++++++---------- pkg/controller/networkpolicy/tier_test.go | 126 ++++++++++++++----- 3 files changed, 178 insertions(+), 97 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c6a28165c82..c7bd3a32c4a 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/informers" @@ -320,7 +321,13 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { // Install a post start hook to initialize Tiers on start-up s.AddPostStartHook("initialize-tiers", func(context genericapiserver.PostStartHookContext) error { - go c.networkPolicyController.InitializeTiers() + ctx := wait.ContextForChannel(context.StopCh) + go func() { + // context gets cancelled when the server stops. + if err := c.networkPolicyController.InitializeTiers(ctx); err != nil { + klog.ErrorS(err, "Failed to initialize system Tiers") + } + }() return nil }) } diff --git a/pkg/controller/networkpolicy/tier.go b/pkg/controller/networkpolicy/tier.go index fae1e68d1d1..798f588dccd 100644 --- a/pkg/controller/networkpolicy/tier.go +++ b/pkg/controller/networkpolicy/tier.go @@ -19,11 +19,13 @@ package networkpolicy import ( "context" + "fmt" "time" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" @@ -118,83 +120,89 @@ var ( // will first attempt to retrieve the Tier by it's name from K8s and if missing, // create the CR. InitializeTiers will be called as part of a Post-Start hook // of antrea-controller's APIServer. -func (n *NetworkPolicyController) InitializeTiers() { +func (n *NetworkPolicyController) InitializeTiers(ctx context.Context) error { + if !cache.WaitForCacheSync(ctx.Done(), n.tierListerSynced) { + // This happens when Done is closed because we are shutting down. + return fmt.Errorf("caches not synced for system Tier initialization") + } for _, t := range systemGeneratedTiers { - // Check if Tier is already present. - oldTier, err := n.tierLister.Get(t.Name) - if err == nil { - // Tier is already present. - klog.V(2).Infof("%s Tier already created", t.Name) - // Update Tier Priority if it is not set to desired Priority. - expPrio := priorityMap[t.Name] - if oldTier.Spec.Priority != expPrio { - tToUpdate := oldTier.DeepCopy() - tToUpdate.Spec.Priority = expPrio - n.updateTier(tToUpdate) - } - continue + if err := n.initializeTier(ctx, t); err != nil { + return err } - n.initTier(t) } + return nil } -// initTier attempts to create system Tiers until they are created using an -// exponential backoff period from 1 to max of 8secs. -func (n *NetworkPolicyController) initTier(t *secv1beta1.Tier) { - var err error - const maxBackoffTime = 8 * time.Second - backoff := 1 * time.Second +func (n *NetworkPolicyController) initializeTier(ctx context.Context, t *secv1beta1.Tier) error { + // Tier creation or update may fail because antrea APIService is not yet ready to accept + // requests for validation. We will keep retrying until it succeeds, using an exponential + // backoff (not exceeding 8s), unless the context is cancelled. + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.0, + Steps: 3, // max duration of 8s + } retryAttempt := 1 for { - klog.V(2).InfoS("Creating system Tier", "tier", t.Name) - _, err = n.crdClient.CrdV1beta1().Tiers().Create(context.TODO(), t, metav1.CreateOptions{}) - // Attempt to recreate Tier after a backoff only if it does not exist. - if err != nil { - if errors.IsAlreadyExists(err) { - klog.InfoS("System Tier already exists", "tier", t.Name) - return + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if success := func() bool { + // Check if Tier is already present. + if oldTier, err := n.tierLister.Get(t.Name); err == nil { + // Tier is already present. + klog.V(2).InfoS("Tier already exists", "tier", klog.KObj(t)) + // Update Tier Priority if it is not set to desired Priority. + expPrio := t.Spec.Priority + if oldTier.Spec.Priority == expPrio { + return true + } + tToUpdate := oldTier.DeepCopy() + tToUpdate.Spec.Priority = expPrio + if err := n.updateTier(ctx, tToUpdate); err != nil { + klog.InfoS("Failed to update system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err) + return false + } + return true } - klog.InfoS("Failed to create system Tier on init, will retry", "tier", t.Name, "attempts", retryAttempt, "err", err) - // Tier creation may fail because antrea APIService is not yet ready - // to accept requests for validation. Retry fixed number of times - // not exceeding 8s. - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoffTime { - backoff = maxBackoffTime + if err := n.createTier(ctx, t); err != nil { + // Error may be that the Tier already exists, in this case, we will + // call tierLister.Get again and compare priorities. + klog.InfoS("Failed to create system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err) + return false } - retryAttempt += 1 - continue + return true + }(); success { + break + } + retryAttempt += 1 + waitBeforeRetry := backoff.Step() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitBeforeRetry): } - klog.InfoS("Created system Tier", "tier", t.Name) - return } + return nil } -// updateTier attempts to update Tiers using an -// exponential backoff period from 1 to max of 8secs. -func (n *NetworkPolicyController) updateTier(t *secv1beta1.Tier) { - var err error - const maxBackoffTime = 8 * time.Second - backoff := 1 * time.Second - retryAttempt := 1 - for { - klog.V(2).Infof("Updating %s Tier", t.Name) - _, err = n.crdClient.CrdV1beta1().Tiers().Update(context.TODO(), t, metav1.UpdateOptions{}) - // Attempt to update Tier after a backoff. - if err != nil { - klog.Warningf("Failed to update %s Tier on init: %v. Retry attempt: %d", t.Name, err, retryAttempt) - // Tier update may fail because antrea APIService is not yet ready - // to accept requests for validation. Retry fixed number of times - // not exceeding 8s. - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoffTime { - backoff = maxBackoffTime - } - retryAttempt += 1 - continue - } - return +func (n *NetworkPolicyController) createTier(ctx context.Context, t *secv1beta1.Tier) error { + klog.V(2).InfoS("Creating system Tier", "tier", klog.KObj(t)) + if _, err := n.crdClient.CrdV1beta1().Tiers().Create(ctx, t, metav1.CreateOptions{}); err != nil { + return err + } + klog.InfoS("Created system Tier", "tier", klog.KObj(t)) + return nil +} + +func (n *NetworkPolicyController) updateTier(ctx context.Context, t *secv1beta1.Tier) error { + klog.V(2).InfoS("Updating system Tier", "tier", klog.KObj(t)) + if _, err := n.crdClient.CrdV1beta1().Tiers().Update(ctx, t, metav1.UpdateOptions{}); err != nil { + return err } + klog.InfoS("Updated system Tier", "tier", klog.KObj(t)) + return nil } diff --git a/pkg/controller/networkpolicy/tier_test.go b/pkg/controller/networkpolicy/tier_test.go index bcbe2fc549c..f7f1f768629 100644 --- a/pkg/controller/networkpolicy/tier_test.go +++ b/pkg/controller/networkpolicy/tier_test.go @@ -15,9 +15,12 @@ package networkpolicy import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,61 +30,124 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned/fake" ) -func TestInitTier(t *testing.T) { - testTier := &secv1beta1.Tier{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: secv1beta1.TierSpec{ - Priority: 10, - }, +func TestInitializeTier(t *testing.T) { + makeTestTier := func(priority int32) *secv1beta1.Tier { + return &secv1beta1.Tier{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: secv1beta1.TierSpec{ + Priority: priority, + }, + } } + testTier := makeTestTier(10) + tests := []struct { - name string - reactor k8stesting.ReactionFunc - expectedCalled int + name string + createReactor k8stesting.ReactionFunc + updateReactor k8stesting.ReactionFunc + existingTier *secv1beta1.Tier + createExpectedCalls int + updateExpectedCalls int }{ { - name: "create successfully", - expectedCalled: 1, + name: "create successful", + createExpectedCalls: 1, }, { - name: "tier already exists", - reactor: func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, errors.NewAlreadyExists(action.GetResource().GroupResource(), testTier.Name) - }, - expectedCalled: 1, + name: "create error", + createReactor: func() k8stesting.ReactionFunc { + curFailureCount := 0 + return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + if curFailureCount < 1 { + curFailureCount += 1 + return true, nil, errors.NewServiceUnavailable("unknown reason") + } + return false, nil, nil + } + }(), + createExpectedCalls: 2, }, { - name: "transient error", - reactor: func() k8stesting.ReactionFunc { + name: "update successful", + existingTier: makeTestTier(5), + updateExpectedCalls: 1, + }, + { + name: "update error", + updateReactor: func() k8stesting.ReactionFunc { curFailureCount := 0 - maxFailureCount := 1 return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - if curFailureCount < maxFailureCount { + if curFailureCount < 1 { curFailureCount += 1 return true, nil, errors.NewServiceUnavailable("unknown reason") } return false, nil, nil } }(), - expectedCalled: 2, + existingTier: makeTestTier(5), + updateExpectedCalls: 2, + }, + { + name: "no change needed", + existingTier: makeTestTier(10), }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, c := newController(nil, nil) - if tc.reactor != nil { - c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor) + ctx := context.Background() + crdObjects := []runtime.Object{} + if tc.existingTier != nil { + crdObjects = append(crdObjects, tc.existingTier) + } + _, c := newController(nil, crdObjects) + stopCh := make(chan struct{}) + defer close(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + if tc.createReactor != nil { + c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.createReactor) + } + if tc.updateReactor != nil { + c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", tc.updateReactor) } - createCalled := 0 + createCalls := 0 c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) { - createCalled += 1 + createCalls += 1 return false, nil, nil }) - c.initTier(testTier) - assert.Equal(t, tc.expectedCalled, createCalled) + updateCalls := 0 + c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateCalls += 1 + return false, nil, nil + }) + // Prevent test from hanging in case of issue. + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + require.NoError(t, c.initializeTier(ctx, testTier)) + assert.Equal(t, tc.createExpectedCalls, createCalls) + assert.Equal(t, tc.updateExpectedCalls, updateCalls) }) } } + +func TestInitializeTiers(t *testing.T) { + ctx := context.Background() + + _, c := newController(nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // All system Tiers should be created on the first try, so we can use a small timeout. + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + require.NoError(t, c.InitializeTiers(ctx)) + tiers, err := c.crdClient.CrdV1beta1().Tiers().List(ctx, metav1.ListOptions{}) + require.NoError(t, err) + assert.Len(t, tiers.Items, len(systemGeneratedTiers)) +}