Skip to content

Commit

Permalink
BPF Upgrade without disruption
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Boland committed Oct 23, 2023
1 parent 729abe1 commit 5381547
Show file tree
Hide file tree
Showing 41 changed files with 783 additions and 257 deletions.
111 changes: 56 additions & 55 deletions pkg/controller/installation/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,72 +30,58 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func bpfUpgradeWithoutDisruption(r *ReconcileInstallation, ctx context.Context, install *operator.Installation, ds *appsv1.DaemonSet, fc *crdv1.FelixConfiguration, reqLogger logr.Logger) error {
func bpfUpgradeDaemonsetEnvVar(r *ReconcileInstallation, ctx context.Context, install *operator.Installation, ds *appsv1.DaemonSet, fc *crdv1.FelixConfiguration, reqLogger logr.Logger) error {

// Query calico-node DS: if FELIX_BPFENABLED env var set and FC bpfEnabled unset then patch FC and quit.
patchFelixConfig, err := processDaemonsetEnvVar(r, ctx, ds, fc, reqLogger)
if err != nil {
return err
}

// Otherwise check dataplane and patch Felix Config according to logic:
if !patchFelixConfig {
// Attempt to patch Felix Config now.
return patchFelixConfigurationImpl(r, ctx, install, fc, reqLogger, patchFelixConfig)
}

// Check the install dataplane mode is either Iptables or BPF.
installBpfEnabled := common.BpfDataplaneEnabled(&install.Spec)
func bpfUpgradeWithoutDisruption(r *ReconcileInstallation, ctx context.Context, install *operator.Installation, ds *appsv1.DaemonSet, fc *crdv1.FelixConfiguration, reqLogger logr.Logger) error {

// Check edge case where User has externally patched FelixConfig bpfEnabled which causes conflict to prevent Operator from upgrading dataplane.
if fc.Spec.BPFEnabled != nil {
var patchFelixConfig bool

fcBPFEnabled := *fc.Spec.BPFEnabled
if installBpfEnabled != fcBPFEnabled {

// Ensure Felix Config annotations are either empty or equal previous FC bpfEnabled value.
if fc.Annotations[render.BpfOperatorAnnotation] == strconv.FormatBool(installBpfEnabled) {
text := fmt.Sprintf("An error occurred while attempting patch Felix Config bpfEnabled: '%s' as Felix Config has been modified externally to '%s'",
strconv.FormatBool(installBpfEnabled),
strconv.FormatBool(fcBPFEnabled))
err = errors.New(text)
return err
}
}
}
// Check the install dataplane mode is either Iptables or BPF.
installBpfEnabled := common.BpfDataplaneEnabled(&install.Spec)

if !installBpfEnabled {
// IP Tables dataplane:
// Only patch Felix Config once to prevent log spamming.
if fc.Spec.BPFEnabled == nil || *fc.Spec.BPFEnabled {
patchFelixConfig = true
}
} else {
// BPF dataplane:
// Check daemonset rollout complete before patching.
if fc.Spec.BPFEnabled == nil || !(*fc.Spec.BPFEnabled) {
patchFelixConfig = checkDaemonsetRolloutComplete(ds)
}
}
// Check edge case where User has externally patched FelixConfig bpfEnabled which causes conflict to prevent Operator from upgrading dataplane.
if fc.Spec.BPFEnabled != nil {

// Attempt to patch Felix Config now.
if patchFelixConfig {
fcBPFEnabled := *fc.Spec.BPFEnabled
if installBpfEnabled != fcBPFEnabled {

err = patchFelixConfiguration(r, ctx, fc, reqLogger, installBpfEnabled)
if err != nil {
// Ensure Felix Config annotations are either empty or equal previous FC bpfEnabled value.
if fc.Annotations[render.BpfOperatorAnnotation] == strconv.FormatBool(installBpfEnabled) {
text := fmt.Sprintf("An error occurred while attempting patch Felix Config bpfEnabled: '%s' as Felix Config has been modified externally to '%s'",
strconv.FormatBool(installBpfEnabled),
strconv.FormatBool(fcBPFEnabled))
err := errors.New(text)
return err
}

// Ensure if no errors occurred while attempting to patch Falix Config then successfully patched.
patchFelixConfig = err == nil
}
}

if patchFelixConfig {
if fc.Spec.BPFEnabled != nil {
msg := fmt.Sprintf("Successfully patched Felix Config OK bpfEnabled='%s'", strconv.FormatBool(*fc.Spec.BPFEnabled))
reqLogger.Info(msg)
if !installBpfEnabled {
// IP Tables dataplane:
// Only patch Felix Config once to prevent log spamming.
if fc.Spec.BPFEnabled == nil || *fc.Spec.BPFEnabled {
patchFelixConfig = true
}
} else {
// BPF dataplane:
// Check daemonset rollout complete before patching.
if fc.Spec.BPFEnabled == nil || !(*fc.Spec.BPFEnabled) {
patchFelixConfig = checkDaemonsetRolloutComplete(ds)
}
}

return nil
// Attempt to patch Felix Config now.
return patchFelixConfigurationImpl(r, ctx, install, fc, reqLogger, patchFelixConfig)
}

func processDaemonsetEnvVar(r *ReconcileInstallation, ctx context.Context, ds *appsv1.DaemonSet, fc *crdv1.FelixConfiguration, reqLogger logr.Logger) (bool, error) {
Expand All @@ -115,16 +101,7 @@ func processDaemonsetEnvVar(r *ReconcileInstallation, ctx context.Context, ds *a
}
}

if dsBpfEnabledStatus && fc.Spec.BPFEnabled == nil {
err = patchFelixConfiguration(r, ctx, fc, reqLogger, dsBpfEnabledStatus)
if err != nil {
return false, err
} else {
return true, nil
}
}

return false, nil
return dsBpfEnabledStatus && fc.Spec.BPFEnabled == nil, nil
}

func checkDaemonsetRolloutComplete(ds *appsv1.DaemonSet) bool {
Expand All @@ -142,6 +119,30 @@ func checkDaemonsetRolloutComplete(ds *appsv1.DaemonSet) bool {
return false
}

// patchFelixConfigurationImpl patches FelixConfiguration's bpfEnabled field so
// it matches the dataplane mode selected in the Installation spec, but only
// when patchFelixConfig is true; otherwise it is a no-op.
//
// On a successful patch the resulting bpfEnabled value is logged. Any error
// from the underlying patch call is returned unchanged.
func patchFelixConfigurationImpl(r *ReconcileInstallation, ctx context.Context, install *operator.Installation, fc *crdv1.FelixConfiguration, reqLogger logr.Logger, patchFelixConfig bool) error {

	if !patchFelixConfig {
		return nil
	}

	// Derive the desired bpfEnabled value from the install's dataplane mode
	// and apply it to the Felix Config resource.
	installBpfEnabled := common.BpfDataplaneEnabled(&install.Spec)
	if err := patchFelixConfiguration(r, ctx, fc, reqLogger, installBpfEnabled); err != nil {
		return err
	}

	// The patch succeeded; log the value now recorded in Felix Config.
	if fc.Spec.BPFEnabled != nil {
		msg := fmt.Sprintf("Successfully patched Felix Config OK bpfEnabled='%s'", strconv.FormatBool(*fc.Spec.BPFEnabled))
		reqLogger.Info(msg)
	}

	return nil
}

func patchFelixConfiguration(r *ReconcileInstallation, ctx context.Context, fc *crdv1.FelixConfiguration, reqLogger logr.Logger, patchBpfEnabled bool) error {

// Obtain the original FelixConfig to patch.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/installation/bpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ var _ = Describe("Testing BPF Upgrade without disruption during core-controller
Expect(c.Create(ctx, fc)).NotTo(HaveOccurred())

// Act.
err := bpfUpgradeWithoutDisruption(&r, ctx, cr, ds, fc, reqLogger)
err := bpfUpgradeDaemonsetEnvVar(&r, ctx, cr, ds, fc, reqLogger)
Expect(err).ShouldNot(HaveOccurred())

// Assert.
Expand Down
19 changes: 18 additions & 1 deletion pkg/controller/installation/core_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,23 @@ func (r *ReconcileInstallation) Reconcile(ctx context.Context, request reconcile
return reconcile.Result{}, err
}

// BPF Upgrade env var initial check:
var calicoNodeDaemonset appsv1.DaemonSet
calicoNodeDaemonset = appsv1.DaemonSet{}

err = r.client.Get(ctx, types.NamespacedName{Namespace: common.CalicoNamespace, Name: common.NodeDaemonSetName}, &calicoNodeDaemonset)
if err != nil {
reqLogger.Error(err, "An error occurred when querying the calico-node daemonset")
return reconcile.Result{}, err
}

// Next delegate logic implementation here using the state of the installation and dependent resources.
err = bpfUpgradeDaemonsetEnvVar(r, ctx, instance, &calicoNodeDaemonset, felixConfiguration, reqLogger)
if err != nil {
reqLogger.Error(err, "An error occurred when attempting to process BPF Upgrade Calico-Node DS env var")
return reconcile.Result{}, err
}

// Create a component handler to create or update the rendered components.
handler := utils.NewComponentHandler(log, r.client, r.scheme, instance)
for _, component := range components {
Expand Down Expand Up @@ -1501,7 +1518,7 @@ func (r *ReconcileInstallation) Reconcile(ctx context.Context, request reconcile

// BPF Upgrade without disruption:
// First get the calico-node daemonset.
calicoNodeDaemonset := appsv1.DaemonSet{}
calicoNodeDaemonset = appsv1.DaemonSet{}
err = r.client.Get(ctx, types.NamespacedName{Namespace: common.CalicoNamespace, Name: common.NodeDaemonSetName}, &calicoNodeDaemonset)
if err != nil {
reqLogger.Error(err, "An error occurred when querying the calico-node daemonset")
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/installation/core_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,20 @@ var _ = Describe("Testing core-controller installation", func() {
},
},
})).NotTo(HaveOccurred())

Expect(c.Create(
ctx,
&appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: common.NodeDaemonSetName, Namespace: common.CalicoNamespace},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: render.CalicoNodeObjectName}},
},
},
},
})).NotTo(HaveOccurred())
})
AfterEach(func() {
cancel()
Expand Down Expand Up @@ -810,6 +824,20 @@ var _ = Describe("Testing core-controller installation", func() {
Expect(err).NotTo(HaveOccurred())
Expect(c.Create(ctx, prometheusTLS.Secret(common.OperatorNamespace()))).NotTo(HaveOccurred())
Expect(c.Create(ctx, &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}})).NotTo(HaveOccurred())

Expect(c.Create(
ctx,
&appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: common.NodeDaemonSetName, Namespace: common.CalicoNamespace},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: render.CalicoNodeObjectName}},
},
},
},
})).NotTo(HaveOccurred())
})
AfterEach(func() {
cancel()
Expand Down Expand Up @@ -984,6 +1012,20 @@ var _ = Describe("Testing core-controller installation", func() {
Expect(err).NotTo(HaveOccurred())
Expect(c.Create(ctx, prometheusTLS.Secret(common.OperatorNamespace()))).NotTo(HaveOccurred())
Expect(c.Create(ctx, &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}})).NotTo(HaveOccurred())

Expect(c.Create(
ctx,
&appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: common.NodeDaemonSetName, Namespace: common.CalicoNamespace},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: render.CalicoNodeObjectName}},
},
},
},
})).NotTo(HaveOccurred())
})
AfterEach(func() {
cancel()
Expand Down
21 changes: 11 additions & 10 deletions pkg/controller/logcollector/logcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,16 +351,6 @@ func (r *ReconcileLogCollector) Reconcile(ctx context.Context, request reconcile
return reconcile.Result{}, err
}

esClusterConfig, err := utils.GetElasticsearchClusterConfig(ctx, r.client)
if err != nil {
if errors.IsNotFound(err) {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Elasticsearch cluster configuration is not available, waiting for it to become available", err, reqLogger)
return reconcile.Result{}, nil
}
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get the elasticsearch cluster configuration", err, reqLogger)
return reconcile.Result{}, err
}

pullSecrets, err := utils.GetNetworkingPullSecrets(installation, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error retrieving pull secrets", err, reqLogger)
Expand Down Expand Up @@ -541,10 +531,21 @@ func (r *ReconcileLogCollector) Reconcile(ctx context.Context, request reconcile
}

var eksConfig *render.EksCloudwatchLogConfig
var esClusterConfig *relasticsearch.ClusterConfig
if installation.KubernetesProvider == operatorv1.ProviderEKS {
log.Info("Managed kubernetes EKS found, getting necessary credentials and config")
if instance.Spec.AdditionalSources != nil {
if instance.Spec.AdditionalSources.EksCloudwatchLog != nil {
esClusterConfig, err = utils.GetElasticsearchClusterConfig(ctx, r.client)
if err != nil {
if errors.IsNotFound(err) {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Elasticsearch cluster configuration is not available, waiting for it to become available", err, reqLogger)
return reconcile.Result{}, nil
}
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get the elasticsearch cluster configuration", err, reqLogger)
return reconcile.Result{}, err
}

eksConfig, err = getEksCloudwatchLogConfig(r.client,
instance.Spec.AdditionalSources.EksCloudwatchLog.FetchInterval,
instance.Spec.AdditionalSources.EksCloudwatchLog.Region,
Expand Down
Loading

0 comments on commit 5381547

Please sign in to comment.