Skip to content

Commit

Permalink
feat: extract selectors from daemonsets if not provided through config
Browse files Browse the repository at this point in the history
  • Loading branch information
scoquelin committed Aug 23, 2024
1 parent 3ea2efe commit ac5366e
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 43 deletions.
8 changes: 7 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package main
import (
"flag"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/uswitch/nidhogg/pkg/apis"
Expand Down Expand Up @@ -57,7 +59,11 @@ func main() {
os.Exit(1)
}

log.Info("looking for nodes that match selector", "selector", handlerConf.Selector.String())
if handlerConf.NodeSelector == nil {
log.Info("looking for nodes that will match daemonsets selectors")
} else {
log.Info("looking for nodes that match provided node selector", "selector", strings.Join(handlerConf.NodeSelector, ","))
}

// Get a config to talk to the apiserver
log.Info("setting up client for manager")
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/node/node_controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package node

import (
"context"
"github.com/onsi/gomega"
stdlog "log"
"os"
"path/filepath"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sync"
"testing"

"github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/uswitch/nidhogg/pkg/apis"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/node/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package node

import (
"context"
"testing"
"time"

"github.com/onsi/gomega"
"github.com/uswitch/nidhogg/pkg/nidhogg"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"
"testing"
"time"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -47,10 +48,10 @@ func TestReconcile(t *testing.T) {
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()

handler := nidhogg.HandlerConfig{}
_ = handler.BuildSelectors()
handlerConfig := nidhogg.HandlerConfig{}
_ = handlerConfig.BuildSelectors()

recFn, requests := SetupTestReconcile(newReconciler(mgr, handler))
recFn, requests := SetupTestReconcile(newReconciler(mgr, handlerConfig))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())

_, cancel, mgrStopped := StartTestManager(mgr, g)
Expand Down
90 changes: 60 additions & 30 deletions pkg/nidhogg/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package nidhogg
import (
"context"
"fmt"
"github.com/uswitch/nidhogg/pkg/utils"
"k8s.io/apimachinery/pkg/api/errors"
"reflect"
"strings"
"time"

"github.com/uswitch/nidhogg/pkg/utils"
"k8s.io/apimachinery/pkg/api/errors"

"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -66,20 +68,25 @@ type HandlerConfig struct {
TaintNamePrefix string `json:"taintNamePrefix,omitempty" yaml:"taintNamePrefix,omitempty"`
TaintRemovalDelayInSeconds int `json:"taintRemovalDelayInSeconds,omitempty" yaml:"taintRemovalDelayInSeconds,omitempty"`
Daemonsets []Daemonset `json:"daemonsets" yaml:"daemonsets"`
NodeSelector []string `json:"nodeSelector" yaml:"nodeSelector"`
Selector labels.Selector
NodeSelector []string `json:"nodeSelector,omitempty" yaml:"nodeSelector,omitempty"`
DaemonsetSelectors map[Daemonset]labels.Selector
}

func (hc *HandlerConfig) BuildSelectors() error {
hc.Selector = labels.Everything()
hc.DaemonsetSelectors = make(map[Daemonset]labels.Selector)
globalSelector := labels.Nothing()
for _, rawSelector := range hc.NodeSelector {
if selector, err := labels.Parse(rawSelector); err != nil {
return fmt.Errorf("error parsing selector: %v", err)
} else {
requirements, _ := selector.Requirements()
hc.Selector = hc.Selector.Add(requirements...)
globalSelector = labels.NewSelector().Add(requirements...)
}
}
//Will initialize all daemonsets with the same selector, either representing the NodeSelector config or labels.Nothing if no config was provided for NodeSelector
for _, daemonset := range hc.Daemonsets {
hc.DaemonsetSelectors[daemonset] = globalSelector
}
return nil
}

Expand Down Expand Up @@ -116,15 +123,10 @@ func (h *Handler) HandleNode(ctx context.Context, request reconcile.Request) (re
return reconcile.Result{}, err
}

//check whether nodeName matches the nodeSelector
if !h.config.Selector.Matches(labels.Set(latestNode.Labels)) {
return reconcile.Result{}, nil
}

updatedNode, taintChanges, err := h.calculateTaints(ctx, latestNode)
if err != nil {
taintOperationErrors.WithLabelValues("calculateTaints").Inc()
return reconcile.Result{}, fmt.Errorf("error caluclating taints for nodeName: %v", err)
return reconcile.Result{}, fmt.Errorf("error calculating taints for nodeName: %v", err)
}

taintLess := true
Expand Down Expand Up @@ -179,6 +181,18 @@ func (h *Handler) HandleNode(ctx context.Context, request reconcile.Request) (re
return reconcile.Result{}, nil
}

func (h *Handler) getSelectorFromDaemonSet(ctx context.Context, daemonset Daemonset) (labels.Selector, error) {
ds := &appsv1.DaemonSet{}
err := h.Get(ctx, types.NamespacedName{Namespace: daemonset.Namespace, Name: daemonset.Name}, ds)
if err != nil {
logf.Log.Info(fmt.Sprintf("Could not fetch daemonset %s from namespace %s", daemonset.Name, daemonset.Namespace))
return nil, err
}
selector := labels.SelectorFromSet(ds.Spec.Template.Spec.NodeSelector)

return selector, nil
}

func (h *Handler) calculateTaints(ctx context.Context, instance *corev1.Node) (*corev1.Node, taintChanges, error) {

nodeCopy := instance.DeepCopy()
Expand All @@ -195,28 +209,44 @@ func (h *Handler) calculateTaints(ctx context.Context, instance *corev1.Node) (*
}
for _, daemonset := range h.config.Daemonsets {

taint := fmt.Sprintf("%s/%s.%s", h.getTaintNamePrefix(), daemonset.Namespace, daemonset.Name)
// Get Pod for nodeName
pods, err := h.getDaemonsetPods(ctx, instance.Name, daemonset)
if err != nil {
return nil, taintChanges{}, fmt.Errorf("error fetching pods: %v", err)
//If NodeSelector was not provided upfront through config
if h.config.DaemonsetSelectors[daemonset] == labels.Nothing() {
//Will try to get selectors from daemonset directly
selector, err := h.getSelectorFromDaemonSet(ctx, daemonset)
if err != nil {
logf.Log.Info(fmt.Sprintf("Could not fetch selector from daemonset %s in namespace %s", daemonset.Name, daemonset.Namespace))
} else {
//Override existing daemonset selector with the one freshly retrieved from the daemonset
h.config.DaemonsetSelectors[daemonset] = selector
}
}

if len(pods) > 0 && utils.AllTrue(pods, func(pod *corev1.Pod) bool { return podReady(pod) }) {
// if the taint is in the taintsToRemove map, it'll be removed
continue
}
// pod doesn't exist or is not ready
_, ok := taintsToRemove[taint]
if ok {
// we want to keep this already existing taint on it
delete(taintsToRemove, taint)
continue
//make sure daemonset selector matches node selector
if h.config.DaemonsetSelectors[daemonset].Matches(labels.Set(instance.Labels)) {
taint := fmt.Sprintf("%s/%s.%s", h.getTaintNamePrefix(), daemonset.Namespace, daemonset.Name)
// Get Pod for nodeName
pods, err := h.getDaemonsetPods(ctx, instance.Name, daemonset)
if err != nil {
return nil, taintChanges{}, fmt.Errorf("error fetching pods: %v", err)
}

if len(pods) > 0 && utils.AllTrue(pods, func(pod *corev1.Pod) bool { return podReady(pod) }) {
// if the taint is in the taintsToRemove map, it'll be removed
continue
}
// pod doesn't exist or is not ready
_, ok := taintsToRemove[taint]
if ok {
// we want to keep this already existing taint on it
delete(taintsToRemove, taint)
continue
}
// taint is not already present, adding it
changes.taintsAdded = append(changes.taintsAdded, taint)
nodeCopy.Spec.Taints = addTaint(nodeCopy.Spec.Taints, taint)
}
// taint is not already present, adding it
changes.taintsAdded = append(changes.taintsAdded, taint)
nodeCopy.Spec.Taints = addTaint(nodeCopy.Spec.Taints, taint)
}

for taint := range taintsToRemove {
h.applyTaintRemovalDelay()
nodeCopy.Spec.Taints = removeTaint(nodeCopy.Spec.Taints, taint)
Expand Down
78 changes: 73 additions & 5 deletions pkg/nidhogg/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -30,8 +31,26 @@ func TestCalculateTaintsWithReadyPod(t *testing.T) {
node := buildNode(namespace, []string{daemonset})
pod := buildPod("pod", daemonset, corev1.PodReady)
cfg := buildNidhoggConfig(namespace, []string{daemonset})
cfg.BuildSelectors()

handler := buildHandler([]corev1.Pod{pod}, cfg)
handler := buildHandler([]corev1.Pod{pod}, nil, cfg)
updatedNode, changes, err := handler.calculateTaints(ctx, &node)

assert.NoError(t, err)
assert.NotNil(t, updatedNode)
assert.NotContains(t, updatedNode.Spec.Taints, taintName)
assert.Empty(t, changes.taintsAdded)
assert.Contains(t, changes.taintsRemoved, taintName)
}

func TestCalculateTaintsWithReadyPodAndWithoutNodeSelector(t *testing.T) {
ctx := context.TODO()
node := buildNode(namespace, []string{daemonset})
pod := buildPod("pod", daemonset, corev1.PodReady)
cfg := buildNidhoggConfigWithoutNodeSelector(namespace, []string{daemonset})
cfg.BuildSelectors()

handler := buildHandler([]corev1.Pod{pod}, []appsv1.DaemonSet{buildDaemonset(daemonset)}, cfg)
updatedNode, changes, err := handler.calculateTaints(ctx, &node)

assert.NoError(t, err)
Expand All @@ -47,8 +66,9 @@ func TestCalculateTaintsWithMultipleDaemonsets(t *testing.T) {
pod1 := buildPod("pod1", daemonset1, corev1.PodReady)
pod2 := buildPod("pod2", daemonset2, corev1.PodScheduled)
cfg := buildNidhoggConfig(namespace, []string{daemonset1, daemonset2})
cfg.BuildSelectors()

handler := buildHandler([]corev1.Pod{pod1, pod2}, cfg)
handler := buildHandler([]corev1.Pod{pod1, pod2}, nil, cfg)
updatedNode, changes, err := handler.calculateTaints(ctx, &node)

assert.NoError(t, err)
Expand All @@ -64,8 +84,26 @@ func TestCalculateTaintsWithUnreadyPod(t *testing.T) {
node := buildNode(namespace, []string{daemonset})
pod := buildPod("pod", daemonset, corev1.PodScheduled)
cfg := buildNidhoggConfig(namespace, []string{daemonset})
cfg.BuildSelectors()

handler := buildHandler([]corev1.Pod{pod}, cfg)
handler := buildHandler([]corev1.Pod{pod}, nil, cfg)
updatedNode, changes, err := handler.calculateTaints(ctx, &node)

assert.NoError(t, err)
assert.NotNil(t, updatedNode)
assert.Contains(t, updatedNode.Spec.Taints, buildActiveTaint(namespace, daemonset))
assert.Empty(t, changes.taintsRemoved)
assert.Empty(t, changes.taintsAdded, taintName)
}

func TestCalculateTaintsWithUnreadyPodAndWithoutNodeSelector(t *testing.T) {
ctx := context.TODO()
node := buildNode(namespace, []string{daemonset})
pod := buildPod("pod", daemonset, corev1.PodScheduled)
cfg := buildNidhoggConfigWithoutNodeSelector(namespace, []string{daemonset})
cfg.BuildSelectors()

handler := buildHandler([]corev1.Pod{pod}, []appsv1.DaemonSet{buildDaemonset(daemonset)}, cfg)
updatedNode, changes, err := handler.calculateTaints(ctx, &node)

assert.NoError(t, err)
Expand All @@ -81,7 +119,7 @@ func TestGetDaemonsetPodsReturnsUniquePods(t *testing.T) {
pod2 := buildPod("pod2", daemonset, corev1.PodReady)
cfg := buildNidhoggConfig(namespace, []string{daemonset})

handler := buildHandler([]corev1.Pod{pod1, pod2}, cfg)
handler := buildHandler([]corev1.Pod{pod1, pod2}, nil, cfg)
daemonset := Daemonset{Name: daemonset, Namespace: namespace}
pods, err := handler.getDaemonsetPods(ctx, nodeName, daemonset)

Expand All @@ -92,12 +130,16 @@ func TestGetDaemonsetPodsReturnsUniquePods(t *testing.T) {
assert.Equal(t, pods[1].Name, pod2.Name)
}

func buildHandler(pods []corev1.Pod, config HandlerConfig) Handler {
func buildHandler(pods []corev1.Pod, daemonsets []appsv1.DaemonSet, config HandlerConfig) Handler {
return Handler{
Client: fake.NewClientBuilder().WithLists(&corev1.PodList{
TypeMeta: metav1.TypeMeta{},
ListMeta: metav1.ListMeta{},
Items: pods,
}, &appsv1.DaemonSetList{
TypeMeta: metav1.TypeMeta{},
ListMeta: metav1.ListMeta{},
Items: daemonsets,
}).Build(),
recorder: record.NewFakeRecorder(0),
config: config,
Expand All @@ -121,6 +163,14 @@ func buildNidhoggConfig(namespace string, daemonsets []string) HandlerConfig {
}
}

func buildNidhoggConfigWithoutNodeSelector(namespace string, daemonsets []string) HandlerConfig {
return HandlerConfig{
TaintNamePrefix: taintNamePrefix,
TaintRemovalDelayInSeconds: 0,
Daemonsets: buildDaemonsets(namespace, daemonsets),
}
}

func buildPod(podName string, daemonsetName string, conditionType corev1.PodConditionType) corev1.Pod {
return corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -143,6 +193,24 @@ func buildPod(podName string, daemonsetName string, conditionType corev1.PodCond
}
}

func buildDaemonset(daemonsetName string) appsv1.DaemonSet {
return appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: daemonsetName,
Namespace: namespace,
},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeSelector: map[string]string{
nodeSelector: "true",
},
},
},
},
}
}

func buildNode(namespace string, daemonsets []string) corev1.Node {
return corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit ac5366e

Please sign in to comment.