diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index cf04832698d..fc4615fbcfb 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/informers" @@ -323,7 +324,14 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { // Install a post start hook to initialize Tiers on start-up s.AddPostStartHook("initialize-tiers", func(context genericapiserver.PostStartHookContext) error { - go c.networkPolicyController.InitializeTiers() + go func() { + ctx, cancel := wait.ContextForChannel(context.StopCh) + defer cancel() + // context gets cancelled when the server stops. + if err := c.networkPolicyController.InitializeTiers(ctx); err != nil { + klog.ErrorS(err, "Failed to initialize system Tiers") + } + }() return nil }) } diff --git a/pkg/controller/networkpolicy/tier.go b/pkg/controller/networkpolicy/tier.go index 66fd318ff63..637a1344470 100644 --- a/pkg/controller/networkpolicy/tier.go +++ b/pkg/controller/networkpolicy/tier.go @@ -19,11 +19,13 @@ package networkpolicy import ( "context" + "fmt" "time" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" @@ -123,83 +125,89 @@ var ( // will first attempt to retrieve the Tier by it's name from K8s and if missing, // create the CR. InitializeTiers will be called as part of a Post-Start hook // of antrea-controller's APIServer. -func (n *NetworkPolicyController) InitializeTiers() { +func (n *NetworkPolicyController) InitializeTiers(ctx context.Context) error { + if !cache.WaitForCacheSync(ctx.Done(), n.tierListerSynced) { + // This happens when Done is closed because we are shutting down. + return fmt.Errorf("caches not synced for system Tier initialization") + } for _, t := range systemGeneratedTiers { - // Check if Tier is already present. - oldTier, err := n.tierLister.Get(t.Name) - if err == nil { - // Tier is already present. - klog.V(2).Infof("%s Tier already created", t.Name) - // Update Tier Priority if it is not set to desired Priority. - expPrio := priorityMap[t.Name] - if oldTier.Spec.Priority != expPrio { - tToUpdate := oldTier.DeepCopy() - tToUpdate.Spec.Priority = expPrio - n.updateTier(tToUpdate) - } - continue + if err := n.initializeTier(ctx, t); err != nil { + return err } - n.initTier(t) } + return nil } -// initTier attempts to create system Tiers until they are created using an -// exponential backoff period from 1 to max of 8secs. -func (n *NetworkPolicyController) initTier(t *secv1beta1.Tier) { - var err error - const maxBackoffTime = 8 * time.Second - backoff := 1 * time.Second +func (n *NetworkPolicyController) initializeTier(ctx context.Context, t *secv1beta1.Tier) error { + // Tier creation or update may fail because antrea APIService is not yet ready to accept + // requests for validation. We will keep retrying until it succeeds, using an exponential + // backoff (not exceeding 8s), unless the context is cancelled. + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.0, + Steps: 3, // max duration of 8s + } retryAttempt := 1 for { - klog.V(2).InfoS("Creating system Tier", "tier", t.Name) - _, err = n.crdClient.CrdV1beta1().Tiers().Create(context.TODO(), t, metav1.CreateOptions{}) - // Attempt to recreate Tier after a backoff only if it does not exist. - if err != nil { - if errors.IsAlreadyExists(err) { - klog.InfoS("System Tier already exists", "tier", t.Name) - return + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if success := func() bool { + // Check if Tier is already present. + if oldTier, err := n.tierLister.Get(t.Name); err == nil { + // Tier is already present. + klog.V(2).InfoS("Tier already exists", "tier", klog.KObj(t)) + // Update Tier Priority if it is not set to desired Priority. + expPrio := t.Spec.Priority + if oldTier.Spec.Priority == expPrio { + return true + } + tToUpdate := oldTier.DeepCopy() + tToUpdate.Spec.Priority = expPrio + if err := n.updateTier(ctx, tToUpdate); err != nil { + klog.InfoS("Failed to update system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err) + return false + } + return true } - klog.InfoS("Failed to create system Tier on init, will retry", "tier", t.Name, "attempts", retryAttempt, "err", err) - // Tier creation may fail because antrea APIService is not yet ready - // to accept requests for validation. Retry fixed number of times - // not exceeding 8s. - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoffTime { - backoff = maxBackoffTime + if err := n.createTier(ctx, t); err != nil { + // Error may be that the Tier already exists, in this case, we will + // call tierLister.Get again and compare priorities. + klog.InfoS("Failed to create system Tier on init, will retry", "tier", klog.KObj(t), "attempts", retryAttempt, "err", err) + return false } - retryAttempt += 1 - continue + return true + }(); success { + break + } + retryAttempt += 1 + waitBeforeRetry := backoff.Step() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitBeforeRetry): } - klog.InfoS("Created system Tier", "tier", t.Name) - return } + return nil } -// updateTier attempts to update Tiers using an -// exponential backoff period from 1 to max of 8secs. -func (n *NetworkPolicyController) updateTier(t *secv1beta1.Tier) { - var err error - const maxBackoffTime = 8 * time.Second - backoff := 1 * time.Second - retryAttempt := 1 - for { - klog.V(2).Infof("Updating %s Tier", t.Name) - _, err = n.crdClient.CrdV1beta1().Tiers().Update(context.TODO(), t, metav1.UpdateOptions{}) - // Attempt to update Tier after a backoff. - if err != nil { - klog.Warningf("Failed to update %s Tier on init: %v. Retry attempt: %d", t.Name, err, retryAttempt) - // Tier update may fail because antrea APIService is not yet ready - // to accept requests for validation. Retry fixed number of times - // not exceeding 8s. - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoffTime { - backoff = maxBackoffTime - } - retryAttempt += 1 - continue - } - return +func (n *NetworkPolicyController) createTier(ctx context.Context, t *secv1beta1.Tier) error { + klog.V(2).InfoS("Creating system Tier", "tier", klog.KObj(t)) + if _, err := n.crdClient.CrdV1beta1().Tiers().Create(ctx, t, metav1.CreateOptions{}); err != nil { + return err + } + klog.InfoS("Created system Tier", "tier", klog.KObj(t)) + return nil +} + +func (n *NetworkPolicyController) updateTier(ctx context.Context, t *secv1beta1.Tier) error { + klog.V(2).InfoS("Updating system Tier", "tier", klog.KObj(t)) + if _, err := n.crdClient.CrdV1beta1().Tiers().Update(ctx, t, metav1.UpdateOptions{}); err != nil { + return err } + klog.InfoS("Updated system Tier", "tier", klog.KObj(t)) + return nil } diff --git a/pkg/controller/networkpolicy/tier_test.go b/pkg/controller/networkpolicy/tier_test.go index bcbe2fc549c..f7f1f768629 100644 --- a/pkg/controller/networkpolicy/tier_test.go +++ b/pkg/controller/networkpolicy/tier_test.go @@ -15,9 +15,12 @@ package networkpolicy import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,61 +30,124 @@ import ( "antrea.io/antrea/pkg/client/clientset/versioned/fake" ) -func TestInitTier(t *testing.T) { - testTier := &secv1beta1.Tier{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: secv1beta1.TierSpec{ - Priority: 10, - }, +func TestInitializeTier(t *testing.T) { + makeTestTier := func(priority int32) *secv1beta1.Tier { + return &secv1beta1.Tier{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: secv1beta1.TierSpec{ + Priority: priority, + }, + } } + testTier := makeTestTier(10) + tests := []struct { - name string - reactor k8stesting.ReactionFunc - expectedCalled int + name string + createReactor k8stesting.ReactionFunc + updateReactor k8stesting.ReactionFunc + existingTier *secv1beta1.Tier + createExpectedCalls int + updateExpectedCalls int }{ { - name: "create successfully", - expectedCalled: 1, + name: "create successful", + createExpectedCalls: 1, }, { - name: "tier already exists", - reactor: func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, errors.NewAlreadyExists(action.GetResource().GroupResource(), testTier.Name) - }, - expectedCalled: 1, + name: "create error", + createReactor: func() k8stesting.ReactionFunc { + curFailureCount := 0 + return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + if curFailureCount < 1 { + curFailureCount += 1 + return true, nil, errors.NewServiceUnavailable("unknown reason") + } + return false, nil, nil + } + }(), + createExpectedCalls: 2, }, { - name: "transient error", - reactor: func() k8stesting.ReactionFunc { + name: "update successful", + existingTier: makeTestTier(5), + updateExpectedCalls: 1, + }, + { + name: "update error", + updateReactor: func() k8stesting.ReactionFunc { curFailureCount := 0 - maxFailureCount := 1 return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - if curFailureCount < maxFailureCount { + if curFailureCount < 1 { curFailureCount += 1 return true, nil, errors.NewServiceUnavailable("unknown reason") } return false, nil, nil } }(), - expectedCalled: 2, + existingTier: makeTestTier(5), + updateExpectedCalls: 2, + }, + { + name: "no change needed", + existingTier: makeTestTier(10), }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, c := newController(nil, nil) - if tc.reactor != nil { - c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor) + ctx := context.Background() + crdObjects := []runtime.Object{} + if tc.existingTier != nil { + crdObjects = append(crdObjects, tc.existingTier) + } + _, c := newController(nil, crdObjects) + stopCh := make(chan struct{}) + defer close(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + if tc.createReactor != nil { + c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.createReactor) + } + if tc.updateReactor != nil { + c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", tc.updateReactor) } - createCalled := 0 + createCalls := 0 c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) { - createCalled += 1 + createCalls += 1 return false, nil, nil }) - c.initTier(testTier) - assert.Equal(t, tc.expectedCalled, createCalled) + updateCalls := 0 + c.crdClient.(*fake.Clientset).PrependReactor("update", "tiers", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateCalls += 1 + return false, nil, nil + }) + // Prevent test from hanging in case of issue. + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + require.NoError(t, c.initializeTier(ctx, testTier)) + assert.Equal(t, tc.createExpectedCalls, createCalls) + assert.Equal(t, tc.updateExpectedCalls, updateCalls) }) } } + +func TestInitializeTiers(t *testing.T) { + ctx := context.Background() + + _, c := newController(nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // All system Tiers should be created on the first try, so we can use a small timeout. + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + require.NoError(t, c.InitializeTiers(ctx)) + tiers, err := c.crdClient.CrdV1beta1().Tiers().List(ctx, metav1.ListOptions{}) + require.NoError(t, err) + assert.Len(t, tiers.Items, len(systemGeneratedTiers)) +}