diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index 3238b843d..c0a3b9ef8 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -937,12 +937,12 @@ func TestCSIHandler(t *testing.T) { initialObjects: []runtime.Object{pvWithFinalizer(), csiNode()}, addedVA: deleted(va(true, fin, ann)), expectedActions: []core.Action{ + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(va(true, fin, ann)), + deleted(va(true /*attached*/, "", ann)))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(va(true, "", ann)), deleted(va(false /*attached*/, "", ann))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(false, fin, ann)), - deleted(va(false /*attached*/, "", ann)))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0}, @@ -953,12 +953,12 @@ func TestCSIHandler(t *testing.T) { initialObjects: []runtime.Object{csiNode()}, addedVA: deleted(vaWithInlineSpec(va(true, fin, ann))), expectedActions: []core.Action{ + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(vaWithInlineSpec(va(true, fin, ann))), + deleted(vaWithInlineSpec(va(true /*attached*/, "", ann))))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(vaWithInlineSpec(va(true, "", ann))), deleted(vaWithInlineSpec(va(false /*attached*/, "", ann)))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(vaWithInlineSpec(va(false, fin, ann))), - deleted(vaWithInlineSpec(va(false /*attached*/, "", ann))))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0}, @@ -970,12 +970,12 @@ func TestCSIHandler(t *testing.T) { addedVA: deleted(va(true, fin, ann)), expectedActions: []core.Action{ core.NewGetAction(secretGroupResourceVersion, "default", "secret"), + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(vaWithInlineSpec(va(true, fin, ann))), + deleted(vaWithInlineSpec(va(true /*attached*/, "", ann))))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(va(true, "", ann)), deleted(va(false /*attached*/, "", ann))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(vaWithInlineSpec(va(false, fin, ann))), - deleted(vaWithInlineSpec(va(false /*attached*/, "", ann))))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, ignored, noMetadata, 0}, @@ -987,14 +987,14 @@ func TestCSIHandler(t *testing.T) { addedVA: deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, fin, ann)), "secret")), expectedActions: []core.Action{ core.NewGetAction(secretGroupResourceVersion, "default", "secret"), + core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, fin, ann)), "secret")), + deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true /*attached*/, "", ann)), + "secret"))), ""), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, "", ann)), "secret")), deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false /*attached*/, "", ann)), "secret"))), "status"), - core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false, fin, ann)), "secret")), - deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false /*attached*/, "", ann)), - "secret"))), ""), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, ignored, noMetadata, 0}, @@ -1006,12 +1006,12 @@ func TestCSIHandler(t *testing.T) { addedVA: deleted(va(true, fin, ann)), expectedActions: []core.Action{ core.NewGetAction(secretGroupResourceVersion, "default", "emptySecret"), + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(va(true, fin, ann)), + deleted(va(true /*attached*/, "", ann)))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(va(true, "", ann)), deleted(va(false /*attached*/, "", ann))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(false, fin, ann)), - deleted(va(false /*attached*/, "", ann)))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, ignored, noMetadata, 0}, @@ -1025,14 +1025,14 @@ func TestCSIHandler(t *testing.T) { core.NewGetAction(secretGroupResourceVersion, "default", "emptySecret"), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, "", ann)), "emptySecret")), - deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false /*attached*/, "", ann)), - "emptySecret"))), "status"), + types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, fin, ann)), "emptySecret")), + deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true /*attached*/, "", ann)), + "emptySecret"))), ""), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false, fin, ann)), "emptySecret")), + types.MergePatchType, patch(deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(true, "", ann)), "emptySecret")), deleted(vaInlineSpecWithSecret(vaWithInlineSpec(va(false /*attached*/, "", ann)), - "emptySecret"))), ""), + "emptySecret"))), "status"), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, ignored, noMetadata, 0}, @@ -1062,12 +1062,12 @@ func TestCSIHandler(t *testing.T) { testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(va(true, "", ann)), deleted(vaWithDetachError(va(true, "", ann), "mock error"))), "status"), + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(va(true, fin, ann)), + deleted(va(true, "", ann)))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(true, "", ann)), + types.MergePatchType, patch(deleted(vaWithDetachError(va(true, "", ann), "mock error")), deleted(va(false, "", ann))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(false, fin, ann)), - deleted(va(false, "", ann)))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), ignored, noMetadata, 0}, @@ -1083,12 +1083,12 @@ func TestCSIHandler(t *testing.T) { testPVName+"-"+testNodeName, types.MergePatchType, patch(deleted(va(true, "", ann)), deleted(vaWithDetachError(va(true, "", ann), "context deadline exceeded"))), "status"), + core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, + types.MergePatchType, patch(deleted(va(true, fin, ann)), + deleted(va(true /*attached*/, "", ann)))), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(true, "", ann)), + types.MergePatchType, patch(deleted(vaWithDetachError(va(true, "", ann), "context deadline exceeded")), deleted(va(false /*attached*/, "", ann))), "status"), - core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, - types.MergePatchType, patch(deleted(va(false, fin, ann)), - deleted(va(false /*attached*/, "", ann)))), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 500 * time.Millisecond}, @@ -1207,23 +1207,23 @@ func TestCSIHandler(t *testing.T) { expectedActions: []core.Action{ core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(true, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"})), - deleted(va(false /*attached*/, "", - map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))), "status"), + deleted(va(true, fin, map[string]string{vaNodeIDAnnotation: "annotatedNodeID"})), + deleted(va(true /*attached*/, "", + map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))), ""), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, fin, map[string]string{vaNodeIDAnnotation: "annotatedNodeID"})), + deleted(va(true, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"})), deleted(va(false /*attached*/, "", - map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))), ""), + map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))), "status"), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, "annotatedNodeID", noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0}, }, }, { - name: "failed write with attached=false -> controller retries", + name: "failed write with finalizer removal -> controller retries", initialObjects: []runtime.Object{pvWithFinalizer(), csiNode()}, - addedVA: deleted(va(false, fin, ann)), + addedVA: deleted(va(true, fin, ann)), reactors: []reaction{ { verb: "patch", @@ -1242,28 +1242,29 @@ func TestCSIHandler(t *testing.T) { }, expectedActions: []core.Action{ // This fails - core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, - testPVName+"-"+testNodeName, + core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, "", ann)), - deleted(va(false, "", ann))), "status"), + deleted(va(true, fin, ann)), + deleted(va(true, "", ann))), ""), // Saving error succeeds core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, "", ann)), - vaWithDetachError(deleted(va(false, "", ann)), + deleted(va(true, fin, ann)), + vaWithDetachError(deleted(va(true, fin, ann)), "could not mark as detached: volumeattachments.storage.k8s."+ "io \"pv1-node1\" is forbidden: mock error")), "status"), // Second save of attached=false succeeds and the finalizer is subsequently deleted. core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, "", ann)), - deleted(va(false, "", ann))), "status"), + deleted(va(true, fin, ann)), + deleted(va(true, "", ann))), ""), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, fin, ann)), - deleted(va(false, "", ann))), ""), + vaWithDetachError(deleted(va(true, "", ann)), + "could not mark as detached: volumeattachments.storage.k8s."+ + "io \"pv1-node1\" is forbidden: mock error"), + deleted(va(false, "", ann))), "status"), }, expectedCSICalls: []csiCall{ {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0}, @@ -1275,15 +1276,14 @@ func TestCSIHandler(t *testing.T) { initialObjects: []runtime.Object{gcePDPVWithFinalizer(), csiNode()}, addedVA: deleted(va(true /*attached*/, fin /*finalizer*/, ann)), expectedActions: []core.Action{ - // Finalizer is saved first core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(true, "", ann)), - deleted(va(false /*attached*/, "", ann))), "status"), + deleted(va(true, fin, ann)), + deleted(va(true /*attached*/, "", ann))), ""), core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName, types.MergePatchType, patch( - deleted(va(false, fin, ann)), - deleted(va(false /*attached*/, "", ann))), ""), + deleted(va(true, "", ann)), + deleted(va(false /*attached*/, "", ann))), "status"), }, expectedCSICalls: []csiCall{ {"detach", "projects/UNSPECIFIED/zones/testZone/disks/testpd", testNodeID, diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index 0d0515cd5..d9961813a 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -108,212 +108,214 @@ type handlerFactory func(client kubernetes.Interface, informerFactory informers. func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) { for _, test := range tests { - logger, ctx := ktesting.NewTestContext(t) - logger = klog.LoggerWithValues(logger, "test", test.name) - ctx = klog.NewContext(ctx, logger) - logger.Info("Starting test") - objs := test.initialObjects - if test.addedVA != nil { - objs = append(objs, test.addedVA) - } - if test.updatedVA != nil { - objs = append(objs, test.updatedVA) - } - if test.updatedPV != nil { - objs = append(objs, test.updatedPV) - } - - coreObjs := []runtime.Object{} - for _, obj := range objs { - switch obj.(type) { - case *storage.CSINode: - default: - coreObjs = append(coreObjs, obj) + t.Run(test.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + logger = klog.LoggerWithValues(logger, "test", test.name) + ctx = klog.NewContext(ctx, logger) + logger.Info("Starting test") + objs := test.initialObjects + if test.addedVA != nil { + objs = append(objs, test.addedVA) } - } - - // Create client and informers - client := fake.NewSimpleClientset(coreObjs...) - informers := informers.NewSharedInformerFactory(client, time.Hour /* disable resync*/) - vaInformer := informers.Storage().V1().VolumeAttachments() - pvInformer := informers.Core().V1().PersistentVolumes() - nodeInformer := informers.Core().V1().Nodes() - csiNodeInformer := informers.Storage().V1().CSINodes() - // Fill the informers with initial objects so controller can Get() them - for _, obj := range objs { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.Node: - nodeInformer.Informer().GetStore().Add(obj) - case *storage.VolumeAttachment: - vaInformer.Informer().GetStore().Add(obj) - case *v1.Secret: - // Secrets are not cached in any informer - case *storage.CSINode: - csiNodeInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Unknown initalObject type: %+v", obj) + if test.updatedVA != nil { + objs = append(objs, test.updatedVA) } - } - // This reactor makes sure that all updates that the controller does are - // reflected in its informers so Lister.Get() finds them. This does not - // enqueue events! - client.Fake.PrependReactor("update", "*", func(action core.Action) (bool, runtime.Object, error) { - if action.GetVerb() == "update" { - switch action.GetResource().Resource { - case "volumeattachments": - logger.V(5).Info("Test reactor: updated VA") - vaInformer.Informer().GetStore().Update(action.(core.UpdateAction).GetObject()) - case "persistentvolumes": - logger.V(5).Info("Test reactor: updated PV") - pvInformer.Informer().GetStore().Update(action.(core.UpdateAction).GetObject()) + if test.updatedPV != nil { + objs = append(objs, test.updatedPV) + } + + coreObjs := []runtime.Object{} + for _, obj := range objs { + switch obj.(type) { + case *storage.CSINode: default: - t.Errorf("Unknown update resource: %s", action.GetResource()) + coreObjs = append(coreObjs, obj) } } - return false, nil, nil - }) - // Run any reactors that the test needs *before* the above one. - for _, reactor := range test.reactors { - client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t)) - } - - // Construct controller - lister := &fakeLister{t: t, publishedNodes: test.listerResponse} - csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls, lister: lister} - handler := handlerFactory(client, informers, csiConnection, lister) - ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil, 1*time.Minute) - - // Start the test by enqueueing the right event - if test.addedVA != nil { - ctrl.vaAdded(test.addedVA) - } - if test.updatedVA != nil { - ctrl.vaUpdatedFunc(logger)(test.updatedVA, test.updatedVA) - } - if test.deletedVA != nil { - ctrl.vaDeleted(test.deletedVA) - } - if test.updatedPV != nil { - ctrl.pvUpdated(test.updatedPV, test.updatedPV) - } - - // Process the queue until we get expected results - timeout := time.Now().Add(10 * time.Second) - lastReportedActionCount := 0 - for { - if time.Now().After(timeout) { - t.Errorf("Test %q: timed out", test.name) - break - } - if ctrl.vaQueue.Len() > 0 { - logger.V(5).Info("VA queue, processing one", "queueLength", ctrl.vaQueue.Len()) - ctrl.syncVA(ctx) - } - if ctrl.pvQueue.Len() > 0 { - logger.V(5).Info("PV queue, processing one", "queueLength", ctrl.pvQueue.Len()) - ctrl.syncPV(ctx) - } - if ctrl.vaQueue.Len() > 0 || ctrl.pvQueue.Len() > 0 { - // There is still some work in the queue, process it now - continue - } - if test.listerResponse != nil { - // Reconcile VA with the actual state - err := ctrl.handler.ReconcileVA(ctx) - if err != nil { - t.Errorf("Failed to reconcile Volume Attachment objects: %v", err) + + // Create client and informers + client := fake.NewSimpleClientset(coreObjs...) + informers := informers.NewSharedInformerFactory(client, time.Hour /* disable resync*/) + vaInformer := informers.Storage().V1().VolumeAttachments() + pvInformer := informers.Core().V1().PersistentVolumes() + nodeInformer := informers.Core().V1().Nodes() + csiNodeInformer := informers.Storage().V1().CSINodes() + // Fill the informers with initial objects so controller can Get() them + for _, obj := range objs { + switch obj.(type) { + case *v1.PersistentVolume: + pvInformer.Informer().GetStore().Add(obj) + case *v1.Node: + nodeInformer.Informer().GetStore().Add(obj) + case *storage.VolumeAttachment: + vaInformer.Informer().GetStore().Add(obj) + case *v1.Secret: + // Secrets are not cached in any informer + case *storage.CSINode: + csiNodeInformer.Informer().GetStore().Add(obj) + default: + t.Fatalf("Unknown initalObject type: %+v", obj) } } - if ctrl.vaQueue.Len() > 0 || ctrl.pvQueue.Len() > 0 { - // Reconciler created some work, process the queues once again - continue - } - currentActionCount := len(client.Actions()) - if currentActionCount < len(test.expectedActions) { - if lastReportedActionCount < currentActionCount { - logger.V(5).Info("Waiting for the rest", "currentActionCount", currentActionCount, "expectedActionsCount", len(test.expectedActions)) - lastReportedActionCount = currentActionCount + // This reactor makes sure that all updates that the controller does are + // reflected in its informers so Lister.Get() finds them. This does not + // enqueue events! + client.Fake.PrependReactor("update", "*", func(action core.Action) (bool, runtime.Object, error) { + if action.GetVerb() == "update" { + switch action.GetResource().Resource { + case "volumeattachments": + logger.V(5).Info("Test reactor: updated VA") + vaInformer.Informer().GetStore().Update(action.(core.UpdateAction).GetObject()) + case "persistentvolumes": + logger.V(5).Info("Test reactor: updated PV") + pvInformer.Informer().GetStore().Update(action.(core.UpdateAction).GetObject()) + default: + t.Errorf("Unknown update resource: %s", action.GetResource()) + } } - // The test expected more to happen, wait for them - time.Sleep(10 * time.Millisecond) - continue + return false, nil, nil + }) + // Run any reactors that the test needs *before* the above one. + for _, reactor := range test.reactors { + client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t)) } - break - } - actions := client.Actions() - for i, action := range actions { - if len(test.expectedActions) < i+1 { - t.Errorf("Test %q: %d unexpected actions: %+v", test.name, len(actions)-len(test.expectedActions), spew.Sdump(actions[i:])) - break + // Construct controller + lister := &fakeLister{t: t, publishedNodes: test.listerResponse} + csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls, lister: lister} + handler := handlerFactory(client, informers, csiConnection, lister) + ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil, 1*time.Minute) + + // Start the test by enqueueing the right event + if test.addedVA != nil { + ctrl.vaAdded(test.addedVA) + } + if test.updatedVA != nil { + ctrl.vaUpdatedFunc(logger)(test.updatedVA, test.updatedVA) + } + if test.deletedVA != nil { + ctrl.vaDeleted(test.deletedVA) + } + if test.updatedPV != nil { + ctrl.pvUpdated(test.updatedPV, test.updatedPV) } - // Sanitize time in attach/detach errors - if action.GetVerb() == "update" && action.GetResource().Resource == "volumeattachments" { - obj := action.(core.UpdateAction).GetObject() - o := obj.(*storage.VolumeAttachment) - if o.Status.AttachError != nil { - o.Status.AttachError.Time = metav1.Time{} + // Process the queue until we get expected results + timeout := time.Now().Add(10 * time.Second) + lastReportedActionCount := 0 + for { + if time.Now().After(timeout) { + t.Errorf("Test %q: timed out", test.name) + break } - if o.Status.DetachError != nil { - o.Status.DetachError.Time = metav1.Time{} + if ctrl.vaQueue.Len() > 0 { + logger.V(5).Info("VA queue, processing one", "queueLength", ctrl.vaQueue.Len()) + ctrl.syncVA(ctx) } - } - - if action.GetVerb() == "patch" && action.GetResource().Resource == "volumeattachments" { - patchAction := action.(core.PatchActionImpl) - patch := patchAction.GetPatch() - var va storage.VolumeAttachment - err := json.Unmarshal(patch, &va) - if err != nil { - t.Errorf("Failed to unmarshal: %v", err) + if ctrl.pvQueue.Len() > 0 { + logger.V(5).Info("PV queue, processing one", "queueLength", ctrl.pvQueue.Len()) + ctrl.syncPV(ctx) } - if va.Status.AttachError != nil { - va.Status.AttachError.Time = metav1.Time{} + if ctrl.vaQueue.Len() > 0 || ctrl.pvQueue.Len() > 0 { + // There is still some work in the queue, process it now + continue } - if va.Status.DetachError != nil { - va.Status.DetachError.Time = metav1.Time{} + if test.listerResponse != nil { + // Reconcile VA with the actual state + err := ctrl.handler.ReconcileVA(ctx) + if err != nil { + t.Errorf("Failed to reconcile Volume Attachment objects: %v", err) + } } + if ctrl.vaQueue.Len() > 0 || ctrl.pvQueue.Len() > 0 { + // Reconciler created some work, process the queues once again + continue + } + currentActionCount := len(client.Actions()) + if currentActionCount < len(test.expectedActions) { + if lastReportedActionCount < currentActionCount { + logger.V(5).Info("Waiting for the rest", "currentActionCount", currentActionCount, "expectedActionsCount", len(test.expectedActions)) + lastReportedActionCount = currentActionCount + } + // The test expected more to happen, wait for them + time.Sleep(10 * time.Millisecond) + continue + } + break + } - if va.Status.AttachError != nil || va.Status.DetachError != nil { + actions := client.Actions() + for i, action := range actions { + if len(test.expectedActions) < i+1 { + t.Errorf("Test %q: %d unexpected actions: %+v", test.name, len(actions)-len(test.expectedActions), spew.Sdump(actions[i:])) + break + } + + // Sanitize time in attach/detach errors + if action.GetVerb() == "update" && action.GetResource().Resource == "volumeattachments" { + obj := action.(core.UpdateAction).GetObject() + o := obj.(*storage.VolumeAttachment) + if o.Status.AttachError != nil { + o.Status.AttachError.Time = metav1.Time{} + } + if o.Status.DetachError != nil { + o.Status.DetachError.Time = metav1.Time{} + } + } - patch, err = createMergePatch(storage.VolumeAttachment{}, va) + if action.GetVerb() == "patch" && action.GetResource().Resource == "volumeattachments" { + patchAction := action.(core.PatchActionImpl) + patch := patchAction.GetPatch() + var va storage.VolumeAttachment + err := json.Unmarshal(patch, &va) if err != nil { - t.Errorf("Test %q create patch failed", t.Name()) + t.Errorf("Failed to unmarshal: %v", err) } - patchAction.Patch = patch - action = patchAction + if va.Status.AttachError != nil { + va.Status.AttachError.Time = metav1.Time{} + } + if va.Status.DetachError != nil { + va.Status.DetachError.Time = metav1.Time{} + } + + if va.Status.AttachError != nil || va.Status.DetachError != nil { + + patch, err = createMergePatch(storage.VolumeAttachment{}, va) + if err != nil { + t.Errorf("Test %q create patch failed", t.Name()) + } + patchAction.Patch = patch + action = patchAction + } + } + expectedAction := test.expectedActions[i] + if !reflect.DeepEqual(expectedAction, action) { + t.Errorf("Test %q: action %d\nExpected:\n%s\ngot:\n%s", test.name, i, spew.Sdump(expectedAction), spew.Sdump(action)) + continue + } } - expectedAction := test.expectedActions[i] - if !reflect.DeepEqual(expectedAction, action) { - t.Errorf("Test %q: action %d\nExpected:\n%s\ngot:\n%s", test.name, i, spew.Sdump(expectedAction), spew.Sdump(action)) - continue + if len(test.expectedActions) > len(actions) { + t.Errorf("Test %q: %d additional expected actions", test.name, len(test.expectedActions)-len(actions)) + for _, a := range test.expectedActions[len(actions):] { + t.Logf(" %+v", a) + } } - } - if len(test.expectedActions) > len(actions) { - t.Errorf("Test %q: %d additional expected actions", test.name, len(test.expectedActions)-len(actions)) - for _, a := range test.expectedActions[len(actions):] { - t.Logf(" %+v", a) + if test.additionalCheck != nil { + test.additionalCheck(t, test) } - } - - if test.additionalCheck != nil { - test.additionalCheck(t, test) - } - // makesure all the csi calls were executed. - if csiConnection.index < len(csiConnection.calls) { - t.Errorf("Test %q: %d additional expected CSI calls", test.name, len(csiConnection.calls)-csiConnection.index) - for _, a := range csiConnection.calls[csiConnection.index:] { - t.Logf(" %+v", a) + // makesure all the csi calls were executed. + if csiConnection.index < len(csiConnection.calls) { + t.Errorf("Test %q: %d additional expected CSI calls", test.name, len(csiConnection.calls)-csiConnection.index) + for _, a := range csiConnection.calls[csiConnection.index:] { + t.Logf(" %+v", a) + } } - } - logger.Info("Test was finished") + logger.Info("Test was finished") + }) } } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 110cd6c65..9c615dc11 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -28,6 +28,7 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/accessmodes" v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -55,28 +56,47 @@ func markAsAttached(ctx context.Context, client kubernetes.Interface, va *storag } func markAsDetached(ctx context.Context, client kubernetes.Interface, va *storage.VolumeAttachment) (*storage.VolumeAttachment, error) { + logger := klog.FromContext(ctx) finalizerName := GetFinalizerName(va.Spec.Attacher) // Prepare new array of finalizers newFinalizers := make([]string, 0, len(va.Finalizers)) - found := false + finalizerFound := false for _, f := range va.Finalizers { if f == finalizerName { - found = true + finalizerFound = true continue } newFinalizers = append(newFinalizers, f) } + if !finalizerFound && !va.Status.Attached { + // Finalizer was not present, nothing to update + logger.V(4).Info("Already fully detached") + return va, nil + } // Mostly to simplify unit tests, but it won't harm in production too if len(newFinalizers) == 0 { newFinalizers = nil } - logger := klog.FromContext(ctx) - if !found && !va.Status.Attached { - // Finalizer was not present, nothing to update - logger.V(4).Info("Already fully detached") - return va, nil + // Remove the finalizer first. A VolumeAttachment with DeletionTimestamp and without Finalizer will be considered as detached + // even when status says `attached: true`. Therefore the attacher won't re-try detaching already detached volume. + // The other way around (mark as detached and then remove the finalizer) leads to a second ControllerUnpublish call, + // because a VolumeAttachment with the finalizer present and `attached: false` could mean the attachment could have timed out + // previously and the attacher needs to confirm the volume is detached with ControllerUnpublish. + if finalizerFound { + clone := va.DeepCopy() + clone.Finalizers = newFinalizers + patch, err := createMergePatch(va, clone) + if err != nil { + return va, err + } + newVA, err := client.StorageV1().VolumeAttachments().Patch(ctx, va.Name, types.MergePatchType, patch, metav1.PatchOptions{}) + if err != nil { + return va, err + } + logger.V(4).Info("Finalizer removed") + va = newVA } logger.V(4).Info("Marking as detached") @@ -90,21 +110,13 @@ func markAsDetached(ctx context.Context, client kubernetes.Interface, va *storag } newVA, err := client.StorageV1().VolumeAttachments().Patch(ctx, va.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status") if err != nil { + if apierrors.IsNotFound(err) { + // The VolumeAttachment does not have any finalizer, it might have been deleted by the API server. + return va, nil + } return va, err } - // As Finalizers is not in the status subresource it must be patched separately. It is removed after the status update so the resource is not prematurely deleted. - clone = newVA.DeepCopy() - clone.Finalizers = newFinalizers - patch, err = createMergePatch(newVA, clone) - if err != nil { - return newVA, err - } - newVA, err = client.StorageV1().VolumeAttachments().Patch(ctx, newVA.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "") - if err != nil { - return newVA, err - } - logger.V(4).Info("Finalizer removed") return newVA, nil }