From f29adb50f3053df9eba5c9bb8f1afe02bd6400e5 Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Thu, 14 Dec 2023 11:21:02 -0800 Subject: [PATCH] Add policy processed verification for networkpolicyanalysis Signed-off-by: Dyanngg --- pkg/antctl/command_definition_test.go | 27 +-- .../handlers/endpoint/handler_test.go | 13 +- .../handlers/networkpolicyanalysis/handler.go | 19 +- .../networkpolicyanalysis/handler_test.go | 78 ++++---- .../networkpolicy/adminnetworkpolicy_test.go | 4 +- .../networkpolicy/antreanetworkpolicy_test.go | 8 +- .../networkpolicy/clustergroup_test.go | 16 +- .../clusternetworkpolicy_test.go | 14 +- .../networkpolicy/crd_utils_test.go | 4 +- .../networkpolicy/endpoint_querier.go | 35 ++-- .../networkpolicy/endpoint_querier_test.go | 8 +- pkg/controller/networkpolicy/group_test.go | 10 +- pkg/controller/networkpolicy/mutate_test.go | 4 +- .../networkpolicy/networkpolicy_controller.go | 55 +++++- .../networkpolicy_controller_perf_test.go | 4 +- .../networkpolicy_controller_test.go | 174 +++++++++++++++--- .../testing/mock_networkpolicy.go | 15 ++ pkg/controller/networkpolicy/tier_test.go | 2 +- pkg/controller/networkpolicy/validate_test.go | 12 +- 19 files changed, 355 insertions(+), 147 deletions(-) diff --git a/pkg/antctl/command_definition_test.go b/pkg/antctl/command_definition_test.go index 1f4acbce761..cc14eed2662 100644 --- a/pkg/antctl/command_definition_test.go +++ b/pkg/antctl/command_definition_test.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/antctl/transform/controllerinfo" "antrea.io/antrea/pkg/antctl/transform/networkpolicy" "antrea.io/antrea/pkg/antctl/transform/version" + "antrea.io/antrea/pkg/apis/controlplane" cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/apis/crd/v1beta1" controllernetworkpolicy "antrea.io/antrea/pkg/controller/networkpolicy" @@ -909,8 +910,8 @@ func TestGetRequestErrorFallback(t *testing.T) { } func TestTableOutputForQueryEndpoint(t *testing.T) { - policyRef0 := controllernetworkpolicy.PolicyRef{Namespace: "testNamespace", Name: "test-ingress-egress", UID: "uid-1"} - policyRef1 := controllernetworkpolicy.PolicyRef{Namespace: "testNamespace", Name: "default-deny-egress", UID: "uid-2"} + policyRef0 := controlplane.NetworkPolicyReference{Namespace: "testNamespace", Name: "test-ingress-egress", UID: "uid-1"} + policyRef1 := controlplane.NetworkPolicyReference{Namespace: "testNamespace", Name: "default-deny-egress", UID: "uid-2"} tc := []struct { name string rawResponseData interface{} @@ -939,10 +940,10 @@ Ingress Rules: None { Namespace: "testNamespace", Name: "podA", - Policies: []controllernetworkpolicy.Policy{{PolicyRef: policyRef0}}, + Policies: []controllernetworkpolicy.Policy{{NetworkPolicyReference: policyRef0}}, Rules: []controllernetworkpolicy.Rule{ - {PolicyRef: policyRef0, Direction: cpv1beta.DirectionOut, RuleIndex: 0}, - {PolicyRef: policyRef0, Direction: cpv1beta.DirectionIn, RuleIndex: 0}, + {NetworkPolicyReference: policyRef0, Direction: cpv1beta.DirectionOut, RuleIndex: 0}, + {NetworkPolicyReference: policyRef0, Direction: cpv1beta.DirectionIn, RuleIndex: 0}, }, }, }, @@ -970,12 +971,12 @@ test-ingress-egress testNamespace 0 uid-1 Namespace: "testNamespace", Name: "podA", Policies: []controllernetworkpolicy.Policy{ - {PolicyRef: policyRef0}, - {PolicyRef: policyRef1}, + {NetworkPolicyReference: policyRef0}, + {NetworkPolicyReference: policyRef1}, }, Rules: []controllernetworkpolicy.Rule{ - {PolicyRef: policyRef0, Direction: cpv1beta.DirectionOut, RuleIndex: 0}, - {PolicyRef: policyRef0, Direction: cpv1beta.DirectionIn, RuleIndex: 0}, + {NetworkPolicyReference: policyRef0, Direction: cpv1beta.DirectionOut, RuleIndex: 0}, + {NetworkPolicyReference: policyRef0, Direction: cpv1beta.DirectionIn, RuleIndex: 0}, }, }, }, @@ -1009,7 +1010,7 @@ test-ingress-egress testNamespace 0 uid-1 } func TestTableOutputForQueryNetworkPolicyAnalysis(t *testing.T) { - policyRef0 := controllernetworkpolicy.PolicyRef{Namespace: "testNamespace", Name: "test-default-deny", UID: "uid-1"} + policyRef0 := controlplane.NetworkPolicyReference{Namespace: "testNamespace", Name: "test-default-deny", UID: "uid-1"} tc := []struct { name string rawResponseData interface{} @@ -1023,9 +1024,9 @@ func TestTableOutputForQueryNetworkPolicyAnalysis(t *testing.T) { { name: "Matched KNP default drop rule", rawResponseData: &controllernetworkpolicy.Rule{ - PolicyRef: policyRef0, - Direction: cpv1beta.DirectionIn, - RuleIndex: -1, + NetworkPolicyReference: policyRef0, + Direction: cpv1beta.DirectionIn, + RuleIndex: -1, }, expected: `Name Namespace RuleIndex PolicyUID Direction test-default-deny testNamespace -1 uid-1 In diff --git a/pkg/apiserver/handlers/endpoint/handler_test.go b/pkg/apiserver/handlers/endpoint/handler_test.go index 7b63546b2fa..1c97e7a39e0 100644 --- a/pkg/apiserver/handlers/endpoint/handler_test.go +++ b/pkg/apiserver/handlers/endpoint/handler_test.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/admission/v1" "k8s.io/apimachinery/pkg/api/errors" + "antrea.io/antrea/pkg/apis/controlplane" "antrea.io/antrea/pkg/controller/networkpolicy" queriermock "antrea.io/antrea/pkg/controller/networkpolicy/testing" ) @@ -57,7 +58,7 @@ var responses = []response{ { Policies: []networkpolicy.Policy{ { - PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy1"}, }, }, }, @@ -70,10 +71,10 @@ var responses = []response{ { Policies: []networkpolicy.Policy{ { - PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy1"}, }, { - PolicyRef: networkpolicy.PolicyRef{Name: "policy2"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy2"}, }, }, }, @@ -148,7 +149,7 @@ func TestSinglePolicyResponse(t *testing.T) { { Policies: []networkpolicy.Policy{ { - PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy1"}, }, }, }, @@ -181,10 +182,10 @@ func TestMultiPolicyResponse(t *testing.T) { { Policies: []networkpolicy.Policy{ { - PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy1"}, }, { - PolicyRef: networkpolicy.PolicyRef{Name: "policy2"}, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Name: "policy2"}, }, }, }, diff --git a/pkg/apiserver/handlers/networkpolicyanalysis/handler.go b/pkg/apiserver/handlers/networkpolicyanalysis/handler.go index ef38be22a06..74d3a7a41b3 100644 --- a/pkg/apiserver/handlers/networkpolicyanalysis/handler.go +++ b/pkg/apiserver/handlers/networkpolicyanalysis/handler.go @@ -135,13 +135,9 @@ func predictEndpointsRules(srcEndpoints, dstEndpoints *networkpolicy.EndpointRul return &networkpolicy.Rule{} } return &networkpolicy.Rule{ - PolicyRef: networkpolicy.PolicyRef{ - Namespace: commonRule.Policy.SourceRef.Namespace, - Name: commonRule.Policy.SourceRef.Name, - UID: commonRule.Policy.SourceRef.UID, - }, - Direction: commonRule.Direction, - RuleIndex: commonRule.Index, + NetworkPolicyReference: *commonRule.Policy.SourceRef, + Direction: commonRule.Direction, + RuleIndex: commonRule.Index, } } @@ -159,7 +155,14 @@ func HandleFunc(eq networkpolicy.EndpointQuerier) http.HandlerFunc { http.Error(w, "invalid command argument format", http.StatusBadRequest) return } - + if policyProcessed, err := eq.VerifyPoliciesProcessed(); !policyProcessed || err != nil { + if !policyProcessed { + http.Error(w, "policies in the cluster have not been fully processed by Antrea, please retry later", http.StatusTooEarly) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } // query endpoints and handle response errors endpointAnalysisSource, err := eq.QueryNetworkPolicyRules(srcNS, srcPod) if err != nil { diff --git a/pkg/apiserver/handlers/networkpolicyanalysis/handler_test.go b/pkg/apiserver/handlers/networkpolicyanalysis/handler_test.go index f270f94fc44..e78707c4619 100644 --- a/pkg/apiserver/handlers/networkpolicyanalysis/handler_test.go +++ b/pkg/apiserver/handlers/networkpolicyanalysis/handler_test.go @@ -66,7 +66,7 @@ func TestIncompleteArguments(t *testing.T) { }, { name: "Default namespaces", - handlerRequest: "?source=pod1&destination=/pod2", + handlerRequest: "?source=pod1&destination=pod2", argsMock: []string{"default", "pod1", "default", "pod2"}, mockQueryResponse: []mockResponse{{response: nil, error: nil}, {response: nil, error: nil}}, expectedStatus: http.StatusOK, @@ -130,18 +130,29 @@ func TestNetworkPolicyAnalysis(t *testing.T) { mockKNPEgressRules := generateRuleInfo(uid1, controlplane.K8sNetworkPolicy, controlplane.DirectionOut, nil, &defaultPriority, 1) mockKNPIngressRules := generateRuleInfo(uid2, controlplane.K8sNetworkPolicy, controlplane.DirectionIn, nil, &defaultPriority, 1) expectedRuleEgress := networkpolicy.Rule{ - PolicyRef: networkpolicy.PolicyRef{Namespace: namespace, Name: "Policy111", UID: uid1}, - Direction: v1beta2.DirectionOut, - RuleIndex: 0, + NetworkPolicyReference: controlplane.NetworkPolicyReference{Type: controlplane.AntreaNetworkPolicy, Namespace: namespace, Name: "Policy111", UID: uid1}, + Direction: v1beta2.DirectionOut, + RuleIndex: 0, } - expectedRuleIngress := networkpolicy.Rule{ - PolicyRef: networkpolicy.PolicyRef{Namespace: namespace, Name: "Policy222", UID: uid2}, - Direction: v1beta2.DirectionIn, - RuleIndex: 0, + expectedRuleAntreaPolicyIngress := networkpolicy.Rule{ + NetworkPolicyReference: controlplane.NetworkPolicyReference{Type: controlplane.AntreaNetworkPolicy, Namespace: namespace, Name: "Policy222", UID: uid2}, + Direction: v1beta2.DirectionIn, + RuleIndex: 0, } - expectedRuleKNPEgressIsolation := expectedRuleEgress + expectedRuleKNPIngress := networkpolicy.Rule{ + NetworkPolicyReference: controlplane.NetworkPolicyReference{Type: controlplane.K8sNetworkPolicy, Namespace: namespace, Name: "Policy222", UID: uid2}, + Direction: v1beta2.DirectionIn, + RuleIndex: 0, + } + expectedRuleKNPEgress := networkpolicy.Rule{ + NetworkPolicyReference: controlplane.NetworkPolicyReference{Type: controlplane.K8sNetworkPolicy, Namespace: namespace, Name: "Policy111", UID: uid1}, + Direction: v1beta2.DirectionOut, + RuleIndex: 0, + } + + expectedRuleKNPEgressIsolation := expectedRuleKNPEgress expectedRuleKNPEgressIsolation.RuleIndex = -1 - expectedRuleKNPIngressIsolation := expectedRuleIngress + expectedRuleKNPIngressIsolation := expectedRuleKNPIngress expectedRuleKNPIngressIsolation.RuleIndex = -1 testCases := []TestCase{ @@ -154,7 +165,7 @@ func TestNetworkPolicyAnalysis(t *testing.T) { {response: generateResponse(2, uid2, generateRuleInfo(uid1, controlplane.AntreaNetworkPolicy, controlplane.DirectionOut, &networkpolicy.DefaultTierPriority, nil, 1), nil)}, }, expectedStatus: http.StatusOK, - expectedResult: &expectedRuleIngress, + expectedResult: &expectedRuleAntreaPolicyIngress, }, { name: "Different policy priorities", @@ -176,7 +187,7 @@ func TestNetworkPolicyAnalysis(t *testing.T) { {response: generateResponse(2, uid2, generateRuleInfo(uid1, controlplane.AntreaNetworkPolicy, controlplane.DirectionOut, &networkpolicy.DefaultTierPriority, &priority1, 0), nil)}, }, expectedStatus: http.StatusOK, - expectedResult: &expectedRuleIngress, + expectedResult: &expectedRuleAntreaPolicyIngress, }, { name: "Different policy names", @@ -199,7 +210,7 @@ func TestNetworkPolicyAnalysis(t *testing.T) { mockKNPIngressRules)}, }, expectedStatus: http.StatusOK, - expectedResult: &expectedRuleIngress, + expectedResult: &expectedRuleKNPIngress, }, { name: "KNP and default isolation", @@ -210,7 +221,7 @@ func TestNetworkPolicyAnalysis(t *testing.T) { {response: generateResponse(2, uid2, mockKNPEgressRules, nil)}, }, expectedStatus: http.StatusOK, - expectedResult: &expectedRuleEgress, + expectedResult: &expectedRuleKNPEgress, }, { name: "KNP egress default isolation", @@ -260,26 +271,29 @@ func TestNetworkPolicyAnalysis(t *testing.T) { // argsMock has at least 4 entries and mockQueryResponse has at least 2 entries if not expecting bad request. func evaluateTestCases(testCases []TestCase, mockCtrl *gomock.Controller, t *testing.T) { for _, tc := range testCases { - mockQuerier := queriermock.NewMockEndpointQuerier(mockCtrl) - if tc.expectedStatus != http.StatusBadRequest { - mockQuerier.EXPECT().QueryNetworkPolicyRules(tc.argsMock[0], tc.argsMock[1]).Return(tc.mockQueryResponse[0].response, tc.mockQueryResponse[0].error) - mockQuerier.EXPECT().QueryNetworkPolicyRules(tc.argsMock[2], tc.argsMock[3]).Return(tc.mockQueryResponse[1].response, tc.mockQueryResponse[1].error) - } + t.Run(tc.name, func(t *testing.T) { + mockQuerier := queriermock.NewMockEndpointQuerier(mockCtrl) + if tc.expectedStatus != http.StatusBadRequest { + mockQuerier.EXPECT().VerifyPoliciesProcessed().Return(true, nil) + mockQuerier.EXPECT().QueryNetworkPolicyRules(tc.argsMock[0], tc.argsMock[1]).Return(tc.mockQueryResponse[0].response, tc.mockQueryResponse[0].error) + mockQuerier.EXPECT().QueryNetworkPolicyRules(tc.argsMock[2], tc.argsMock[3]).Return(tc.mockQueryResponse[1].response, tc.mockQueryResponse[1].error) + } - handler := HandleFunc(mockQuerier) - req, err := http.NewRequest(http.MethodGet, tc.handlerRequest, nil) - assert.Nil(t, err) + handler := HandleFunc(mockQuerier) + req, err := http.NewRequest(http.MethodGet, tc.handlerRequest, nil) + assert.Nil(t, err) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - assert.Equal(t, tc.expectedStatus, recorder.Code) - if tc.expectedStatus != http.StatusOK { - return - } + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + assert.Equal(t, tc.expectedStatus, recorder.Code) + if tc.expectedStatus != http.StatusOK { + return + } - var received networkpolicy.Rule - err = json.Unmarshal(recorder.Body.Bytes(), &received) - assert.Nil(t, err) - assert.EqualValues(t, *tc.expectedResult, received) + var received networkpolicy.Rule + err = json.Unmarshal(recorder.Body.Bytes(), &received) + assert.Nil(t, err) + assert.EqualValues(t, *tc.expectedResult, received) + }) } } diff --git a/pkg/controller/networkpolicy/adminnetworkpolicy_test.go b/pkg/controller/networkpolicy/adminnetworkpolicy_test.go index c7dea83f135..09164fd218f 100644 --- a/pkg/controller/networkpolicy/adminnetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/adminnetworkpolicy_test.go @@ -416,7 +416,7 @@ func TestProcessAdminNetworkPolicy(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAdminNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID) assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name) @@ -712,7 +712,7 @@ func TestProcessBaselineAdminNetworkPolicy(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processBaselineAdminNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID) assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name) diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go index 62554de4402..e03a7ee832e 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go @@ -739,7 +739,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.serviceStore.Add(&svcA) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAntreaNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy, actualPolicy) @@ -750,7 +750,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) { } func TestAddANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() npc.addANNP(annp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -761,7 +761,7 @@ func TestAddANNP(t *testing.T) { } func TestUpdateANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() newANNP := annp.DeepCopy() // Make a change to the ANNP. @@ -775,7 +775,7 @@ func TestUpdateANNP(t *testing.T) { } func TestDeleteANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() npc.deleteANNP(annp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) diff --git a/pkg/controller/networkpolicy/clustergroup_test.go b/pkg/controller/networkpolicy/clustergroup_test.go index e957d024318..8e3c3e29a2a 100644 --- a/pkg/controller/networkpolicy/clustergroup_test.go +++ b/pkg/controller/networkpolicy/clustergroup_test.go @@ -167,7 +167,7 @@ func TestProcessClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualGroup := c.processClusterGroup(tt.inputGroup) assert.Equal(t, tt.expectedGroup, actualGroup) }) @@ -269,7 +269,7 @@ func TestAddClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(tt.inputGroup) key := tt.inputGroup.Name actualGroupObj, _, _ := npc.internalGroupStore.Get(key) @@ -418,7 +418,7 @@ func TestUpdateClusterGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&testCG) key := testCG.Name for _, tt := range tests { @@ -440,7 +440,7 @@ func TestDeleteCG(t *testing.T) { }, } key := testCG.Name - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&testCG) npc.deleteClusterGroup(&testCG) _, found, _ := npc.internalGroupStore.Get(key) @@ -584,7 +584,7 @@ func TestFilterInternalGroupsForService(t *testing.T) { sets.New[string]("cgC"), }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.internalGroupStore.Create(grp1) npc.internalGroupStore.Create(grp2) npc.internalGroupStore.Create(grp3) @@ -688,7 +688,7 @@ func TestServiceToGroupSelector(t *testing.T) { nil, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.serviceStore.Add(svc1) npc.serviceStore.Add(svc2) npc.serviceStore.Add(svc3) @@ -858,7 +858,7 @@ func TestGetAssociatedGroups(t *testing.T) { []antreatypes.Group{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } @@ -906,7 +906,7 @@ func TestGetClusterGroupMembers(t *testing.T) { controlplane.GroupMemberSet{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go index 08c40914817..4f4623c625f 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go @@ -1689,7 +1689,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.addClusterGroup(&cgA) c.cgStore.Add(&cgA) c.namespaceStore.Add(&nsA) @@ -1712,7 +1712,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) { } func TestAddCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() npc.addCNP(cnp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -1723,7 +1723,7 @@ func TestAddCNP(t *testing.T) { } func TestUpdateCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() newCNP := cnp.DeepCopy() // Make a change to the CNP. @@ -1737,7 +1737,7 @@ func TestUpdateCNP(t *testing.T) { } func TestDeleteCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() npc.deleteCNP(cnp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -1773,7 +1773,7 @@ func TestGetTierPriority(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) name := "" if tt.inputTier != nil { npc.tierStore.Add(tt.inputTier) @@ -1844,7 +1844,7 @@ func TestProcessRefGroupOrClusterGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&cgA) npc.addClusterGroup(&cgB) npc.addClusterGroup(&cgNested1) @@ -2105,7 +2105,7 @@ func TestFilterPerNamespaceRuleACNPsByNSLabels(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.acnpStore.Add(cnpWithSpecAppliedTo) c.acnpStore.Add(cnpWithRuleAppliedTo) c.acnpStore.Add(cnpMatchAllNamespaces) diff --git a/pkg/controller/networkpolicy/crd_utils_test.go b/pkg/controller/networkpolicy/crd_utils_test.go index a572e347c28..88b3ce145f0 100644 --- a/pkg/controller/networkpolicy/crd_utils_test.go +++ b/pkg/controller/networkpolicy/crd_utils_test.go @@ -472,7 +472,7 @@ func TestToAntreaPeerForCRD(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&cgA) npc.cgStore.Add(&cgA) if tt.clusterSetScope { @@ -523,7 +523,7 @@ func TestCreateAppliedToGroupsForGroup(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, Spec: crdv1beta1.GroupSpec{IPBlocks: []crdv1beta1.IPBlock{{CIDR: cidr}}}, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(clusterGroupWithSelector) npc.addClusterGroup(clusterGroupWithIPBlock) npc.addGroup(groupWithSelector) diff --git a/pkg/controller/networkpolicy/endpoint_querier.go b/pkg/controller/networkpolicy/endpoint_querier.go index 437af434f29..54327691fbf 100644 --- a/pkg/controller/networkpolicy/endpoint_querier.go +++ b/pkg/controller/networkpolicy/endpoint_querier.go @@ -36,6 +36,8 @@ type EndpointQuerier interface { QueryNetworkPolicies(namespace string, podName string) (*EndpointQueryResponse, error) // QueryNetworkPolicyRules returns the detailed rules in addition to QueryNetworkPolicies. QueryNetworkPolicyRules(namespace, podName string) (*EndpointRuleAnalysis, error) + // VerifyPoliciesProcessed validates whether all policies have been processed by the Antrea controller. + VerifyPoliciesProcessed() (bool, error) } // endpointQuerier implements the EndpointQuerier interface @@ -55,18 +57,12 @@ type Endpoint struct { Rules []Rule `json:"rules,omitempty"` } -type PolicyRef struct { - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - UID types.UID `json:"uid,omitempty"` -} - type Policy struct { - PolicyRef + controlplane.NetworkPolicyReference } type Rule struct { - PolicyRef + controlplane.NetworkPolicyReference Direction cpv1beta.Direction `json:"direction,omitempty"` RuleIndex int `json:"ruleindex,omitempty"` } @@ -105,7 +101,7 @@ func (eq *endpointQuerier) categorizeNetworkPolicies(groups map[grouping.GroupTy appliedToGroupKeys := groups[appliedToGroupType] // We iterate over all AppliedToGroups (same for AddressGroups below). This is acceptable // since this implementation only supports user queries (in particular through antctl) and - // should resturn within a reasonable amount of time. We experimented with adding Pod + // should return within a reasonable amount of time. We experimented with adding Pod // Indexers to the AppliedToGroup and AddressGroup stores, but we felt that this use case // did not justify the memory overhead. If we can find another use for the Indexers as part // of the NetworkPolicy Controller implementation, we may consider adding them back. @@ -185,11 +181,7 @@ func (eq *endpointQuerier) QueryNetworkPolicies(namespace string, podName string responsePolicies := make([]Policy, 0) for _, internalPolicy := range applied { responsePolicy := Policy{ - PolicyRef: PolicyRef{ - Namespace: internalPolicy.SourceRef.Namespace, - Name: internalPolicy.SourceRef.Name, - UID: internalPolicy.SourceRef.UID, - }, + NetworkPolicyReference: *internalPolicy.SourceRef, } responsePolicies = append(responsePolicies, responsePolicy) } @@ -197,13 +189,9 @@ func (eq *endpointQuerier) QueryNetworkPolicies(namespace string, podName string // create rules based on egress and ingress policies for _, internalPolicy := range append(egress, ingress...) { newRule := Rule{ - PolicyRef: PolicyRef{ - Namespace: internalPolicy.Policy.SourceRef.Namespace, - Name: internalPolicy.Policy.SourceRef.Name, - UID: internalPolicy.Policy.SourceRef.UID, - }, - Direction: internalPolicy.Direction, - RuleIndex: internalPolicy.Index, + NetworkPolicyReference: *internalPolicy.Policy.SourceRef, + Direction: internalPolicy.Direction, + RuleIndex: internalPolicy.Index, } responseRules = append(responseRules, newRule) } @@ -251,3 +239,8 @@ func (eq *endpointQuerier) QueryNetworkPolicyRules(namespace, podName string) (* } return &EndpointRuleAnalysis{namespace, podName, policyUIDs, ingressIsolation, egressIsolation, ingress, egress}, nil } + +// VerifyPoliciesProcessed validates whether all policies have been processed by the Antrea controller. +func (eq *endpointQuerier) VerifyPoliciesProcessed() (bool, error) { + return eq.networkPolicyController.verifyPoliciesProcessed() +} diff --git a/pkg/controller/networkpolicy/endpoint_querier_test.go b/pkg/controller/networkpolicy/endpoint_querier_test.go index d7aa7740c89..3dc049d6c3a 100644 --- a/pkg/controller/networkpolicy/endpoint_querier_test.go +++ b/pkg/controller/networkpolicy/endpoint_querier_test.go @@ -184,7 +184,7 @@ var namespaces = []*corev1.Namespace{ func makeControllerAndEndpointQuerier(objects ...runtime.Object) *endpointQuerier { // create controller - _, c := newController(objects, nil) + _, c := newController(nil, objects, nil) c.heartbeatCh = make(chan heartbeat, 1000) stopCh := make(chan struct{}) // create querier with stores inside controller @@ -216,9 +216,9 @@ func makeControllerAndEndpointQuerier(objects ...runtime.Object) *endpointQuerie } func TestQueryNetworkPolicies(t *testing.T) { - policyRef0 := PolicyRef{policies[0].Namespace, policies[0].Name, policies[0].UID} - policyRef1 := PolicyRef{policies[1].Namespace, policies[1].Name, policies[1].UID} - policyRef2 := PolicyRef{policies[2].Namespace, policies[2].Name, policies[2].UID} + policyRef0 := controlplane.NetworkPolicyReference{Type: controlplane.K8sNetworkPolicy, Namespace: policies[0].Namespace, Name: policies[0].Name, UID: policies[0].UID} + policyRef1 := controlplane.NetworkPolicyReference{Type: controlplane.K8sNetworkPolicy, Namespace: policies[1].Namespace, Name: policies[1].Name, UID: policies[1].UID} + policyRef2 := controlplane.NetworkPolicyReference{Type: controlplane.K8sNetworkPolicy, Namespace: policies[2].Namespace, Name: policies[2].Name, UID: policies[2].UID} testCases := []struct { name string diff --git a/pkg/controller/networkpolicy/group_test.go b/pkg/controller/networkpolicy/group_test.go index 153266c48a0..eb892633040 100644 --- a/pkg/controller/networkpolicy/group_test.go +++ b/pkg/controller/networkpolicy/group_test.go @@ -170,7 +170,7 @@ func TestProcessGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualGroup := c.processGroup(tt.inputGroup) assert.Equal(t, tt.expectedGroup, actualGroup) }) @@ -276,7 +276,7 @@ func TestAddGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(tt.inputGroup) key := fmt.Sprintf("%s/%s", tt.inputGroup.Namespace, tt.inputGroup.Name) actualGroupObj, _, _ := npc.internalGroupStore.Get(key) @@ -431,7 +431,7 @@ func TestUpdateGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(&testG) key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name) for _, tt := range tests { @@ -453,7 +453,7 @@ func TestDeleteG(t *testing.T) { }, } key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name) - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(&testG) npc.deleteGroup(&testG) _, found, _ := npc.internalGroupStore.Get(key) @@ -570,7 +570,7 @@ func TestGetGroupMembers(t *testing.T) { controlplane.GroupMemberSet{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } diff --git a/pkg/controller/networkpolicy/mutate_test.go b/pkg/controller/networkpolicy/mutate_test.go index 4d0dc25e25c..18df01a5978 100644 --- a/pkg/controller/networkpolicy/mutate_test.go +++ b/pkg/controller/networkpolicy/mutate_test.go @@ -184,7 +184,7 @@ func TestMutateAntreaClusterNetworkPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController) _, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier) marshalExpPatch, _ := json.Marshal(tt.expectPatch) @@ -353,7 +353,7 @@ func TestMutateAntreaNetworkPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController) _, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier) marshalExpPatch, _ := json.Marshal(tt.expectPatch) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 6c8f3b185f4..27666a4eddb 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -241,7 +241,7 @@ type NetworkPolicyController struct { // internalNetworkPolicyQueue maintains the networkpolicy.NetworkPolicy objects that // need to be synced. internalNetworkPolicyQueue workqueue.RateLimitingInterface - // internalGroupQueue maintains the networkpolicy.Group objects that needs to be + // internalGroupQueue maintains the networkpolicy.Group objects that need to be // synced. internalGroupQueue workqueue.RateLimitingInterface @@ -1670,6 +1670,59 @@ func (n *NetworkPolicyController) cleanupOrphanGroups(internalNetworkPolicy *ant } } +// verifyPoliciesProcessed checks that all the policy objects in the cluster supported by Antrea +// are already processed by the Antrea controller. It serves as a sanity check/prerequisite for +// the networkpolicyanalysis command, since the command uses controller cache as computation +// source. Note that: +// +// 1. This function blocks internal NP processing until it returns, which is acceptable given +// the rarity of invocation. +// 2. Verification does not guarantee the latest versions of policies are processed. +// On the K8s side policies could be concurrently updated, and those events will be processed +// after the function returns. +// 3. Verification is based on the fact that, as of now, processed internal NPs and original +// policy has a one-to-one relationship. +func (n *NetworkPolicyController) verifyPoliciesProcessed() (bool, error) { + n.internalNetworkPolicyMutex.Lock() + defer n.internalNetworkPolicyMutex.Unlock() + + numInternalNP := n.GetNetworkPolicyNum() + policyObjNum := 0 + if npList, err := n.networkPolicyLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(npList) + } + if features.DefaultFeatureGate.Enabled(features.AntreaPolicy) { + if acnpList, err := n.acnpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(acnpList) + } + if annpList, err := n.annpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(annpList) + } + } + if features.DefaultFeatureGate.Enabled(features.AdminNetworkPolicy) { + if anpList, err := n.adminNetworkPolicyLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(anpList) + } + if banpList, err := n.banpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(banpList) + } + } + if numInternalNP != policyObjNum { + return false, fmt.Errorf("policy events have not been fully processed by the Antrea controller") + } + return true, nil +} + // ipStrToIPAddress converts an IP string to a controlplane.IPAddress. // nil will returned if the IP string is not valid. func ipStrToIPAddress(ip string) controlplane.IPAddress { diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go index fcc091a6e52..05cfd084ebc 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go @@ -241,7 +241,7 @@ func testComputeNetworkPolicy(t *testing.T, maxExecutionTime time.Duration, name } k8sObjs = append(k8sObjs, toRunTimeObjects(namespaces)...) - _, c := newController(k8sObjs, crdObjs) + _, c := newController(crdObjs, k8sObjs, nil) c.heartbeatCh = make(chan heartbeat, 1000) stopCh := make(chan struct{}) @@ -533,7 +533,7 @@ func BenchmarkSyncAddressGroup(b *testing.B) { objs = append(objs, pods...) stopCh := make(chan struct{}) defer close(stopCh) - _, c := newController(objs, nil) + _, c := newController(nil, objs, nil) c.informerFactory.Start(stopCh) c.crdInformerFactory.Start(stopCh) go c.groupingController.Run(stopCh) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 622f85c65bb..1970d518927 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -41,6 +41,8 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "sigs.k8s.io/network-policy-api/apis/v1alpha1" fakepolicyversioned "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned/fake" policyv1a1informers "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions" @@ -56,6 +58,7 @@ import ( "antrea.io/antrea/pkg/controller/labelidentity" "antrea.io/antrea/pkg/controller/networkpolicy/store" antreatypes "antrea.io/antrea/pkg/controller/types" + "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/util/externalnode" ) @@ -100,16 +103,17 @@ type networkPolicyController struct { internalNetworkPolicyStore storage.Interface informerFactory informers.SharedInformerFactory crdInformerFactory crdinformers.SharedInformerFactory + policyInformerFactory policyv1a1informers.SharedInformerFactory groupingController *grouping.GroupEntityController labelIdentityController *labelidentity.Controller } // objects is an initial set of K8s objects that is exposed through the client. -func newController(k8sObjects, crdObjects []runtime.Object) (*fake.Clientset, *networkPolicyController) { +func newController(crdObjects, k8sObjects, adminPolicyObjects []runtime.Object) (*fake.Clientset, *networkPolicyController) { client := newClientset(k8sObjects...) crdClient := fakeversioned.NewSimpleClientset(crdObjects...) mcsClient := fakemcsversioned.NewSimpleClientset() - policyClient := fakepolicyversioned.NewSimpleClientset() + policyClient := fakepolicyversioned.NewSimpleClientset(adminPolicyObjects...) informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) mcsInformerFactory := mcsinformers.NewSharedInformerFactory(mcsClient, informerDefaultResync) @@ -177,6 +181,7 @@ func newController(k8sObjects, crdObjects []runtime.Object) (*fake.Clientset, *n internalNetworkPolicyStore, informerFactory, crdInformerFactory, + policyInformerFactory, groupingController, labelIdentityController, } @@ -255,6 +260,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) ( internalNetworkPolicyStore, informerFactory, crdInformerFactory, + policyInformerFactory, nil, nil, } @@ -278,7 +284,7 @@ func newClientset(objects ...runtime.Object) *fake.Clientset { } func TestAddNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() npc.addNetworkPolicy(np) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -289,7 +295,7 @@ func TestAddNetworkPolicy(t *testing.T) { } func TestDeleteNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() npc.addNetworkPolicy(np) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -300,7 +306,7 @@ func TestDeleteNetworkPolicy(t *testing.T) { } func TestUpdateNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() newNP := np.DeepCopy() newNP.Spec.Ingress = nil @@ -736,7 +742,7 @@ func TestAddPod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) groupKey := testCG.Name @@ -827,7 +833,7 @@ func TestDeletePod(t *testing.T) { p2 := getPod("p2", ns, "", p2IP, false) // Ensure Pod p2 matches AddressGroup. p2.Labels = ruleLabels - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(matchNPObj) npc.syncInternalNetworkPolicy(getKNPReference(matchNPObj)) npc.addClusterGroup(testCG) @@ -980,7 +986,7 @@ func TestAddNamespace(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) npc.addClusterGroup(testCG) @@ -1139,7 +1145,7 @@ func TestDeleteNamespace(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) npc.addClusterGroup(testCG) @@ -1270,7 +1276,7 @@ func TestAddAndUpdateService(t *testing.T) { Selector: map[string]string{"app": "test-2"}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.cgStore.Add(testCG1) npc.cgStore.Add(testCG2) npc.addClusterGroup(testCG1) @@ -1349,7 +1355,7 @@ func TestDeleteService(t *testing.T) { Selector: map[string]string{"app": "test"}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.cgStore.Add(testCG) npc.addClusterGroup(testCG) npc.groupingInterface.AddPod(testPod) @@ -1786,7 +1792,7 @@ func TestToAntreaPeer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) actualPeer, _ := npc.toAntreaPeer(tt.inPeers, testNPObj, tt.direction, tt.namedPortExist) if !reflect.DeepEqual(tt.outPeer.AddressGroups, (*actualPeer).AddressGroups) { t.Errorf("Unexpected AddressGroups in Antrea Peer conversion. Expected %v, got %v", tt.outPeer.AddressGroups, (*actualPeer).AddressGroups) @@ -2202,7 +2208,7 @@ func TestProcessNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(tt.existingObjects, nil) + _, c := newController(nil, tt.existingObjects, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -2475,7 +2481,7 @@ func TestIPStrToIPAddress(t *testing.T) { } func TestDeleteFinalStateUnknownNetworkPolicy(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.heartbeatCh = make(chan heartbeat, 2) np := &networkingv1.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "npA", UID: "uidA"}, @@ -2621,7 +2627,7 @@ func TestGetAppliedToWorkloads(t *testing.T) { expEEs: emptyEEs, }, } - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.groupingInterface.AddPod(podA) c.groupingInterface.AddPod(podB) clusterGroups := []v1beta1.ClusterGroup{cgA, cgB, cgC, cgD, nestedCG1, nestedCG2} @@ -2742,7 +2748,7 @@ func TestGetAddressGroupMemberSet(t *testing.T) { expMemberSet: podABMemberSet, }, } - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.groupingInterface.AddPod(podA) c.groupingInterface.AddPod(podB) clusterGroups := []v1beta1.ClusterGroup{cgA, cgB, cgC, cgD, nestedCG1, nestedCG2} @@ -2765,7 +2771,7 @@ func TestGetAddressGroupMemberSet(t *testing.T) { func TestAddressGroupWithNodeSelector(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.informerFactory.Start(stopCh) c.crdInformerFactory.Start(stopCh) go c.groupingController.Run(stopCh) @@ -3042,7 +3048,7 @@ func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) { }, AppliedToGroups: []string{selectorAGroupUID}, } - _, c := newController([]runtime.Object{podA, podB, podC}, nil) + _, c := newController(nil, []runtime.Object{podA, podB, podC}, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -3183,7 +3189,7 @@ func TestSyncInternalNetworkPolicy(t *testing.T) { } // Add a new policy, it should create an internal NetworkPolicy, AddressGroups and AppliedToGroups used by it. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.acnpStore.Add(inputPolicy) networkPolicyRef := getACNPReference(inputPolicy) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRef)) @@ -3291,7 +3297,7 @@ func TestSyncInternalNetworkPolicyWithSameName(t *testing.T) { } // Add and sync policyA first, it should create an AppliedToGroup. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.networkPolicyStore.Add(policyA) networkPolicyRefA := getKNPReference(policyA) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRefA)) @@ -3403,7 +3409,7 @@ func TestSyncInternalNetworkPolicyConcurrently(t *testing.T) { } // Add and sync policyA first, it should create an AddressGroup and AppliedToGroups. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.networkPolicyStore.Add(policyA) networkPolicyRefA := getKNPReference(policyA) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRefA)) @@ -3651,7 +3657,7 @@ func TestSyncInternalNetworkPolicyWithGroups(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController([]runtime.Object{podA, podB}, nil) + _, c := newController(nil, []runtime.Object{podA, podB}, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -3758,7 +3764,7 @@ func TestSyncAppliedToGroupWithExternalEntity(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.groupingInterface.AddExternalEntity(tt.addedExternalEntity) groupSelector := antreatypes.NewGroupSelector("nsA", nil, nil, &selectorSpec, nil) appGroupID := getNormalizedUID(groupSelector.NormalizedName) @@ -3782,6 +3788,128 @@ func TestSyncAppliedToGroupWithExternalEntity(t *testing.T) { } } +func TestVerifyPoliciesProcessed(t *testing.T) { + np1 := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "np1", + Namespace: "ns1", + UID: "uid1", + }, + } + np2 := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "np2", + Namespace: "ns2", + UID: "uid2", + }, + } + acnp := &v1beta1.ClusterNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acnp1", + UID: "uid3", + }, + } + annp := &v1beta1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "annp1", + UID: "uid4", + }, + } + anp := &v1alpha1.AdminNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "anp1", + UID: "uid5", + }, + } + banp := &v1alpha1.BaselineAdminNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + UID: "uid6", + }, + } + tests := []struct { + name string + existingKNPObjects []runtime.Object + existingAntreaNPObjects []runtime.Object + existingANPObjects []runtime.Object + }{ + { + name: "no-policies-in-cluster", + }, + { + name: "k8s-policy", + existingKNPObjects: []runtime.Object{np1}, + }, + { + name: "multiple-k8s-policy", + existingKNPObjects: []runtime.Object{np1, np2}, + }, + { + name: "mixed-k8s-antrea-native-policy", + existingKNPObjects: []runtime.Object{np1}, + existingAntreaNPObjects: []runtime.Object{acnp, annp}, + }, + { + name: "mixed-k8s-admin-network-policy", + existingKNPObjects: []runtime.Object{np1}, + existingANPObjects: []runtime.Object{anp, banp}, + }, + } + defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true)() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, c := newController(tt.existingAntreaNPObjects, tt.existingKNPObjects, tt.existingANPObjects) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.policyInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + c.policyInformerFactory.WaitForCacheSync(stopCh) + + testSynced := func(numTotal, numSynced int) { + processed, err := c.verifyPoliciesProcessed() + if numSynced < numTotal { + assert.Falsef(t, processed, "controller reports policies processed before they are synced") + } else { + assert.NoError(t, err) + assert.Truef(t, processed, "controller reports policies not processed after they are all synced") + } + } + numPolicy := len(tt.existingKNPObjects) + len(tt.existingAntreaNPObjects) + len(tt.existingANPObjects) + numSynced := 0 + for _, policyObj := range tt.existingKNPObjects { + np := policyObj.(*networkingv1.NetworkPolicy) + c.syncInternalNetworkPolicy(getKNPReference(np)) + numSynced += 1 + testSynced(numPolicy, numSynced) + } + for _, obj := range tt.existingAntreaNPObjects { + if acnp, ok := obj.(*v1beta1.ClusterNetworkPolicy); ok { + c.syncInternalNetworkPolicy(getACNPReference(acnp)) + } else { + annp := obj.(*v1beta1.NetworkPolicy) + c.syncInternalNetworkPolicy(getANNPReference(annp)) + } + numSynced += 1 + testSynced(numPolicy, numSynced) + } + for _, obj := range tt.existingANPObjects { + if anp, ok := obj.(*v1alpha1.AdminNetworkPolicy); ok { + c.syncInternalNetworkPolicy(getAdminNPReference(anp)) + } else { + banp := obj.(*v1alpha1.BaselineAdminNetworkPolicy) + c.syncInternalNetworkPolicy(getBANPReference(banp)) + } + numSynced += 1 + testSynced(numPolicy, numSynced) + } + }) + } +} + func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) { require.Equal(t, len(items), queue.Len()) expectedItems := sets.New[string](items...) diff --git a/pkg/controller/networkpolicy/testing/mock_networkpolicy.go b/pkg/controller/networkpolicy/testing/mock_networkpolicy.go index 26684c42404..16a8e50dad9 100644 --- a/pkg/controller/networkpolicy/testing/mock_networkpolicy.go +++ b/pkg/controller/networkpolicy/testing/mock_networkpolicy.go @@ -82,3 +82,18 @@ func (mr *MockEndpointQuerierMockRecorder) QueryNetworkPolicyRules(arg0, arg1 an mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryNetworkPolicyRules", reflect.TypeOf((*MockEndpointQuerier)(nil).QueryNetworkPolicyRules), arg0, arg1) } + +// VerifyPoliciesProcessed mocks base method. +func (m *MockEndpointQuerier) VerifyPoliciesProcessed() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "VerifyPoliciesProcessed") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// VerifyPoliciesProcessed indicates an expected call of VerifyPoliciesProcessed. +func (mr *MockEndpointQuerierMockRecorder) VerifyPoliciesProcessed() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VerifyPoliciesProcessed", reflect.TypeOf((*MockEndpointQuerier)(nil).VerifyPoliciesProcessed)) +} diff --git a/pkg/controller/networkpolicy/tier_test.go b/pkg/controller/networkpolicy/tier_test.go index bcbe2fc549c..ec7db98b285 100644 --- a/pkg/controller/networkpolicy/tier_test.go +++ b/pkg/controller/networkpolicy/tier_test.go @@ -70,7 +70,7 @@ func TestInitTier(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) if tc.reactor != nil { c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor) } diff --git a/pkg/controller/networkpolicy/validate_test.go b/pkg/controller/networkpolicy/validate_test.go index 43b20fe13ca..67909bf9a1c 100644 --- a/pkg/controller/networkpolicy/validate_test.go +++ b/pkg/controller/networkpolicy/validate_test.go @@ -1668,7 +1668,7 @@ func TestValidateAntreaClusterNetworkPolicy(t *testing.T) { for feature, value := range tt.featureGates { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, feature, value)() } - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAntreaPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason) @@ -1740,7 +1740,7 @@ func TestValidateAntreaNetworkPolicy(t *testing.T) { for feature, value := range tt.featureGates { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, feature, value)() } - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAntreaPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason) @@ -2023,7 +2023,7 @@ func TestValidateAntreaClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) if tt.existGroup != nil { controller.cgStore.Add(tt.existGroup) controller.addClusterGroup(tt.existGroup) @@ -2280,7 +2280,7 @@ func TestValidateAntreaGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) if tt.existGroup != nil { controller.gStore.Add(tt.existGroup) controller.addGroup(tt.existGroup) @@ -2488,7 +2488,7 @@ func TestValidateTier(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) for i := 1; i <= tt.existTierNum; i++ { controller.tierStore.Add(&crdv1beta1.Tier{ ObjectMeta: metav1.ObjectMeta{ @@ -2710,7 +2710,7 @@ func TestValidateAdminNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAdminNetworkPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason)