Skip to content

Commit

Permalink
Configuration model for active / passive felix implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Jun 9, 2020
1 parent 871c78f commit faa5721
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 49 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ run-kubernetes-master: stop-kubernetes-master
--server=http://localhost:8080 \
apply -f /manifests/test/namespaces.yaml

# Wait until some resources we expect to exist are present.
while ! docker exec st-apiserver kubectl get serviceaccount default; do echo "Waiting for default serviceaccount to be created..."; sleep 2; done

## Stop the local kubernetes master
stop-kubernetes-master:
# Delete the cluster role binding.
Expand Down
3 changes: 3 additions & 0 deletions config/crd/crd.projectcalico.org_clusterinformations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ spec:
to signal to components such as Felix that it should wait before
accessing the datastore.
type: boolean
variant:
description: Variant declares which variant of Calico should be active.
type: string
type: object
type: object
served: true
Expand Down
4 changes: 4 additions & 0 deletions config/crd/crd.projectcalico.org_felixconfigurations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ spec:
node appears to use the IP of the ingress node; this requires a
permissive L2 network. [Default: Tunnel]'
type: string
bpfKubeProxyEndpointSlicesEnabled:
description: BPFKubeProxyEndpointSlicesEnabled in BPF mode, controls
whether Felix's embedded kube-proxy accepts EndpointSlices or not.
type: boolean
bpfKubeProxyIptablesCleanupEnabled:
description: 'BPFKubeProxyIptablesCleanupEnabled, if enabled in BPF
mode, Felix will proactively clean up the upstream Kubernetes kube-proxy''s
Expand Down
2 changes: 2 additions & 0 deletions lib/apis/v3/clusterinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type ClusterInformationSpec struct {
// DatastoreReady is used during significant datastore migrations to signal to components
// such as Felix that it should wait before accessing the datastore.
DatastoreReady *bool `json:"datastoreReady,omitempty"`
// Variant declares which variant of Calico should be active.
Variant string `json:"variant,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func CreateClientAndSyncer(cfg apiconfig.KubeConfig) (*KubeClient, *cb, api.Sync
Lock: &sync.Mutex{},
updateChan: updateChan,
}
syncer := felixsyncer.New(c, caCfg, callback)
syncer := felixsyncer.New(c, caCfg, callback, true)
return c.(*KubeClient), &callback, syncer
}

Expand Down
59 changes: 56 additions & 3 deletions lib/backend/syncersv1/felixsyncer/felixsyncer_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests", testutils.Datastore

Describe("Felix syncer functionality", func() {
It("should receive the synced after return all current data", func() {
syncer := felixsyncer.New(be, config.Spec, syncTester)
syncer := felixsyncer.New(be, config.Spec, syncTester, true)
syncer.Start()
expectedCacheSize := 0

Expand Down Expand Up @@ -495,7 +495,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests", testutils.Datastore
// We need to create a new syncTester and syncer.
current := syncTester.GetCacheEntries()
syncTester = testutils.NewSyncerTester()
syncer = felixsyncer.New(be, config.Spec, syncTester)
syncer = felixsyncer.New(be, config.Spec, syncTester, true)
syncer.Start()

// Verify the data is the same as the data from the previous cache. We got the cache in the previous
Expand Down Expand Up @@ -534,7 +534,7 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (KDD only)", testutil

It("should handle IPAM blocks properly for host-local IPAM", func() {
config.Spec.K8sUsePodCIDR = true
syncer := felixsyncer.New(be, config.Spec, syncTester)
syncer := felixsyncer.New(be, config.Spec, syncTester, true)
syncer.Start()

// Verify we start a resync.
Expand All @@ -551,3 +551,56 @@ var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (KDD only)", testutil
syncTester.ExpectStatusUpdate(api.InSync)
})
})

var _ = testutils.E2eDatastoreDescribe("Felix syncer tests (passive mode)", testutils.DatastoreK8s, func(config apiconfig.CalicoAPIConfig) {
var be api.Client
var syncTester *testutils.SyncerTester
var err error
var c clientv3.Interface

BeforeEach(func() {
// Create the backend client to obtain a syncer interface.
be, err = backend.NewClient(config)
Expect(err).NotTo(HaveOccurred())
be.Clean()

c, err = clientv3.New(config)
Expect(err).NotTo(HaveOccurred())

// Create a SyncerTester to receive the BGP syncer callback events and to allow us
// to assert state.
syncTester = testutils.NewSyncerTester()
})

It("should only receive config updates when in passive mode", func() {
syncer := felixsyncer.New(be, config.Spec, syncTester, false)
syncer.Start()

// Verify we start a resync.
syncTester.ExpectStatusUpdate(api.WaitForDatastore)
syncTester.ExpectStatusUpdate(api.ResyncInProgress)

// Expect to be in-sync.
syncTester.ExpectStatusUpdate(api.InSync)

// We don't expect any resources, since we're only watching config.
syncTester.ExpectCacheSize(0)

// Change the variant.
ci := &apiv3.ClusterInformation{
ObjectMeta: metav1.ObjectMeta{Name: "default"},
Spec: apiv3.ClusterInformationSpec{
Variant: "Calico",
},
}
_, err = c.ClusterInformation().Create(context.Background(), ci, options.SetOptions{})
Expect(err).NotTo(HaveOccurred())

// Expect an update for the variant.
syncTester.ExpectCacheSize(1)
syncTester.ExpectValueMatches(
model.GlobalConfigKey{Name: "Variant"},
MatchRegexp("Calico"),
)
})
})
92 changes: 49 additions & 43 deletions lib/backend/syncersv1/felixsyncer/felixsyncerv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
)

// New creates a new Felix v1 Syncer.
func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks) api.Syncer {
// Create the set of ResourceTypes required for Felix. Since the update processors
// also cache state, we need to create individual ones per syncer rather than create
// a common global set.
func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer {
// Felix always needs ClusterInformation and FelixConfiguration resources.
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindClusterInformation},
Expand All @@ -37,47 +35,55 @@ func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.Syn
ListInterface: model.ResourceListOptions{Kind: apiv3.KindFelixConfiguration},
UpdateProcessor: updateprocessors.NewFelixConfigUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkPolicy},
UpdateProcessor: updateprocessors.NewGlobalNetworkPolicyUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkSet},
UpdateProcessor: updateprocessors.NewGlobalNetworkSetUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool},
UpdateProcessor: updateprocessors.NewIPPoolUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNode},
UpdateProcessor: updateprocessors.NewFelixNodeUpdateProcessor(cfg.K8sUsePodCIDR),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindProfile},
UpdateProcessor: updateprocessors.NewProfileUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindWorkloadEndpoint},
UpdateProcessor: updateprocessors.NewWorkloadEndpointUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkPolicy},
UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkSet},
UpdateProcessor: updateprocessors.NewNetworkSetUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindHostEndpoint},
UpdateProcessor: updateprocessors.NewHostEndpointUpdateProcessor(),
},
}

// If using Calico IPAM, include IPAM resources the felix cares about.
if !cfg.K8sUsePodCIDR {
resourceTypes = append(resourceTypes, watchersyncer.ResourceType{ListInterface: model.BlockListOptions{}})
if isLeader {
// These resources are only required if this is the active Felix instance on the node.
additionalTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkPolicy},
UpdateProcessor: updateprocessors.NewGlobalNetworkPolicyUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindGlobalNetworkSet},
UpdateProcessor: updateprocessors.NewGlobalNetworkSetUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool},
UpdateProcessor: updateprocessors.NewIPPoolUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNode},
UpdateProcessor: updateprocessors.NewFelixNodeUpdateProcessor(cfg.K8sUsePodCIDR),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindProfile},
UpdateProcessor: updateprocessors.NewProfileUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindWorkloadEndpoint},
UpdateProcessor: updateprocessors.NewWorkloadEndpointUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkPolicy},
UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindNetworkSet},
UpdateProcessor: updateprocessors.NewNetworkSetUpdateProcessor(),
},
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindHostEndpoint},
UpdateProcessor: updateprocessors.NewHostEndpointUpdateProcessor(),
},
}

// If using Calico IPAM, include IPAM resources the felix cares about.
if !cfg.K8sUsePodCIDR {
additionalTypes = append(additionalTypes, watchersyncer.ResourceType{ListInterface: model.BlockListOptions{}})
}

resourceTypes = append(resourceTypes, additionalTypes...)
}

return watchersyncer.New(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ var _ = Describe("Test the generic configuration update processor and the concre
Name: "node.bgpnode1",
}
numFelixConfigs := 85
numClusterConfigs := 4
numNodeClusterConfigs := 3
numClusterConfigs := 5
numNodeClusterConfigs := 4
numBgpConfigs := 5
felixMappedNames := map[string]interface{}{
"RouteRefreshInterval": nil,
Expand Down

0 comments on commit faa5721

Please sign in to comment.