Skip to content

Commit

Permalink
Add restart for namespace workloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien committed Feb 3, 2024
1 parent cf477d2 commit 704981f
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 175 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
go.opentelemetry.io/collector/featuregate v0.77.0
go.opentelemetry.io/otel v1.21.0
go.uber.org/zap v1.25.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
Expand Down Expand Up @@ -126,12 +127,12 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/vultr/govultr/v2 v2.17.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
Expand Down
58 changes: 22 additions & 36 deletions internal/webhook/workloadmutation/webhookhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

Expand All @@ -32,56 +31,43 @@ type WebhookHandler interface {

// the implementation.
type workloadMutationWebhook struct {
client client.Client
decoder *admission.Decoder
logger logr.Logger
config config.Config
annotationMutator *auto.AnnotationMutators
client client.Client
decoder *admission.Decoder
logger logr.Logger
config config.Config
annotationMutators *auto.AnnotationMutators
}

// NewWebhookHandler creates a new WorkloadWebhookHandler.
func NewWebhookHandler(cfg config.Config, logger logr.Logger, decoder *admission.Decoder, cl client.Client, annotationMutation *auto.AnnotationMutators) WebhookHandler {
func NewWebhookHandler(cfg config.Config, logger logr.Logger, decoder *admission.Decoder, cl client.Client, annotationMutators *auto.AnnotationMutators) WebhookHandler {
return &workloadMutationWebhook{
config: cfg,
decoder: decoder,
logger: logger,
client: cl,
annotationMutator: annotationMutation,
config: cfg,
decoder: decoder,
logger: logger,
client: cl,
annotationMutators: annotationMutators,
}
}

func (p *workloadMutationWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
var err error
var marshaledObject []byte
var object runtime.Object
func (p *workloadMutationWebhook) Handle(_ context.Context, req admission.Request) admission.Response {
var obj client.Object
switch objectKind := req.Kind.Kind; objectKind {
case "DaemonSet":
ds := appsv1.DaemonSet{}
err = p.decoder.Decode(req, &ds)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
object = &ds
obj = &appsv1.DaemonSet{}
case "Deployment":
d := appsv1.Deployment{}
err = p.decoder.Decode(req, &d)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
object = &d
obj = &appsv1.Deployment{}
case "StatefulSet":
ss := appsv1.StatefulSet{}
err = p.decoder.Decode(req, &ss)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
object = &ss
obj = &appsv1.StatefulSet{}
default:
return admission.Errored(http.StatusBadRequest, errors.New("failed to unmarshal request object"))
}

p.annotationMutator.Mutate(object)
marshaledObject, err = json.Marshal(object)
if err := p.decoder.Decode(req, obj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}

p.annotationMutators.MutateObject(obj)
marshaledObject, err := json.Marshal(obj)
if err != nil {
res := admission.Errored(http.StatusInternalServerError, err)
res.Allowed = true
Expand Down
10 changes: 5 additions & 5 deletions internal/webhook/workloadmutation/webhookhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ package workloadmutation
import (
"context"
"encoding/json"
"net/http"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
admv1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"net/http"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down
15 changes: 9 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,14 @@ func main() {

decoder := admission.NewDecoder(mgr.GetScheme())

if os.Getenv("DISABLE_AUTO_ANNOTATION") != "true" {
if os.Getenv("DISABLE_AUTO_ANNOTATION") == "true" || autoAnnotationConfigStr == "" {
setupLog.Info("Auto-annotation is disabled")
} else {
var autoAnnotationConfig auto.AnnotationConfig
if err := json.Unmarshal([]byte(autoAnnotationConfigStr), &autoAnnotationConfig); err != nil {
setupLog.Error(err, "unable to unmarshal auto-annotation config")
if err = json.Unmarshal([]byte(autoAnnotationConfigStr), &autoAnnotationConfig); err != nil {
setupLog.Error(err, "Unable to unmarshal auto-annotation config")
} else {
autoAnnotationMutator := auto.NewAnnotationMutators(
autoAnnotationMutators := auto.NewAnnotationMutators(
mgr.GetClient(),
mgr.GetAPIReader(),
logger,
Expand All @@ -204,8 +206,9 @@ func main() {
),
)
mgr.GetWebhookServer().Register("/mutate-v1-workload", &webhook.Admission{
Handler: workloadmutation.NewWebhookHandler(cfg, ctrl.Log.WithName("workload-webhook"), decoder, mgr.GetClient(), autoAnnotationMutator)})
go autoAnnotationMutator.MutateAll(ctx)
Handler: workloadmutation.NewWebhookHandler(cfg, ctrl.Log.WithName("workload-webhook"), decoder, mgr.GetClient(), autoAnnotationMutators)})
setupLog.Info("Starting auto-annotation")
go autoAnnotationMutators.MutateAndUpdateAll(ctx)
}
}

Expand Down
148 changes: 54 additions & 94 deletions pkg/instrumentation/auto/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation"
Expand All @@ -26,9 +25,10 @@ const (

// +kubebuilder:rbac:groups="",resources=namespaces,verbs=list;watch;update

// AnnotationMutators has an AnnotationMutator resource name
// AnnotationMutators contains functions that can be used to mutate annotations
// on all supported objects based on the configured mutators.
type AnnotationMutators struct {
client client.Client
clientWriter client.Writer
clientReader client.Reader
logger logr.Logger
namespaceMutators map[string]instrumentation.AnnotationMutator
Expand All @@ -38,96 +38,19 @@ type AnnotationMutators struct {
defaultMutator instrumentation.AnnotationMutator
}

// MutateAll runs the mutators for each of the configured resources.
func (m *AnnotationMutators) MutateAll(ctx context.Context) {
m.MutateNamespaces(ctx)
m.MutateDeployments(ctx)
m.MutateDaemonSets(ctx)
m.MutateStatefulSets(ctx)
// MutateAndUpdateAll runs the mutators for each of the support resources and updates them.
func (m *AnnotationMutators) MutateAndUpdateAll(ctx context.Context) {
mutateAndUpdateFunc := chainCallbacks(m.MutateObject, m.updateFunc(ctx))
m.rangeObjectList(ctx, &corev1.NamespaceList{}, &client.ListOptions{},
chainCallbacks(mutateAndUpdateFunc, m.restartNamespaceFunc(ctx)),
)
m.rangeObjectList(ctx, &appsv1.DeploymentList{}, &client.ListOptions{}, mutateAndUpdateFunc)
m.rangeObjectList(ctx, &appsv1.DaemonSetList{}, &client.ListOptions{}, mutateAndUpdateFunc)
m.rangeObjectList(ctx, &appsv1.StatefulSetList{}, &client.ListOptions{}, mutateAndUpdateFunc)
}

// MutateNamespaces lists all namespaces and runs MutateNamespace on each.
func (m *AnnotationMutators) MutateNamespaces(ctx context.Context) {
namespaces := &corev1.NamespaceList{}
if err := m.clientReader.List(ctx, namespaces); err != nil {
m.logger.Error(err, "Unable to list namespaces")
return
}

for _, namespace := range namespaces.Items {
if m.Mutate(&namespace) {
if err := m.client.Update(ctx, &namespace); err != nil {
m.logger.Error(err, "Unable to send update",
"kind", namespace.Kind,
"name", namespace.Name,
)
}
}
}
}

// MutateDeployments lists all deployments and runs MutateDeployment on each.
func (m *AnnotationMutators) MutateDeployments(ctx context.Context) {
deployments := &appsv1.DeploymentList{}
if err := m.clientReader.List(ctx, deployments); err != nil {
m.logger.Error(err, "Unable to list deployments")
return
}
for _, deployment := range deployments.Items {
if m.Mutate(&deployment) {
if err := m.client.Update(ctx, &deployment); err != nil {
m.logger.Error(err, "Unable to send update",
"kind", deployment.Kind,
"name", deployment.Name,
"namespace", deployment.Namespace,
)
}
}
}
}

// MutateDaemonSets lists all daemonsets and runs MutateDaemonSet on each.
func (m *AnnotationMutators) MutateDaemonSets(ctx context.Context) {
daemonSets := &appsv1.DaemonSetList{}
if err := m.clientReader.List(ctx, daemonSets); err != nil {
m.logger.Error(err, "Unable to list daemonsets")
return
}
for _, daemonSet := range daemonSets.Items {
if m.Mutate(&daemonSet) {
if err := m.client.Update(ctx, &daemonSet); err != nil {
m.logger.Error(err, "Unable to send update",
"kind", daemonSet.Kind,
"name", daemonSet.Name,
"namespace", daemonSet.Namespace,
)
}
}
}
}

// MutateStatefulSets lists all statefulsets and runs MutateStatefulSet on each.
func (m *AnnotationMutators) MutateStatefulSets(ctx context.Context) {
statefulSets := &appsv1.StatefulSetList{}
if err := m.clientReader.List(ctx, statefulSets); err != nil {
m.logger.Error(err, "Unable to list statefulsets")
return
}
for _, statefulSet := range statefulSets.Items {
if m.Mutate(&statefulSet) {
if err := m.client.Update(ctx, &statefulSet); err != nil {
m.logger.Error(err, "Unable to send update",
"kind", statefulSet.Kind,
"name", statefulSet.Name,
"namespace", statefulSet.Namespace,
)
}
}
}
}

// Mutate modifies annotations for a single object using the configured mutators.
func (m *AnnotationMutators) Mutate(obj runtime.Object) bool {
// MutateObject modifies annotations for a single object using the configured mutators.
func (m *AnnotationMutators) MutateObject(obj client.Object) bool {
switch o := obj.(type) {
case *corev1.Namespace:
return m.mutate(o.GetName(), m.namespaceMutators, o.GetObjectMeta())
Expand All @@ -142,6 +65,33 @@ func (m *AnnotationMutators) Mutate(obj runtime.Object) bool {
}
}

func (m *AnnotationMutators) rangeObjectList(ctx context.Context, list client.ObjectList, option client.ListOption, fn objectCallbackFunc) {
if err := m.clientReader.List(ctx, list, option); err != nil {
m.logger.Error(err, "Unable to list objects",
"kind", fmt.Sprintf("%T", list),
)
return
}
switch l := list.(type) {
case *corev1.NamespaceList:
for _, item := range l.Items {
fn(&item)
}
case *appsv1.DeploymentList:
for _, item := range l.Items {
fn(&item)
}
case *appsv1.DaemonSetList:
for _, item := range l.Items {
fn(&item)
}
case *appsv1.StatefulSetList:
for _, item := range l.Items {
fn(&item)
}
}
}

func (m *AnnotationMutators) mutate(name string, mutators map[string]instrumentation.AnnotationMutator, obj metav1.Object) bool {
mutator, ok := mutators[name]
if !ok {
Expand All @@ -157,10 +107,16 @@ func namespacedName(obj metav1.Object) string {
// NewAnnotationMutators creates mutators based on the AnnotationConfig provided and enabled instrumentation.TypeSet.
// The default mutator, which is used for non-configured resources, removes all auto-annotated annotations in the type
// set.
func NewAnnotationMutators(client client.Client, clientReader client.Reader, logger logr.Logger, cfg AnnotationConfig, typeSet instrumentation.TypeSet) *AnnotationMutators {
func NewAnnotationMutators(
clientWriter client.Writer,
clientReader client.Reader,
logger logr.Logger,
cfg AnnotationConfig,
typeSet instrumentation.TypeSet,
) *AnnotationMutators {
builder := newMutatorBuilder(typeSet)
return &AnnotationMutators{
client: client,
clientWriter: clientWriter,
clientReader: clientReader,
logger: logger,
namespaceMutators: builder.buildMutators(getResources(cfg, typeSet, getNamespaces)),
Expand All @@ -171,7 +127,11 @@ func NewAnnotationMutators(client client.Client, clientReader client.Reader, log
}
}

func getResources(cfg AnnotationConfig, typeSet instrumentation.TypeSet, resourceFn func(AnnotationResources) []string) map[instrumentation.Type][]string {
func getResources(
cfg AnnotationConfig,
typeSet instrumentation.TypeSet,
resourceFn func(AnnotationResources) []string,
) map[instrumentation.Type][]string {
resources := map[instrumentation.Type][]string{}
for instType := range typeSet {
resources[instType] = resourceFn(cfg.getResources(instType))
Expand Down
Loading

0 comments on commit 704981f

Please sign in to comment.