
review feedback
pohly committed Jul 6, 2020
1 parent 9a52398 commit e13dcaf
Showing 7 changed files with 254 additions and 198 deletions.
cmd/csi-provisioner/csi-provisioner.go (2 changes: 1 addition & 1 deletion)
@@ -302,9 +302,9 @@ func main() {
// TODO: metrics for the queue?!
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
*controller,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
-1, /* let API server generate names */
)
}

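The caller now supplies the namespace in which the controller creates CSIStorageCapacity objects; the old `-1` argument, which told the controller to invent names itself, is gone. A minimal sketch of where such a namespace value could come from, assuming a downward-API environment variable named `POD_NAMESPACE` (the variable name is an assumption, not taken from this commit):

```go
package main

import (
	"fmt"
	"os"
)

// lookupNamespace returns the namespace the provisioner runs in.
// Sketch only: POD_NAMESPACE is assumed to be injected via the
// downward API in the deployment manifest.
func lookupNamespace() (string, error) {
	ns := os.Getenv("POD_NAMESPACE")
	if ns == "" {
		return "", fmt.Errorf("POD_NAMESPACE not set; needed to create CSIStorageCapacity objects in the deployment namespace")
	}
	return ns, nil
}

func main() {
	ns, err := lookupNamespace()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("would pass namespace to NewCentralCapacityController:", ns)
}
```
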
go.sum (5 changes: 5 additions & 0 deletions)
@@ -103,6 +103,7 @@ github.com/containerd/ttrpc v1.0.0/go.mod h1:PvCDdDGpgqzQIzDW1TphrGLssLDZp2GuS+X
github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc=
github.com/containerd/typeurl v1.0.0/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc=
github.com/containernetworking/cni v0.7.1/go.mod h1:LGwApLUm2FpoOfxTDEeq8T9ipbpZ61X79hmU3w8FmsY=
github.com/containernetworking/cni v0.8.0/go.mod h1:LGwApLUm2FpoOfxTDEeq8T9ipbpZ61X79hmU3w8FmsY=
github.com/coredns/corefile-migration v1.0.8/go.mod h1:OFwBp/Wc9dJt5cAZzHWMNhK1r5L0p0jDwIBc6j8NC8E=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -176,6 +177,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0 h1:QvGt2nLcHH0WK9orKa+ppBPAxREcH364nPUedEpK0TY=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
github.com/go-openapi/analysis v0.18.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
@@ -847,6 +850,8 @@ k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.1.0 h1:X3+Mru/L3jy4BI4vcAYkHvL6PyU+QBsuhEqwlI4mgkA=
k8s.io/klog/v2 v2.1.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0 h1:XRvcwJozkgZ1UQJmfMGpvRthQHOvihEhYtDfAaxMz/A=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/kube-aggregator v0.19.0-beta.2/go.mod h1:CdbKxeXJJQNMWL8ZeZyBHORXFNLKlQT2BhgM7M2vk78=
k8s.io/kube-controller-manager v0.19.0-beta.2/go.mod h1:gEPC5sAWKdjiw8aNnrCi7Np4H3q5vKK//m5ZNwbilMA=
k8s.io/kube-openapi v0.0.0-20200427153329-656914f816f9 h1:5NC2ITmvg8RoxoH0wgmL4zn4VZqXGsKbxrikjaQx6s4=
pkg/capacity/controller.go (25 changes: 9 additions & 16 deletions)
@@ -77,9 +77,9 @@ type Controller struct {
client kubernetes.Interface
queue workqueue.RateLimitingInterface
owner metav1.OwnerReference
ownerNamespace string
topologyInformer topology.Informer
scInformer storageinformersv1.StorageClassInformer
enumerateObjects int

// capacities contains one entry for each object that is supposed
// to exist.
@@ -114,20 +114,20 @@ func NewCentralCapacityController(
client kubernetes.Interface,
queue workqueue.RateLimitingInterface,
owner metav1.OwnerReference,
ownerNamespace string,
topologyInformer topology.Informer,
scInformer storageinformersv1.StorageClassInformer,
enumerateObjects int,
) *Controller {
c := &Controller{
csiController: csiController,
driverName: driverName,
client: client,
queue: queue,
owner: owner,
ownerNamespace: ownerNamespace,
topologyInformer: topologyInformer,
scInformer: scInformer,
capacities: map[workItem]*storagev1alpha1.CSIStorageCapacity{},
enumerateObjects: enumerateObjects,
}

// Now register for changes. Depending on the implementation of the informers,
@@ -398,16 +398,16 @@ func (c *Controller) syncCapacity(ctx context.Context, item workItem) error {
capacity.Capacity = quantity
var err error
klog.V(5).Infof("Capacity Controller: updating %s for %+v, new capacity %v", capacity.Name, item, quantity)
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities().Update(ctx, capacity, metav1.UpdateOptions{})
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Update(ctx, capacity, metav1.UpdateOptions{})
if err != nil && apierrs.IsConflict(err) {
// Handle the case where we had a stale copy of the object. Can only happen
// when someone else was making changes to it, which should be rare.
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities().Get(ctx, capacity.Name, metav1.GetOptions{})
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Get(ctx, capacity.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("getting fresh copy of CSIStorageCapacity for %+v: %v", item, err)
}
capacity.Capacity = quantity
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities().Update(ctx, capacity, metav1.UpdateOptions{})
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Update(ctx, capacity, metav1.UpdateOptions{})
}
if err != nil {
return fmt.Errorf("update CSIStorageCapacity for %+v: %v", item, err)
@@ -423,16 +423,9 @@ func (c *Controller) syncCapacity(ctx context.Context, item workItem) error {
NodeTopology: item.segment.GetLabelSelector(),
Capacity: quantity,
}
if c.enumerateObjects >= 0 {
// Workaround for testing with a fake client: it doesn't
// set the name, so we have to make up something ourselves.
c.enumerateObjects++
capacity.Name = fmt.Sprintf("csisc-test-%d", c.enumerateObjects)
capacity.GenerateName = ""
}
var err error
klog.V(5).Infof("Capacity Controller: creating new object for %+v, new capacity %v", item, quantity)
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities().Create(ctx, capacity, metav1.CreateOptions{})
capacity, err = c.client.StorageV1alpha1().CSIStorageCapacities(c.ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create CSIStorageCapacity for %+v: %v", item, err)
}
@@ -455,12 +448,12 @@ func (c *Controller) syncCapacity(ctx context.Context, item workItem) error {

func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1alpha1.CSIStorageCapacity) error {
klog.V(5).Infof("Capacity Controller: removing CSIStorageCapacity %s", capacity.Name)
return c.client.StorageV1alpha1().CSIStorageCapacities().Delete(ctx, capacity.Name, metav1.DeleteOptions{})
return c.client.StorageV1alpha1().CSIStorageCapacities(capacity.Namespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{})
}

func (c *Controller) syncCSIStorageObjects(ctx context.Context) error {
klog.V(3).Infof("Capacity Controller: syncing CSIStorageCapacity objects")
capacities, err := c.client.StorageV1alpha1().CSIStorageCapacities().List(ctx, metav1.ListOptions{})
capacities, err := c.client.StorageV1alpha1().CSIStorageCapacities(c.ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
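
The common thread in these hunks: CSIStorageCapacity is a namespaced resource, so every typed-client call now takes a namespace, and object names are left to the API server via GenerateName instead of the old counter-based workaround. A minimal sketch of that pattern; the `csisc-` prefix and the storage class name are placeholders, not values taken from this commit:

```go
package capacityexample

import (
	"context"

	storagev1alpha1 "k8s.io/api/storage/v1alpha1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createCapacity creates a CSIStorageCapacity object in the given namespace
// and lets the API server pick the final name based on GenerateName.
func createCapacity(ctx context.Context, client kubernetes.Interface, namespace string, quantity *resource.Quantity) (*storagev1alpha1.CSIStorageCapacity, error) {
	capacity := &storagev1alpha1.CSIStorageCapacity{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "csisc-", // placeholder prefix
		},
		StorageClassName: "example-sc", // placeholder class name
		Capacity:         quantity,
	}
	return client.StorageV1alpha1().CSIStorageCapacities(namespace).Create(ctx, capacity, metav1.CreateOptions{})
}
```
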
pkg/capacity/controller_test.go (97 changes: 79 additions & 18 deletions)
@@ -31,10 +31,13 @@ import (
storagev1alpha1 "k8s.io/api/storage/v1alpha1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
krand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
@@ -44,7 +47,8 @@ func init() {
}

const (
driverName = "test-driver"
driverName = "test-driver"
ownerNamespace = "testns"
)

var (
@@ -153,36 +157,34 @@ func TestController(t *testing.T) {
// TODO: multiple segments, multiple classes, both
// TODO: remove stale objects
// TODO: update tests - remove segment, remove class
// TODO: reuse existing CSIStorageClasses - must check that the exact same objects are used, not just something semantically equivalent!
// TODO: check that modifications by others are reverted (deleting an object, adding one, modifying capacity, modifying owner)
}

for name, tc := range testcases {
// Not run in parallel. That doesn't work well in combination with global logging.
t.Run(name, func(t *testing.T) {
// There is no good way to shut down the controller. It spawns
// various goroutines and some of them (in particular shared informer)
// become very unhappy ("close on close channel") when using a context
// become very unhappy ("close on closed channel") when using a context
// that gets cancelled. Therefore we just keep everything running.
ctx := context.Background()
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

var objects []runtime.Object
objects = append(objects, makeCapacities(tc.initialCapacities)...)
objects = append(objects, makeSCs(tc.initialSCs)...)
clientSet := fakeclientset.NewSimpleClientset(objects...)
clientSet.PrependReactor("create", "csistoragecapacity", generateNameReactor)
c := fakeController(ctx, clientSet, &tc.storage, &tc.topology)

// We don't know when the controller is in a quiescent state,
// we can only give it some time and then check.
// TODO (?): use some mock queue and process items until the queue is empty?
go c.Run(ctx, 1)
time.Sleep(time.Second)

c.prepare(ctx)
if err := process(ctx, c); err != nil {
t.Fatalf("unexpected error: %v", err)
}
matched := map[testCapacity]bool{}
for _, expected := range tc.expectedCapacities {
matched[expected] = false
}
actualCapacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities().List(ctx, metav1.ListOptions{})
actualCapacities, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -216,6 +218,16 @@

}

// generateNameReactor implements the logic required for the GenerateName field to work when using
// the fake client. Add it with client.PrependReactor to your fake client.
func generateNameReactor(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
s := action.(ktesting.CreateAction).GetObject().(*storagev1alpha1.CSIStorageCapacity)
if s.Name == "" && s.GenerateName != "" {
s.Name = fmt.Sprintf("%s-%s", s.GenerateName, krand.String(16))
}
return false, nil, nil
}
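
The fake client's object tracker does not fill in names from GenerateName, so this reactor does it before the tracker stores the object. A standalone sketch of the same idea; registering for all resources ("*"), the `testns` namespace, and the `csisc-` prefix are choices made for this sketch only:

```go
package main

import (
	"context"
	"fmt"

	storagev1alpha1 "k8s.io/api/storage/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	krand "k8s.io/apimachinery/pkg/util/rand"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	ktesting "k8s.io/client-go/testing"
)

func main() {
	client := fakeclientset.NewSimpleClientset()
	// Fill in metadata.name from metadata.generateName before the default
	// object tracker stores the object; returning "not handled" keeps the
	// tracker's own create logic in the reaction chain.
	client.PrependReactor("create", "*", func(action ktesting.Action) (bool, runtime.Object, error) {
		obj := action.(ktesting.CreateAction).GetObject().(metav1.Object)
		if obj.GetName() == "" && obj.GetGenerateName() != "" {
			obj.SetName(obj.GetGenerateName() + krand.String(16))
		}
		return false, nil, nil
	})

	capacity, err := client.StorageV1alpha1().CSIStorageCapacities("testns").Create(
		context.Background(),
		&storagev1alpha1.CSIStorageCapacity{ObjectMeta: metav1.ObjectMeta{GenerateName: "csisc-"}},
		metav1.CreateOptions{},
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("generated name:", capacity.Name) // e.g. csisc-<16 random characters>
}
```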

func fakeController(ctx context.Context, client *fakeclientset.Clientset, storage CSICapacityClient, topologyInformer topology.Informer) *Controller {
utilruntime.ReallyCrash = false // avoids os.Exit after "close of closed channel" in shared informer code

@@ -227,19 +239,68 @@ func fakeController(ctx context.Context, client *fakeclientset.Clientset, storag
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 2*time.Second)
queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "items")

// Not needed?
// informerFactory.WaitForCacheSync(ctx.Done())
// go informerFactory.Start(ctx.Done())

return NewCentralCapacityController(
c := NewCentralCapacityController(
storage,
driverName,
client,
queue,
owner,
ownerNamespace,
topologyInformer,
scInformer,
0 /* enumerate objects */)
)

// This ensures that the informers are running and up-to-date.
go informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())

return c
}

// process handles work items until the queue is empty and the informers are synced.
func process(ctx context.Context, c *Controller) error {
for {
if c.queue.Len() == 0 {
done, err := storageClassesSynced(ctx, c)
if err != nil {
return fmt.Errorf("check storage classes: %v", err)
}
if done {
return nil
}
}
// There's no atomic "try to get a work item". Let's
// check one more time before potentially blocking
// in c.queue.Get().
if c.queue.Len() > 0 {
c.processNextWorkItem(ctx)
}
}
}

func storageClassesSynced(ctx context.Context, c *Controller) (bool, error) {
actualStorageClasses, err := c.client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
informerStorageClasses, err := c.scInformer.Lister().List(labels.Everything())
if len(informerStorageClasses) != len(actualStorageClasses.Items) {
return false, nil
}
if len(informerStorageClasses) > 0 && !func() bool {
for _, actualStorageClass := range actualStorageClasses.Items {
for _, informerStorageClass := range informerStorageClasses {
if reflect.DeepEqual(actualStorageClass, *informerStorageClass) {
return true
}
}
}
return false
}() {
return false, nil
}

return true, nil
}

const (
(The diffs for the remaining 3 changed files are not shown.)
