diff --git a/Makefile b/Makefile index 40ca33c45..8bd2e8688 100644 --- a/Makefile +++ b/Makefile @@ -221,6 +221,9 @@ run-kubernetes-master: stop-kubernetes-master --server=http://localhost:8080 \ apply -f /manifests/test/namespaces.yaml + # Wait until some resources we expect to exist are present. + while ! docker exec st-apiserver kubectl get serviceaccount default; do echo "Waiting for default serviceaccount to be created..."; sleep 2; done + ## Stop the local kubernetes master stop-kubernetes-master: # Delete the cluster role binding. diff --git a/config/crd/crd.projectcalico.org_clusterinformations.yaml b/config/crd/crd.projectcalico.org_clusterinformations.yaml index 62991f943..37c143db2 100644 --- a/config/crd/crd.projectcalico.org_clusterinformations.yaml +++ b/config/crd/crd.projectcalico.org_clusterinformations.yaml @@ -52,6 +52,9 @@ spec: to signal to components such as Felix that it should wait before accessing the datastore. type: boolean + variant: + description: Variant declares which variant of Calico should be active. + type: string type: object type: object served: true diff --git a/config/crd/crd.projectcalico.org_felixconfigurations.yaml b/config/crd/crd.projectcalico.org_felixconfigurations.yaml index b4e64a5a0..3c46a9df3 100644 --- a/config/crd/crd.projectcalico.org_felixconfigurations.yaml +++ b/config/crd/crd.projectcalico.org_felixconfigurations.yaml @@ -73,6 +73,10 @@ spec: node appears to use the IP of the ingress node; this requires a permissive L2 network. [Default: Tunnel]' type: string + bpfKubeProxyEndpointSlicesEnabled: + description: BPFKubeProxyEndpointSlicesEnabled in BPF mode, controls + whether Felix's embedded kube-proxy accepts EndpointSlices or not. + type: boolean bpfKubeProxyIptablesCleanupEnabled: description: 'BPFKubeProxyIptablesCleanupEnabled, if enabled in BPF mode, Felix will proactively clean up the upstream Kubernetes kube-proxy''s diff --git a/lib/apis/v3/clusterinfo.go b/lib/apis/v3/clusterinfo.go index ff180b0cf..8800c9fbc 100644 --- a/lib/apis/v3/clusterinfo.go +++ b/lib/apis/v3/clusterinfo.go @@ -46,6 +46,8 @@ type ClusterInformationSpec struct { // DatastoreReady is used during significant datastore migrations to signal to components // such as Felix that it should wait before accessing the datastore. DatastoreReady *bool `json:"datastoreReady,omitempty"` + // Variant declares which variant of Calico should be active. + Variant string `json:"variant,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/lib/backend/k8s/k8s_test.go b/lib/backend/k8s/k8s_test.go index be5d96908..d0e9a0eb0 100644 --- a/lib/backend/k8s/k8s_test.go +++ b/lib/backend/k8s/k8s_test.go @@ -341,7 +341,7 @@ func CreateClientAndSyncer(cfg apiconfig.KubeConfig) (*KubeClient, *cb, api.Sync Lock: &sync.Mutex{}, updateChan: updateChan, } - syncer := felixsyncer.New(c, caCfg, callback) + syncer := felixsyncer.New(c, caCfg, callback, true) return c.(*KubeClient), &callback, syncer } diff --git a/lib/backend/syncersv1/felixsyncer/felixsyncer_e2e_test.go b/lib/backend/syncersv1/felixsyncer/felixsyncer_e2e_test.go index 5c76e2023..7f83c45d6 100644 --- a/lib/backend/syncersv1/felixsyncer/felixsyncer_e2e_test.go +++ b/lib/backend/syncersv1/felixsyncer/felixsyncer_e2e_test.go @@ -78,7 +78,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests", testutils.Datastore Describe("Felix syncer functionality", func() { It("should receive the synced after return all current data", func() { - syncer := felixsyncer.New(be, config.Spec, syncTester) + syncer := felixsyncer.New(be, config.Spec, syncTester, true) syncer.Start() expectedCacheSize := 0 @@ -495,7 +495,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests", testutils.Datastore // We need to create a new syncTester and syncer. current := syncTester.GetCacheEntries() syncTester = testutils.NewSyncerTester() - syncer = felixsyncer.New(be, config.Spec, syncTester) + syncer = felixsyncer.New(be, config.Spec, syncTester, true) syncer.Start() // Verify the data is the same as the data from the previous cache. We got the cache in the previous @@ -534,7 +534,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (KDD only)", testutil It("should handle IPAM blocks properly for host-local IPAM", func() { config.Spec.K8sUsePodCIDR = true - syncer := felixsyncer.New(be, config.Spec, syncTester) + syncer := felixsyncer.New(be, config.Spec, syncTester, true) syncer.Start() // Verify we start a resync. @@ -551,3 +551,56 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (KDD only)", testutil syncTester.ExpectStatusUpdate(api.InSync) }) }) + +var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (passive mode)", testutils.DatastoreK8s, func(config apiconfig.CalicoAPIConfig) { + var be api.Client + var syncTester *testutils.SyncerTester + var err error + var c clientv3.Interface + + BeforeEach(func() { + // Create the backend client to obtain a syncer interface. + be, err = backend.NewClient(config) + Expect(err).NotTo(HaveOccurred()) + be.Clean() + + c, err = clientv3.New(config) + Expect(err).NotTo(HaveOccurred()) + + // Create a SyncerTester to receive the BGP syncer callback events and to allow us + // to assert state. + syncTester = testutils.NewSyncerTester() + }) + + It("should only receive config updates when in passive mode", func() { + syncer := felixsyncer.New(be, config.Spec, syncTester, false) + syncer.Start() + + // Verify we start a resync. + syncTester.ExpectStatusUpdate(api.WaitForDatastore) + syncTester.ExpectStatusUpdate(api.ResyncInProgress) + + // Expect to be in-sync. + syncTester.ExpectStatusUpdate(api.InSync) + + // We don't expect any resources, since we're only watching config. + syncTester.ExpectCacheSize(0) + + // Change the variant. + ci := &apiv3.ClusterInformation{ + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + Spec: apiv3.ClusterInformationSpec{ + Variant: "Calico", + }, + } + _, err = c.ClusterInformation().Create(context.Background(), ci, options.SetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + // Expect an update for the variant. + syncTester.ExpectCacheSize(1) + syncTester.ExpectValueMatches( + model.GlobalConfigKey{Name: "Variant"}, + MatchRegexp("Calico"), + ) + }) +}) diff --git a/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go b/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go index ecdc1a636..356c86cc6 100644 --- a/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go +++ b/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go @@ -24,10 +24,8 @@ import ( ) // New creates a new Felix v1 Syncer. -func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks) api.Syncer { - // Create the set of ResourceTypes required for Felix. Since the update processors - // also cache state, we need to create individual ones per syncer rather than create - // a common global set. +func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer { + // Felix always needs ClusterInformation and FelixConfiguration resources. resourceTypes := []watchersyncer.ResourceType{ { ListInterface: model.ResourceListOptions{Kind: apiv3.KindClusterInformation}, @@ -37,47 +35,55 @@ func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.Syn ListInterface: model.ResourceListOptions{Kind: apiv3.KindFelixConfiguration}, UpdateProcessor: updateprocessors.NewFelixConfigUpdateProcessor(), }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkPolicy}, - UpdateProcessor: updateprocessors.NewGlobalNetworkPolicyUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkSet}, - UpdateProcessor: updateprocessors.NewGlobalNetworkSetUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool}, - UpdateProcessor: updateprocessors.NewIPPoolUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindNode}, - UpdateProcessor: updateprocessors.NewFelixNodeUpdateProcessor(cfg.K8sUsePodCIDR), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindProfile}, - UpdateProcessor: updateprocessors.NewProfileUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindWorkloadEndpoint}, - UpdateProcessor: updateprocessors.NewWorkloadEndpointUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkPolicy}, - UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkSet}, - UpdateProcessor: updateprocessors.NewNetworkSetUpdateProcessor(), - }, - { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindHostEndpoint}, - UpdateProcessor: updateprocessors.NewHostEndpointUpdateProcessor(), - }, } - // If using Calico IPAM, include IPAM resources the felix cares about. - if !cfg.K8sUsePodCIDR { - resourceTypes = append(resourceTypes, watchersyncer.ResourceType{ListInterface: model.BlockListOptions{}}) + if isLeader { + // These resources are only required if this is the active Felix instance on the node. + additionalTypes := []watchersyncer.ResourceType{ + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkPolicy}, + UpdateProcessor: updateprocessors.NewGlobalNetworkPolicyUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkSet}, + UpdateProcessor: updateprocessors.NewGlobalNetworkSetUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool}, + UpdateProcessor: updateprocessors.NewIPPoolUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindNode}, + UpdateProcessor: updateprocessors.NewFelixNodeUpdateProcessor(cfg.K8sUsePodCIDR), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindProfile}, + UpdateProcessor: updateprocessors.NewProfileUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindWorkloadEndpoint}, + UpdateProcessor: updateprocessors.NewWorkloadEndpointUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkPolicy}, + UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkSet}, + UpdateProcessor: updateprocessors.NewNetworkSetUpdateProcessor(), + }, + { + ListInterface: model.ResourceListOptions{Kind: apiv3.KindHostEndpoint}, + UpdateProcessor: updateprocessors.NewHostEndpointUpdateProcessor(), + }, + } + + // If using Calico IPAM, include IPAM resources the felix cares about. + if !cfg.K8sUsePodCIDR { + additionalTypes = append(additionalTypes, watchersyncer.ResourceType{ListInterface: model.BlockListOptions{}}) + } + + resourceTypes = append(resourceTypes, additionalTypes...) } return watchersyncer.New( diff --git a/lib/backend/syncersv1/updateprocessors/configurationprocessor_test.go b/lib/backend/syncersv1/updateprocessors/configurationprocessor_test.go index 72895f9ef..deed0233a 100644 --- a/lib/backend/syncersv1/updateprocessors/configurationprocessor_test.go +++ b/lib/backend/syncersv1/updateprocessors/configurationprocessor_test.go @@ -74,8 +74,8 @@ var _ = Describe("Test the generic configuration update processor and the concre Name: "node.bgpnode1", } numFelixConfigs := 85 - numClusterConfigs := 4 - numNodeClusterConfigs := 3 + numClusterConfigs := 5 + numNodeClusterConfigs := 4 numBgpConfigs := 5 felixMappedNames := map[string]interface{}{ "RouteRefreshInterval": nil,