Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
🌱 Add ssa cache metrics
Browse files Browse the repository at this point in the history
cahillsf committed Jan 2, 2025
1 parent 3da71a8 commit 33017f9
Showing 5 changed files with 83 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -54,14 +54,14 @@ func dryRunSSAPatch(ctx context.Context, dryRunCtx *dryRunSSAPatchInput) (bool,
// The identifier consists of: gvk, namespace, name and resourceVersion of originalUnstructured
// and a hash of modifiedUnstructured.
// This ensures that we re-run the request as soon as either original or modified changes.
requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured, dryRunCtx.modifiedUnstructured)
requestIdentifier, kind, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured, dryRunCtx.modifiedUnstructured)
if err != nil {
return false, false, nil, err
}

// Check if we already ran this request before by checking if the cache already contains this identifier.
// Note: We only add an identifier to the cache if the result of the dry run was no diff.
if exists := dryRunCtx.ssaCache.Has(requestIdentifier); exists {
if exists := dryRunCtx.ssaCache.Has(requestIdentifier, kind); exists {
return false, false, nil, nil
}

40 changes: 33 additions & 7 deletions internal/util/ssa/cache.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,11 @@ const (
expirationInterval = 10 * time.Hour
)

// boolToStatus translates the outcome of a cache lookup into the value
// reported in the "status" metric label: "hit" when the key was found in the
// cache and "miss" when it was not.
var boolToStatus = map[bool]string{
	false: "miss",
	true:  "hit",
}

// Cache caches SSA request results.
// Specifically we only use it to cache that a certain request
// doesn't have to be repeated anymore because there was no diff.
@@ -49,16 +54,21 @@ type Cache interface {

// Has checks if the given key (still) exists in the Cache.
// Note: keys expire after the ttl.
Has(key string) bool
Has(key, groupKind string) bool
}

// NewCache creates a new cache.
func NewCache() Cache {
func NewCache(opts ...NewCacheOption) Cache {
config := &newCacheConfig{}
for _, opt := range opts {
opt(config)
}
r := &ssaCache{
Store: cache.NewTTLStore(func(obj interface{}) (string, error) {
// We only add strings to the cache, so it's safe to cast to string.
return obj.(string), nil
}, ttl),
newCacheConfig: *config,
}
go func() {
for {
@@ -73,8 +83,23 @@ func NewCache() Cache {
return r
}

// newCacheConfig holds the optional settings that NewCache collects from the
// NewCacheOption values it is given.
type newCacheConfig struct {
	// owner identifies who owns the cache; Has reports it as the
	// "cache_owner" label value on the cache request metric.
	owner string
}

// NewCacheOption is a configuration option supplied to NewCache.
// Each option is a function that NewCache applies to its newCacheConfig
// before the cache is constructed.
type NewCacheOption func(*newCacheConfig)

// WithOwner returns a NewCacheOption that stores owner in the configuration
// used by NewCache. The owner is later reported as the "cache_owner" label
// value whenever a cache lookup is counted.
func WithOwner(owner string) NewCacheOption {
	setOwner := func(cfg *newCacheConfig) { cfg.owner = owner }
	return setOwner
}

// ssaCache is the Cache implementation returned by NewCache.
type ssaCache struct {
	// Store is the embedded TTL store that holds the cached request identifiers.
	cache.Store
	// newCacheConfig carries the options (such as owner) the cache was created with.
	newCacheConfig
}

// Add adds the given key to the Cache.
@@ -88,9 +113,10 @@ func (r *ssaCache) Add(key string) {

// Has checks if the given key (still) exists in the Cache.
// Note: keys expire after the ttl.
func (r *ssaCache) Has(key string) bool {
func (r *ssaCache) Has(key, kind string) bool {
	// GetByKey never returns an error, so its third result is deliberately discarded.
	_, found, _ := r.Store.GetByKey(key)

	// Count the lookup as a hit or a miss, labelled with the object kind and
	// the owner this cache was created with.
	status := boolToStatus[found]
	requestTotal.WithLabelValues(status, kind, r.newCacheConfig.owner).Inc()

	return found
}

@@ -99,16 +125,16 @@ func (r *ssaCache) Has(key string) bool {
// once we found out that it would not produce a diff.
// The identifier consists of: gvk, namespace, name and resourceVersion of the original object and a hash of the modified
// object. This ensures that we re-run the request as soon as either original or modified changes.
func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (string, error) {
func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (id, kind string, err error) {
modifiedObjectHash, err := hash.Compute(modified)
if err != nil {
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object")
return "", "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object")
}

gvk, err := apiutil.GVKForObject(original, scheme)
if err != nil {
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original))
return "", "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original))
}

return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), nil
return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), gvk.GroupKind().Kind, nil
}
37 changes: 37 additions & 0 deletions internal/util/ssa/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ssa

import (
"github.com/prometheus/client_golang/prometheus"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

// init makes the SSA cache metrics available as soon as the package is loaded.
func init() {
	// Register the metrics at the controller-runtime metrics registry.
	ctrlmetrics.Registry.MustRegister(requestTotal)
}

// requestTotal counts lookups against the SSA cache. Every lookup is labelled
// with its status ("hit" or "miss"), the kind of the object the request was
// computed for, and the owner of the cache that served it.
var requestTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "capi_ssa_cache_request_total",
		Help: "Total number of ssa cache hit and miss requests.",
	},
	[]string{"status", "kind", "cache_owner"},
)
6 changes: 3 additions & 3 deletions internal/util/ssa/patch.go
Original file line number Diff line number Diff line change
@@ -81,14 +81,14 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
return errors.Wrapf(err, "failed to apply object: failed to get GroupVersionKind of modified object %s", klog.KObj(modifiedUnstructured))
}

var requestIdentifier string
var requestIdentifier, kind string
if options.WithCachingProxy {
// Check if the request is cached.
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured)
requestIdentifier, kind, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured)
if err != nil {
return errors.Wrapf(err, "failed to apply object")
}
if options.Cache.Has(requestIdentifier) {
if options.Cache.Has(requestIdentifier, kind) {
// If the request is cached return the original object.
if err := c.Scheme().Convert(options.Original, modified, ctx); err != nil {
return errors.Wrapf(err, "failed to write original into modified object")
16 changes: 8 additions & 8 deletions internal/util/ssa/patch_test.go
Original file line number Diff line number Diff line change
@@ -63,12 +63,12 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was not cached.
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifier, kind, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
// Verify that request was not cached (as it changed the object)
g.Expect(ssaCache.Has(requestIdentifier)).To(BeFalse())
g.Expect(ssaCache.Has(requestIdentifier, kind)).To(BeFalse())

// 3. Repeat the same update and verify that the request was cached as the object was not changed.
// Get the original object.
@@ -80,12 +80,12 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was cached.
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifier, kind, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
// Verify that request was cached (as it did not change the object)
g.Expect(ssaCache.Has(requestIdentifier)).To(BeTrue())
g.Expect(ssaCache.Has(requestIdentifier, kind)).To(BeTrue())
})

t.Run("Test patch with Machine", func(*testing.T) {
@@ -138,14 +138,14 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was not cached.
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifier, kind, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
// Verify that gvk is still set
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
// Verify that request was not cached (as it changed the object)
g.Expect(ssaCache.Has(requestIdentifier)).To(BeFalse())
g.Expect(ssaCache.Has(requestIdentifier, kind)).To(BeFalse())

// Wait for 1 second. We are also trying to verify in this test that the resourceVersion of the Machine
// is not increased. Under some circumstances this would only happen if the timestamp in managedFields would
@@ -165,12 +165,12 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was cached.
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifier, kind, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
// Verify that request was cached (as it did not change the object)
g.Expect(ssaCache.Has(requestIdentifier)).To(BeTrue())
g.Expect(ssaCache.Has(requestIdentifier, kind)).To(BeTrue())
// Verify that gvk is still set
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
})

0 comments on commit 33017f9

Please sign in to comment.