From 704981fc0480ee84d2875918e80471227f6d63af Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Fri, 2 Feb 2024 13:51:44 -0500 Subject: [PATCH] Add restart for namespace workloads. --- go.mod | 3 +- .../workloadmutation/webhookhandler.go | 58 ++--- .../workloadmutation/webhookhandler_test.go | 10 +- main.go | 15 +- pkg/instrumentation/auto/annotation.go | 148 +++++-------- pkg/instrumentation/auto/annotation_test.go | 200 +++++++++++++++--- pkg/instrumentation/auto/callback.go | 54 +++++ pkg/instrumentation/auto/restart.go | 44 ++++ 8 files changed, 357 insertions(+), 175 deletions(-) create mode 100644 pkg/instrumentation/auto/callback.go create mode 100644 pkg/instrumentation/auto/restart.go diff --git a/go.mod b/go.mod index 645198157..5c45fcab0 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( go.opentelemetry.io/collector/featuregate v0.77.0 go.opentelemetry.io/otel v1.21.0 go.uber.org/zap v1.25.0 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 @@ -126,12 +127,12 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect github.com/spf13/cobra v1.7.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/vultr/govultr/v2 v2.17.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.17.0 // indirect - golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect diff --git a/internal/webhook/workloadmutation/webhookhandler.go b/internal/webhook/workloadmutation/webhookhandler.go index f43e7dca8..b6cfdaf42 100644 --- a/internal/webhook/workloadmutation/webhookhandler.go +++ b/internal/webhook/workloadmutation/webhookhandler.go @@ -12,7 +12,6 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -32,56 +31,43 @@ type WebhookHandler interface { // the implementation. type workloadMutationWebhook struct { - client client.Client - decoder *admission.Decoder - logger logr.Logger - config config.Config - annotationMutator *auto.AnnotationMutators + client client.Client + decoder *admission.Decoder + logger logr.Logger + config config.Config + annotationMutators *auto.AnnotationMutators } // NewWebhookHandler creates a new WorkloadWebhookHandler. -func NewWebhookHandler(cfg config.Config, logger logr.Logger, decoder *admission.Decoder, cl client.Client, annotationMutation *auto.AnnotationMutators) WebhookHandler { +func NewWebhookHandler(cfg config.Config, logger logr.Logger, decoder *admission.Decoder, cl client.Client, annotationMutators *auto.AnnotationMutators) WebhookHandler { return &workloadMutationWebhook{ - config: cfg, - decoder: decoder, - logger: logger, - client: cl, - annotationMutator: annotationMutation, + config: cfg, + decoder: decoder, + logger: logger, + client: cl, + annotationMutators: annotationMutators, } } -func (p *workloadMutationWebhook) Handle(ctx context.Context, req admission.Request) admission.Response { - var err error - var marshaledObject []byte - var object runtime.Object +func (p *workloadMutationWebhook) Handle(_ context.Context, req admission.Request) admission.Response { + var obj client.Object switch objectKind := req.Kind.Kind; objectKind { case "DaemonSet": - ds := appsv1.DaemonSet{} - err = p.decoder.Decode(req, &ds) - if err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - object = &ds + obj = &appsv1.DaemonSet{} case "Deployment": - d := appsv1.Deployment{} - err = p.decoder.Decode(req, &d) - if err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - object = &d + obj = &appsv1.Deployment{} case "StatefulSet": - ss := appsv1.StatefulSet{} - err = p.decoder.Decode(req, &ss) - if err != nil { - return admission.Errored(http.StatusBadRequest, err) - } - object = &ss + obj = &appsv1.StatefulSet{} default: return admission.Errored(http.StatusBadRequest, errors.New("failed to unmarshal request object")) } - p.annotationMutator.Mutate(object) - marshaledObject, err = json.Marshal(object) + if err := p.decoder.Decode(req, obj); err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + + p.annotationMutators.MutateObject(obj) + marshaledObject, err := json.Marshal(obj) if err != nil { res := admission.Errored(http.StatusInternalServerError, err) res.Allowed = true diff --git a/internal/webhook/workloadmutation/webhookhandler_test.go b/internal/webhook/workloadmutation/webhookhandler_test.go index d94e6d4a6..ca72d6d3e 100644 --- a/internal/webhook/workloadmutation/webhookhandler_test.go +++ b/internal/webhook/workloadmutation/webhookhandler_test.go @@ -6,17 +6,17 @@ package workloadmutation import ( "context" "encoding/json" + "net/http" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" admv1 "k8s.io/api/admission/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "net/http" - "testing" - - "github.com/go-logr/logr" - "github.com/stretchr/testify/assert" "k8s.io/kubectl/pkg/scheme" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" diff --git a/main.go b/main.go index f0a934204..9787cb0d8 100644 --- a/main.go +++ b/main.go @@ -188,12 +188,14 @@ func main() { decoder := admission.NewDecoder(mgr.GetScheme()) - if os.Getenv("DISABLE_AUTO_ANNOTATION") != "true" { + if os.Getenv("DISABLE_AUTO_ANNOTATION") == "true" || autoAnnotationConfigStr == "" { + setupLog.Info("Auto-annotation is disabled") + } else { var autoAnnotationConfig auto.AnnotationConfig - if err := json.Unmarshal([]byte(autoAnnotationConfigStr), &autoAnnotationConfig); err != nil { - setupLog.Error(err, "unable to unmarshal auto-annotation config") + if err = json.Unmarshal([]byte(autoAnnotationConfigStr), &autoAnnotationConfig); err != nil { + setupLog.Error(err, "Unable to unmarshal auto-annotation config") } else { - autoAnnotationMutator := auto.NewAnnotationMutators( + autoAnnotationMutators := auto.NewAnnotationMutators( mgr.GetClient(), mgr.GetAPIReader(), logger, @@ -204,8 +206,9 @@ func main() { ), ) mgr.GetWebhookServer().Register("/mutate-v1-workload", &webhook.Admission{ - Handler: workloadmutation.NewWebhookHandler(cfg, ctrl.Log.WithName("workload-webhook"), decoder, mgr.GetClient(), autoAnnotationMutator)}) - go autoAnnotationMutator.MutateAll(ctx) + Handler: workloadmutation.NewWebhookHandler(cfg, ctrl.Log.WithName("workload-webhook"), decoder, mgr.GetClient(), autoAnnotationMutators)}) + setupLog.Info("Starting auto-annotation") + go autoAnnotationMutators.MutateAndUpdateAll(ctx) } } diff --git a/pkg/instrumentation/auto/annotation.go b/pkg/instrumentation/auto/annotation.go index 1cc32803d..0135ce6d3 100644 --- a/pkg/instrumentation/auto/annotation.go +++ b/pkg/instrumentation/auto/annotation.go @@ -13,7 +13,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation" @@ -26,9 +25,10 @@ const ( // +kubebuilder:rbac:groups="",resources=namespaces,verbs=list;watch;update -// AnnotationMutators has an AnnotationMutator resource name +// AnnotationMutators contains functions that can be used to mutate annotations +// on all supported objects based on the configured mutators. type AnnotationMutators struct { - client client.Client + clientWriter client.Writer clientReader client.Reader logger logr.Logger namespaceMutators map[string]instrumentation.AnnotationMutator @@ -38,96 +38,19 @@ type AnnotationMutators struct { defaultMutator instrumentation.AnnotationMutator } -// MutateAll runs the mutators for each of the configured resources. -func (m *AnnotationMutators) MutateAll(ctx context.Context) { - m.MutateNamespaces(ctx) - m.MutateDeployments(ctx) - m.MutateDaemonSets(ctx) - m.MutateStatefulSets(ctx) +// MutateAndUpdateAll runs the mutators for each of the support resources and updates them. +func (m *AnnotationMutators) MutateAndUpdateAll(ctx context.Context) { + mutateAndUpdateFunc := chainCallbacks(m.MutateObject, m.updateFunc(ctx)) + m.rangeObjectList(ctx, &corev1.NamespaceList{}, &client.ListOptions{}, + chainCallbacks(mutateAndUpdateFunc, m.restartNamespaceFunc(ctx)), + ) + m.rangeObjectList(ctx, &appsv1.DeploymentList{}, &client.ListOptions{}, mutateAndUpdateFunc) + m.rangeObjectList(ctx, &appsv1.DaemonSetList{}, &client.ListOptions{}, mutateAndUpdateFunc) + m.rangeObjectList(ctx, &appsv1.StatefulSetList{}, &client.ListOptions{}, mutateAndUpdateFunc) } -// MutateNamespaces lists all namespaces and runs MutateNamespace on each. -func (m *AnnotationMutators) MutateNamespaces(ctx context.Context) { - namespaces := &corev1.NamespaceList{} - if err := m.clientReader.List(ctx, namespaces); err != nil { - m.logger.Error(err, "Unable to list namespaces") - return - } - - for _, namespace := range namespaces.Items { - if m.Mutate(&namespace) { - if err := m.client.Update(ctx, &namespace); err != nil { - m.logger.Error(err, "Unable to send update", - "kind", namespace.Kind, - "name", namespace.Name, - ) - } - } - } -} - -// MutateDeployments lists all deployments and runs MutateDeployment on each. -func (m *AnnotationMutators) MutateDeployments(ctx context.Context) { - deployments := &appsv1.DeploymentList{} - if err := m.clientReader.List(ctx, deployments); err != nil { - m.logger.Error(err, "Unable to list deployments") - return - } - for _, deployment := range deployments.Items { - if m.Mutate(&deployment) { - if err := m.client.Update(ctx, &deployment); err != nil { - m.logger.Error(err, "Unable to send update", - "kind", deployment.Kind, - "name", deployment.Name, - "namespace", deployment.Namespace, - ) - } - } - } -} - -// MutateDaemonSets lists all daemonsets and runs MutateDaemonSet on each. -func (m *AnnotationMutators) MutateDaemonSets(ctx context.Context) { - daemonSets := &appsv1.DaemonSetList{} - if err := m.clientReader.List(ctx, daemonSets); err != nil { - m.logger.Error(err, "Unable to list daemonsets") - return - } - for _, daemonSet := range daemonSets.Items { - if m.Mutate(&daemonSet) { - if err := m.client.Update(ctx, &daemonSet); err != nil { - m.logger.Error(err, "Unable to send update", - "kind", daemonSet.Kind, - "name", daemonSet.Name, - "namespace", daemonSet.Namespace, - ) - } - } - } -} - -// MutateStatefulSets lists all statefulsets and runs MutateStatefulSet on each. -func (m *AnnotationMutators) MutateStatefulSets(ctx context.Context) { - statefulSets := &appsv1.StatefulSetList{} - if err := m.clientReader.List(ctx, statefulSets); err != nil { - m.logger.Error(err, "Unable to list statefulsets") - return - } - for _, statefulSet := range statefulSets.Items { - if m.Mutate(&statefulSet) { - if err := m.client.Update(ctx, &statefulSet); err != nil { - m.logger.Error(err, "Unable to send update", - "kind", statefulSet.Kind, - "name", statefulSet.Name, - "namespace", statefulSet.Namespace, - ) - } - } - } -} - -// Mutate modifies annotations for a single object using the configured mutators. -func (m *AnnotationMutators) Mutate(obj runtime.Object) bool { +// MutateObject modifies annotations for a single object using the configured mutators. +func (m *AnnotationMutators) MutateObject(obj client.Object) bool { switch o := obj.(type) { case *corev1.Namespace: return m.mutate(o.GetName(), m.namespaceMutators, o.GetObjectMeta()) @@ -142,6 +65,33 @@ func (m *AnnotationMutators) Mutate(obj runtime.Object) bool { } } +func (m *AnnotationMutators) rangeObjectList(ctx context.Context, list client.ObjectList, option client.ListOption, fn objectCallbackFunc) { + if err := m.clientReader.List(ctx, list, option); err != nil { + m.logger.Error(err, "Unable to list objects", + "kind", fmt.Sprintf("%T", list), + ) + return + } + switch l := list.(type) { + case *corev1.NamespaceList: + for _, item := range l.Items { + fn(&item) + } + case *appsv1.DeploymentList: + for _, item := range l.Items { + fn(&item) + } + case *appsv1.DaemonSetList: + for _, item := range l.Items { + fn(&item) + } + case *appsv1.StatefulSetList: + for _, item := range l.Items { + fn(&item) + } + } +} + func (m *AnnotationMutators) mutate(name string, mutators map[string]instrumentation.AnnotationMutator, obj metav1.Object) bool { mutator, ok := mutators[name] if !ok { @@ -157,10 +107,16 @@ func namespacedName(obj metav1.Object) string { // NewAnnotationMutators creates mutators based on the AnnotationConfig provided and enabled instrumentation.TypeSet. // The default mutator, which is used for non-configured resources, removes all auto-annotated annotations in the type // set. -func NewAnnotationMutators(client client.Client, clientReader client.Reader, logger logr.Logger, cfg AnnotationConfig, typeSet instrumentation.TypeSet) *AnnotationMutators { +func NewAnnotationMutators( + clientWriter client.Writer, + clientReader client.Reader, + logger logr.Logger, + cfg AnnotationConfig, + typeSet instrumentation.TypeSet, +) *AnnotationMutators { builder := newMutatorBuilder(typeSet) return &AnnotationMutators{ - client: client, + clientWriter: clientWriter, clientReader: clientReader, logger: logger, namespaceMutators: builder.buildMutators(getResources(cfg, typeSet, getNamespaces)), @@ -171,7 +127,11 @@ func NewAnnotationMutators(client client.Client, clientReader client.Reader, log } } -func getResources(cfg AnnotationConfig, typeSet instrumentation.TypeSet, resourceFn func(AnnotationResources) []string) map[instrumentation.Type][]string { +func getResources( + cfg AnnotationConfig, + typeSet instrumentation.TypeSet, + resourceFn func(AnnotationResources) []string, +) map[instrumentation.Type][]string { resources := map[instrumentation.Type][]string{} for instType := range typeSet { resources[instType] = resourceFn(cfg.getResources(instType)) diff --git a/pkg/instrumentation/auto/annotation_test.go b/pkg/instrumentation/auto/annotation_test.go index 130e32c5c..1fa0ebdeb 100644 --- a/pkg/instrumentation/auto/annotation_test.go +++ b/pkg/instrumentation/auto/annotation_test.go @@ -5,15 +5,18 @@ package auto import ( "context" + "errors" "strings" "testing" "github.com/go-logr/logr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - appv1 "k8s.io/api/apps/v1" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation" @@ -104,17 +107,17 @@ func TestAnnotationMutators_Namespaces(t *testing.T) { }) } ctx := context.Background() - client := fake.NewClientBuilder().WithLists(&corev1.NamespaceList{Items: namespaces}).Build() + fakeClient := fake.NewClientBuilder().WithLists(&corev1.NamespaceList{Items: namespaces}).Build() mutators := NewAnnotationMutators( - client, - client, + fakeClient, + fakeClient, logr.Logger{}, testCase.cfg, testCase.typeSet, ) - mutators.MutateAll(ctx) + mutators.MutateAndUpdateAll(ctx) gotNamespaces := &corev1.NamespaceList{} - require.NoError(t, client.List(ctx, gotNamespaces)) + require.NoError(t, fakeClient.List(ctx, gotNamespaces)) for _, gotNamespace := range gotNamespaces.Items { annotations, ok := testCase.want[gotNamespace.Name] assert.True(t, ok) @@ -124,6 +127,84 @@ func TestAnnotationMutators_Namespaces(t *testing.T) { } } +func TestAnnotationMutators_Namespaces_Restart(t *testing.T) { + cfg := AnnotationConfig{ + Java: AnnotationResources{ + Namespaces: []string{"default"}, + }, + } + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + defaultDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace.Name, + Name: "deployment", + }, + } + daemonSet := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace.Name, + Name: "daemonset", + }, + } + statefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace.Name, + Name: "statefulset", + }, + } + otherDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "other", + Name: "deployment", + }, + } + namespacedResources := []client.Object{defaultDeployment, daemonSet, statefulSet} + fakeClient := fake.NewFakeClient(namespace, defaultDeployment, daemonSet, statefulSet, otherDeployment) + mutators := NewAnnotationMutators( + fakeClient, + fakeClient, + logr.Logger{}, + cfg, + instrumentation.NewTypeSet(instrumentation.TypeJava), + ) + mutators.MutateAndUpdateAll(context.Background()) + ctx := context.Background() + for _, namespacedResource := range namespacedResources { + assert.NoError(t, fakeClient.Get(ctx, client.ObjectKeyFromObject(namespacedResource), namespacedResource)) + obj := getAnnotationObjectMeta(namespacedResource) + assert.NotNil(t, obj) + annotations := obj.GetAnnotations() + assert.NotNil(t, annotations) + assert.NotEmpty(t, annotations[restartedAtAnnotation]) + } + + // non-configured namespace is not restarted/updated + assert.NoError(t, fakeClient.Get(ctx, client.ObjectKeyFromObject(otherDeployment), otherDeployment)) + obj := getAnnotationObjectMeta(otherDeployment) + assert.NotNil(t, obj) + annotations := obj.GetAnnotations() + assert.Nil(t, annotations) +} + +func getAnnotationObjectMeta(obj client.Object) metav1.Object { + switch o := obj.(type) { + case *corev1.Namespace: + return o.GetObjectMeta() + case *appsv1.Deployment: + return o.Spec.Template.GetObjectMeta() + case *appsv1.DaemonSet: + return o.Spec.Template.GetObjectMeta() + case *appsv1.StatefulSet: + return o.Spec.Template.GetObjectMeta() + default: + return nil + } +} + func TestAnnotationMutators_Deployments(t *testing.T) { testCases := map[string]struct { typeSet instrumentation.TypeSet @@ -152,16 +233,16 @@ func TestAnnotationMutators_Deployments(t *testing.T) { } for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { - var deployments []appv1.Deployment + var deployments []appsv1.Deployment for name, annotations := range testCase.deployments { var namespace string namespace, name, _ = strings.Cut(name, "/") - deployments = append(deployments, appv1.Deployment{ + deployments = append(deployments, appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, - Spec: appv1.DeploymentSpec{ + Spec: appsv1.DeploymentSpec{ Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: annotations, @@ -171,17 +252,17 @@ func TestAnnotationMutators_Deployments(t *testing.T) { }) } ctx := context.Background() - client := fake.NewClientBuilder().WithLists(&appv1.DeploymentList{Items: deployments}).Build() + fakeClient := fake.NewClientBuilder().WithLists(&appsv1.DeploymentList{Items: deployments}).Build() mutators := NewAnnotationMutators( - client, - client, + fakeClient, + fakeClient, logr.Logger{}, testCase.cfg, testCase.typeSet, ) - mutators.MutateAll(ctx) - gotDeployments := &appv1.DeploymentList{} - require.NoError(t, client.List(ctx, gotDeployments)) + mutators.MutateAndUpdateAll(ctx) + gotDeployments := &appsv1.DeploymentList{} + require.NoError(t, fakeClient.List(ctx, gotDeployments)) for _, gotDeployment := range gotDeployments.Items { name := namespacedName(gotDeployment.GetObjectMeta()) annotations, ok := testCase.want[name] @@ -220,16 +301,16 @@ func TestAnnotationMutators_DaemonSets(t *testing.T) { } for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { - var daemonSets []appv1.DaemonSet + var daemonSets []appsv1.DaemonSet for name, annotations := range testCase.daemonSets { var namespace string namespace, name, _ = strings.Cut(name, "/") - daemonSets = append(daemonSets, appv1.DaemonSet{ + daemonSets = append(daemonSets, appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, - Spec: appv1.DaemonSetSpec{ + Spec: appsv1.DaemonSetSpec{ Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: annotations, @@ -239,17 +320,17 @@ func TestAnnotationMutators_DaemonSets(t *testing.T) { }) } ctx := context.Background() - client := fake.NewClientBuilder().WithLists(&appv1.DaemonSetList{Items: daemonSets}).Build() + fakeClient := fake.NewClientBuilder().WithLists(&appsv1.DaemonSetList{Items: daemonSets}).Build() mutators := NewAnnotationMutators( - client, - client, + fakeClient, + fakeClient, logr.Logger{}, testCase.cfg, testCase.typeSet, ) - mutators.MutateAll(ctx) - gotDaemonSets := &appv1.DaemonSetList{} - require.NoError(t, client.List(ctx, gotDaemonSets)) + mutators.MutateAndUpdateAll(ctx) + gotDaemonSets := &appsv1.DaemonSetList{} + require.NoError(t, fakeClient.List(ctx, gotDaemonSets)) for _, gotDaemonSet := range gotDaemonSets.Items { name := namespacedName(gotDaemonSet.GetObjectMeta()) annotations, ok := testCase.want[name] @@ -288,16 +369,16 @@ func TestAnnotationMutators_StatefulSets(t *testing.T) { } for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { - var statefulSets []appv1.StatefulSet + var statefulSets []appsv1.StatefulSet for name, annotations := range testCase.statefulSets { var namespace string namespace, name, _ = strings.Cut(name, "/") - statefulSets = append(statefulSets, appv1.StatefulSet{ + statefulSets = append(statefulSets, appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, - Spec: appv1.StatefulSetSpec{ + Spec: appsv1.StatefulSetSpec{ Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: annotations, @@ -307,17 +388,17 @@ func TestAnnotationMutators_StatefulSets(t *testing.T) { }) } ctx := context.Background() - client := fake.NewClientBuilder().WithLists(&appv1.StatefulSetList{Items: statefulSets}).Build() + fakeClient := fake.NewClientBuilder().WithLists(&appsv1.StatefulSetList{Items: statefulSets}).Build() mutators := NewAnnotationMutators( - client, - client, + fakeClient, + fakeClient, logr.Logger{}, testCase.cfg, testCase.typeSet, ) - mutators.MutateAll(ctx) - gotStatefulSets := &appv1.StatefulSetList{} - require.NoError(t, client.List(ctx, gotStatefulSets)) + mutators.MutateAndUpdateAll(ctx) + gotStatefulSets := &appsv1.StatefulSetList{} + require.NoError(t, fakeClient.List(ctx, gotStatefulSets)) for _, gotStatefulSet := range gotStatefulSets.Items { name := namespacedName(gotStatefulSet.GetObjectMeta()) annotations, ok := testCase.want[name] @@ -328,6 +409,59 @@ func TestAnnotationMutators_StatefulSets(t *testing.T) { } } +type mockClient struct { + mock.Mock + client.Writer + client.Reader +} + +func (c *mockClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + args := c.Called(ctx, list, opts) + if args.Get(0) == nil { + return nil + } + return args.Error(0) +} + +func (c *mockClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + args := c.Called(ctx, obj, opts) + if args.Get(0) == nil { + return nil + } + return args.Error(0) +} + +func TestAnnotationMutators_ClientErrors(t *testing.T) { + err := errors.New("test error") + namespace := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + } + cfg := AnnotationConfig{ + Java: AnnotationResources{ + Namespaces: []string{"test"}, + }, + } + errClient := new(mockClient) + errClient.On("List", mock.Anything, mock.Anything, mock.Anything).Return(err) + errClient.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(err) + fakeClient := fake.NewClientBuilder().WithLists(&corev1.NamespaceList{Items: []corev1.Namespace{namespace}}).Build() + mutators := NewAnnotationMutators( + fakeClient, + errClient, + logr.Logger{}, + cfg, + instrumentation.NewTypeSet(instrumentation.TypeJava), + ) + mutators.MutateAndUpdateAll(context.Background()) + errClient.AssertCalled(t, "List", mock.Anything, mock.Anything, mock.Anything) + mutators.clientWriter = errClient + mutators.clientReader = fakeClient + mutators.MutateAndUpdateAll(context.Background()) + errClient.AssertCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything) +} + func TestAnnotateKey(t *testing.T) { testCases := []struct { instType instrumentation.Type diff --git a/pkg/instrumentation/auto/callback.go b/pkg/instrumentation/auto/callback.go new file mode 100644 index 000000000..a37aa956f --- /dev/null +++ b/pkg/instrumentation/auto/callback.go @@ -0,0 +1,54 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package auto + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type objectCallbackFunc func(client.Object) bool + +func chainCallbacks(fns ...objectCallbackFunc) objectCallbackFunc { + return func(obj client.Object) bool { + for _, fn := range fns { + if !fn(obj) { + return false + } + } + return true + } +} + +func (m *AnnotationMutators) updateFunc(ctx context.Context) objectCallbackFunc { + return func(obj client.Object) bool { + if err := m.clientWriter.Update(ctx, obj); err != nil { + m.logger.Error(err, "Unable to send update", + "kind", fmt.Sprintf("%T", obj), + "name", obj.GetName(), + "namespace", obj.GetNamespace(), + ) + return false + } + return true + } +} + +func (m *AnnotationMutators) restartNamespaceFunc(ctx context.Context) objectCallbackFunc { + restartAndUpdateFunc := chainCallbacks(restart, m.updateFunc(ctx)) + return func(obj client.Object) bool { + namespace, ok := obj.(*corev1.Namespace) + if !ok { + return false + } + m.rangeObjectList(ctx, &appsv1.DeploymentList{}, client.InNamespace(namespace.Name), restartAndUpdateFunc) + m.rangeObjectList(ctx, &appsv1.DaemonSetList{}, client.InNamespace(namespace.Name), restartAndUpdateFunc) + m.rangeObjectList(ctx, &appsv1.StatefulSetList{}, client.InNamespace(namespace.Name), restartAndUpdateFunc) + return true + } +} diff --git a/pkg/instrumentation/auto/restart.go b/pkg/instrumentation/auto/restart.go new file mode 100644 index 000000000..8f29ad231 --- /dev/null +++ b/pkg/instrumentation/auto/restart.go @@ -0,0 +1,44 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package auto + +import ( + "time" + + appsv1 "k8s.io/api/apps/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation" +) + +const ( + restartedAtAnnotation = "kubectl.kubernetes.io/restartedAt" +) + +var ( + restartAnnotationMutator = instrumentation.NewAnnotationMutator([]instrumentation.AnnotationMutation{&restartAnnotationMutation{}}) +) + +type restartAnnotationMutation struct { +} + +var _ instrumentation.AnnotationMutation = (*restartAnnotationMutation)(nil) + +func (m restartAnnotationMutation) Mutate(annotations map[string]string) bool { + annotations[restartedAtAnnotation] = time.Now().Format(time.RFC3339) + return true +} + +// restart based on kubectl implementation https://github.com/kubernetes/kubectl/blob/master/pkg/polymorphichelpers/objectrestarter.go#L32 +func restart(obj client.Object) bool { + switch o := obj.(type) { + case *appsv1.Deployment: + restartAnnotationMutator.Mutate(o.Spec.Template.GetObjectMeta()) + case *appsv1.DaemonSet: + restartAnnotationMutator.Mutate(o.Spec.Template.GetObjectMeta()) + case *appsv1.StatefulSet: + restartAnnotationMutator.Mutate(o.Spec.Template.GetObjectMeta()) + } + return true +}