Skip to content

Commit

Permalink
Merge pull request #16 from shomron/namespaced-cache
Browse files Browse the repository at this point in the history
Scope secrets cache to single namespace
  • Loading branch information
maxsmythe authored Nov 18, 2020
2 parents e426f33 + c237280 commit 7df0cff
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 23 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module github.com/open-policy-agent/cert-controller
go 1.14

require (
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
go.uber.org/atomic v1.4.0
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
sigs.k8s.io/controller-runtime v0.6.3
)
113 changes: 92 additions & 21 deletions pkg/rotator/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -84,7 +85,20 @@ func (w WebhookInfo) gvk() schema.GroupVersionKind {

// AddRotator adds the CertRotator and ReconcileWH to the manager.
func AddRotator(mgr manager.Manager, cr *CertRotator) error {
cr.client = mgr.GetClient()
if mgr == nil || cr == nil {
return fmt.Errorf("nil arguments")
}
ns := cr.SecretKey.Namespace
if ns == "" {
return fmt.Errorf("invalid namespace for secret")
}
cache, err := addNamespacedCache(mgr, ns)
if err != nil {
return fmt.Errorf("creating namespaced cache: %w", err)
}

cr.reader = cache
cr.writer = mgr.GetClient() // TODO make overrideable
cr.certsMounted = make(chan struct{})
cr.certsNotMounted = make(chan struct{})
cr.wasCAInjected = atomic.NewBool(false)
Expand All @@ -94,7 +108,8 @@ func AddRotator(mgr manager.Manager, cr *CertRotator) error {
}

reconciler := &ReconcileWH{
client: mgr.GetClient(),
cache: cache,
writer: mgr.GetClient(), // TODO
scheme: mgr.GetScheme(),
ctx: context.Background(),
secretKey: cr.SecretKey,
Expand All @@ -107,9 +122,37 @@ func AddRotator(mgr manager.Manager, cr *CertRotator) error {
return nil
}

// addNamespacedCache will add a new namespace-scoped cache.Cache to the provided manager.
// Informers in the new cache will be scoped to the provided namespace for namespaced resources,
// but will still have cluster-wide visibility into cluster-scoped resources.
// The cache will be started by the manager when it starts, and consumers should synchronize on
// it using WaitForCacheSync().
func addNamespacedCache(mgr manager.Manager, namespace string) (cache.Cache, error) {
c, err := cache.New(mgr.GetConfig(),
cache.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
Namespace: namespace,
})
if err != nil {
return nil, err
}
if err := mgr.Add(c); err != nil {
return nil, fmt.Errorf("registering namespaced cache: %w", err)
}
return c, nil
}

// SyncingSource is a reader that needs syncing prior to being usable.
type SyncingReader interface {
client.Reader
WaitForCacheSync(stop <-chan struct{}) bool
}

// CertRotator contains cert artifacts and a channel to close when the certs are ready.
type CertRotator struct {
client client.Client
reader SyncingReader
writer client.Writer
SecretKey types.NamespacedName
CertDir string
CAName string
Expand All @@ -124,7 +167,14 @@ type CertRotator struct {
}

// Start starts the CertRotator runnable to rotate certs and ensure the certs are ready.
func (cr *CertRotator) Start(stop <-chan (struct{})) error {
func (cr *CertRotator) Start(stop <-chan struct{}) error {
if cr.reader == nil {
return errors.New("nil reader")
}
if !cr.reader.WaitForCacheSync(stop) {
return errors.New("failed waiting for reader to sync")
}

// explicitly rotate on the first round so that the certificate
// can be bootstrapped, otherwise manager exits before a cert can be written
crLog.Info("starting cert rotator controller")
Expand Down Expand Up @@ -164,7 +214,7 @@ tickerLoop:
func (cr *CertRotator) refreshCertIfNeeded() error {
refreshFn := func() (bool, error) {
secret := &corev1.Secret{}
if err := cr.client.Get(context.Background(), cr.SecretKey, secret); err != nil {
if err := cr.reader.Get(context.Background(), cr.SecretKey, secret); err != nil {
return false, errors.Wrap(err, "acquiring secret to update certificates")
}
if secret.Data == nil || !cr.validCACert(secret.Data[caCertName], secret.Data[caKeyName]) {
Expand Down Expand Up @@ -289,7 +339,7 @@ func injectCertToConversionWebhook(crd *unstructured.Unstructured, certPem []byt

func (cr *CertRotator) writeSecret(cert, key []byte, caArtifacts *KeyPairArtifacts, secret *corev1.Secret) error {
populateSecret(cert, key, caArtifacts, secret)
return cr.client.Update(context.Background(), secret)
return cr.writer.Update(context.Background(), secret)
}

// KeyPairArtifacts stores cert artifacts.
Expand Down Expand Up @@ -529,18 +579,32 @@ func (m *mapper) Map(object handler.MapObject) []reconcile.Request {
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func addController(mgr manager.Manager, r *ReconcileWH) error {
// Create a new controller
builder := ctrl.NewControllerManagedBy(mgr).For(&corev1.Secret{})
c, err := controller.New("cert-rotator", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

err = c.Watch(
source.NewKindWithCache(&corev1.Secret{}, r.cache),
&handler.EnqueueRequestForObject{},
)
if err != nil {
return fmt.Errorf("watching Secrets: %w", err)
}

for _, webhook := range r.webhooks {
wh := &unstructured.Unstructured{}
wh.SetGroupVersionKind(webhook.gvk())
builder = builder.Watches(&source.Kind{Type: wh}, &handler.EnqueueRequestsFromMapFunc{ToRequests: &mapper{
secretKey: r.secretKey,
whKey: types.NamespacedName{Name: webhook.Name},
}})
}
err := builder.Complete(r)
if err != nil {
return err
err = c.Watch(
source.NewKindWithCache(wh, r.cache),
&handler.EnqueueRequestsFromMapFunc{ToRequests: &mapper{
secretKey: r.secretKey,
whKey: types.NamespacedName{Name: webhook.Name},
}},
)
if err != nil {
return fmt.Errorf("watching webhook %s: %w", webhook.Name, err)
}
}

return nil
Expand All @@ -551,7 +615,8 @@ var _ reconcile.Reconciler = &ReconcileWH{}
// ReconcileWH reconciles a validatingwebhookconfiguration, making sure it
// has the appropriate CA cert
type ReconcileWH struct {
client client.Client
writer client.Writer
cache cache.Cache
scheme *runtime.Scheme
ctx context.Context
secretKey types.NamespacedName
Expand All @@ -565,8 +630,14 @@ func (r *ReconcileWH) Reconcile(request reconcile.Request) (reconcile.Result, er
if request.NamespacedName != r.secretKey {
return reconcile.Result{}, nil
}

stop := make(<-chan struct{})
if !r.cache.WaitForCacheSync(stop) {
return reconcile.Result{}, errors.New("cache not ready")
}

secret := &corev1.Secret{}
if err := r.client.Get(r.ctx, request.NamespacedName, secret); err != nil {
if err := r.cache.Get(r.ctx, request.NamespacedName, secret); err != nil {
if k8sErrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
Expand Down Expand Up @@ -613,7 +684,7 @@ func (r *ReconcileWH) ensureCerts(certPem []byte) error {
log := crLog.WithValues("name", webhook.Name, "gvk", gvk)
updatedResource := &unstructured.Unstructured{}
updatedResource.SetGroupVersionKind(gvk)
if err := r.client.Get(r.ctx, types.NamespacedName{Name: webhook.Name}, updatedResource); err != nil {
if err := r.cache.Get(r.ctx, types.NamespacedName{Name: webhook.Name}, updatedResource); err != nil {
if k8sErrors.IsNotFound(err) {
log.Error(err, "Webhook not found. Unable to update certificate.")
continue
Expand All @@ -633,7 +704,7 @@ func (r *ReconcileWH) ensureCerts(certPem []byte) error {
anyError = err
continue
}
if err := r.client.Update(r.ctx, updatedResource); err != nil {
if err := r.writer.Update(r.ctx, updatedResource); err != nil {
log.Error(err, "Error updating webhook with certificate")
anyError = err
continue
Expand Down Expand Up @@ -682,6 +753,6 @@ func (cr *CertRotator) ensureReady() {
close(cr.caNotInjected)
return
}
crLog.Info(fmt.Sprintf("CA certs are injected to webhooks"))
crLog.Info("CA certs are injected to webhooks")
close(cr.IsReady)
}
44 changes: 44 additions & 0 deletions pkg/rotator/rotator_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package rotator

import (
"fmt"
"log"
"os"
"sync"
"testing"

"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var cfg *rest.Config

// TestMain runs before package tests and starts a local apiserver instance.
func TestMain(m *testing.M) {
t := &envtest.Environment{}

var err error
if cfg, err = t.Start(); err != nil {
log.Fatal(err)
}

code := m.Run()
if err := t.Stop(); err != nil {
log.Fatal(fmt.Errorf("shutting down: %w", err))
}
os.Exit(code)
}

// StartTestManager adds recFn
func StartTestManager(mgr manager.Manager, g *gomega.GomegaWithT) (chan struct{}, *sync.WaitGroup) {
stop := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
g.Expect(mgr.Start(stop)).NotTo(gomega.HaveOccurred())
}()
return stop, wg
}
Loading

0 comments on commit 7df0cff

Please sign in to comment.