feat: drain and volume detachment status conditions #1876
base: main
Changes from all commits
@@ -29,6 +29,7 @@ import (
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
    "k8s.io/utils/clock"
    controllerruntime "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
    ctx = injection.WithControllerName(ctx, "node.termination")
    ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

    if !n.GetDeletionTimestamp().IsZero() {
        return c.finalize(ctx, n)
@@ -92,112 +94,180 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
        return reconcile.Result{}, nil
    }

    nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
    nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
    if err != nil {
        return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
        // This should not occur. The NodeClaim is required to track details about the termination stage and termination grace
        // period and will not be finalized until after the Node has been terminated by Karpenter. If there are duplicates or
        // the nodeclaim does not exist, this indicates a customer induced error (e.g. removing finalizers or manually
        // creating nodeclaims with matching provider IDs).
        if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
            return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
        }
        return reconcile.Result{}, err
    }
    ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
    if nodeClaim.DeletionTimestamp.IsZero() {
        if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
            if errors.IsNotFound(err) {
                return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
            }
            return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
        }
    }

    if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
        return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
    // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are

[Review thread, resolved by jonathan-innis]
Comment: Remind me again: I recall there was a bug and a reason that we moved this up -- something with us getting stuck on the terminationGracePeriod and continually trying to drain even if the instance was already terminated, right?
Reply: Since we were only checking this in the drain logic, if we drained but were stuck awaiting volume attachments we never hit this check and could get stuck indefinitely. I don't think there was any interaction with
    // no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the
    // instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
    // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
    if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
        if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
            if cloudprovider.IsNodeClaimNotFoundError(err) {
                return reconcile.Result{}, c.removeFinalizer(ctx, node)
            }
            return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
        }
    }

    nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
    nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
    if err != nil {
        return reconcile.Result{}, err
    }

    if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
        if errors.IsConflict(err) {
            return reconcile.Result{Requeue: true}, nil
        }
        return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
        return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
    }
    if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
        if !terminator.IsNodeDrainError(err) {
            return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
        }
        c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
        // If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining
        // on nodes that are no longer alive. We do a check on the Ready condition of the node since, even
        // though the CloudProvider says the instance is not around, we know that the kubelet process is still running
        // if the Node Ready condition is true
        // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
        if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
            if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
                if cloudprovider.IsNodeClaimNotFoundError(err) {
                    return reconcile.Result{}, c.removeFinalizer(ctx, node)
        stored := nodeClaim.DeepCopy()
        if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {

[Review thread, resolved by jmdeal]
Comment: Should it be Drained (Unknown) here since we are in the process of draining but we haven't completed our drain logic -- at which point we would mark the status as Drained=true
Reply: My thought is at this point we know it is not drained, since it's in the process of draining. Whereas before we do the check we don't know if there are any drainable pods on the node, so drained is unknown.
Reply: See: #1876 (comment) but I personally disagree with this framing -- personally, I think we should have done InstanceTerminated and then gone from Unknown to True/False as well there -- transitioning from True -> False or False -> True for a terminal status condition in general is a little odd because it suggests that the process has finished when in fact it hasn't
            if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                if errors.IsConflict(err) {
                    return reconcile.Result{Requeue: true}, nil
                }
                return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
                if errors.IsNotFound(err) {
                    return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
                }
                return reconcile.Result{}, err
            }
        }

        return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
    }
    NodesDrainedTotal.Inc(map[string]string{
        metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
    })
    if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {

        stored := nodeClaim.DeepCopy()
        _ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)

        if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
            if errors.IsConflict(err) {
                return reconcile.Result{Requeue: true}, nil
            }
            if errors.IsNotFound(err) {
                return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
            }
            return reconcile.Result{}, err
        }
        NodesDrainedTotal.Inc(map[string]string{
            metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
        })

        // We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
        // operations on the NodeClaim.
        return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
    }

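A side note on the Unknown-versus-False debate in the review thread above: the pattern the reviewer describes keeps a condition at Unknown while the process is still running and only moves it to a terminal True/False once the outcome is known. A minimal sketch of that lifecycle, using the generic apimachinery condition helpers rather than Karpenter's own StatusConditions() API (the condition type and reasons here are purely illustrative):

```go
// Sketch only: the Unknown -> True/False condition lifecycle described in the review
// thread, using k8s.io/apimachinery helpers instead of Karpenter's ConditionSet.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conditions []metav1.Condition

	// While draining is still in progress the condition stays Unknown: the outcome
	// (drained successfully vs. grace period elapsed first) is not yet known.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "Drained", // illustrative; the PR uses v1.ConditionTypeDrained
		Status:  metav1.ConditionUnknown,
		Reason:  "Draining",
		Message: "pods are still being evicted",
	})

	// Once the process reaches a terminal state, flip to True (drained) or False
	// (e.g. the terminationGracePeriod elapsed before all pods were evicted).
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "Drained",
		Status:  metav1.ConditionTrue,
		Reason:  "Drained",
		Message: "all drainable pods were evicted",
	})

	fmt.Println(meta.IsStatusConditionTrue(conditions, "Drained")) // true
}
```

Under that framing, Drained would sit at Unknown with a "Draining" reason until eviction either completes (True) or the grace period forces termination (False); the PR as written instead sets it to False while draining is in progress.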
    // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
    // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
    // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
    if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
        areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
        if err != nil {
            return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
        }
        if !areVolumesDetached {
            return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
        }
    }
    nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
    pendingVolumeAttachments, err := c.pendingVolumeAttachments(ctx, node)
    if err != nil {
        return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
        return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
    }
    for _, nodeClaim := range nodeClaims {
        isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
        if err != nil {
            // 404 = the nodeClaim no longer exists
            if errors.IsNotFound(err) {
                continue
    if len(pendingVolumeAttachments) == 0 {
        stored := nodeClaim.DeepCopy()
        if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
            if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                if errors.IsConflict(err) {
                    return reconcile.Result{Requeue: true}, nil
                }
                if errors.IsNotFound(err) {
                    return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
                }
                return reconcile.Result{}, err
            }
            // 409 - The nodeClaim exists, but its status has already been modified
            if errors.IsConflict(err) {
                return reconcile.Result{Requeue: true}, nil
            // We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
            // operations on the NodeClaim.
            return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
        }
    } else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
        c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node, pendingVolumeAttachments...))
        stored := nodeClaim.DeepCopy()
        if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {

[Review thread]
Comment: Can this also be setting to Unknown since we are in the process of Detaching the volumes so it hasn't hit a terminal state yet
Reply: Similar to
Reply: I think this is counter to how we have been treating status conditions throughout the project -- we are indicating with status conditions that a process hasn't completed and we don't know whether it's going to succeed or fail (for instance, the Completed status condition for a job doesn't go into a False state while the job is running, it stays in Unknown because we don't know if the job is going to complete or not and then transitions to True/False based on whether it entered a terminal state or not

            if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                if errors.IsConflict(err) {
                    return reconcile.Result{Requeue: true}, nil
                }
                if errors.IsNotFound(err) {
                    return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
                }
                return reconcile.Result{}, err
            }
        }
        return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
    } else {
        stored := nodeClaim.DeepCopy()
        if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {

[Review comment]: I could see us setting it to False here since this indicates that we failed to attach the volumes and we had to terminate due to hitting our terminationGracePeriod on the node

            if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                if errors.IsConflict(err) {
                    return reconcile.Result{Requeue: true}, nil
                }
                if errors.IsNotFound(err) {
                    return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
                }
                return reconcile.Result{}, err
            }
            return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
            // We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
            // operations on the NodeClaim.
            return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
        }
        if !isInstanceTerminated {
            return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
        }

    isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
    if client.IgnoreNotFound(err) != nil {
        // 409 - The nodeClaim exists, but its status has already been modified
        if errors.IsConflict(err) {
            return reconcile.Result{Requeue: true}, nil
        }
        return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
    }
    if !isInstanceTerminated {
        return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
    }
    if err := c.removeFinalizer(ctx, node); err != nil {
        return reconcile.Result{}, err
    }
    return reconcile.Result{}, nil
}

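Every status update in the diff above follows the same shape: deep-copy the NodeClaim, mutate its conditions, patch the status subresource with an optimistic lock, treat a conflict as a requeue, and requeue after a successful patch so the next reconcile reads its own write from the informer cache. A condensed sketch of that pattern (the helper name and the generic client.Object signature are hypothetical, not part of the PR):

```go
// Sketch only: the optimistic-lock status patch + requeue pattern repeated above.
package termination

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// patchStatusWithOptimisticLock is a hypothetical helper, not part of this PR.
func patchStatusWithOptimisticLock(ctx context.Context, kubeClient client.Client, obj client.Object, mutate func()) (reconcile.Result, error) {
	stored := obj.DeepCopyObject().(client.Object) // snapshot before mutating; used as the patch base
	mutate()                                       // apply the status-condition change to obj
	// The optimistic lock adds resourceVersion to the patch, so a concurrent writer
	// produces a 409 conflict instead of being silently overwritten.
	if err := kubeClient.Status().Patch(ctx, obj, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil // lost the race; retry against a fresh read
		}
		return reconcile.Result{}, err
	}
	// Requeue so the next reconcile observes the write once the cache catches up.
	return reconcile.Result{RequeueAfter: time.Second}, nil
}
```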
func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
    for _, nodeClaim := range nodeClaims {
        // If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
        if nodeClaim.DeletionTimestamp.IsZero() {
            if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
                return client.IgnoreNotFound(err)
            }
        }
func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool {
    if nodeTerminationTime == nil {
        return false
    }
    return nil
    return !c.clock.Now().Before(*nodeTerminationTime)
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
func (c *Controller) pendingVolumeAttachments(ctx context.Context, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
    volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
    if err != nil {
        return false, err
        return nil, err
    }
    // Filter out VolumeAttachments associated with not drain-able Pods
    filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
    if err != nil {
        return false, err
        return nil, err
    }
    return len(filteredVolumeAttachments) == 0, nil
    return filteredVolumeAttachments, nil
}
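For reference, `nodeutils.GetVolumeAttachments`, which `pendingVolumeAttachments` consumes above, is not shown in this diff. A rough sketch of one plausible shape for such a helper with a controller-runtime client, assuming it simply lists VolumeAttachments and filters on `spec.nodeName` (the real helper may differ, e.g. by using a field index):

```go
// Sketch only: one plausible shape for a GetVolumeAttachments-style helper; the real
// nodeutils implementation is not part of this diff.
package nodeutils

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
	// VolumeAttachments are cluster-scoped; list them all and keep the ones bound
	// to this node via spec.nodeName.
	volumeAttachmentList := &storagev1.VolumeAttachmentList{}
	if err := kubeClient.List(ctx, volumeAttachmentList); err != nil {
		return nil, err
	}
	var attachments []*storagev1.VolumeAttachment
	for i := range volumeAttachmentList.Items {
		if volumeAttachmentList.Items[i].Spec.NodeName == node.Name {
			attachments = append(attachments, &volumeAttachmentList.Items[i])
		}
	}
	return attachments, nil
}
```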

// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
[Review comment]: nit: Should we throw a comment over this one that indicates that we don't expect this case to happen and (if it does) then we expect that something has gone wrong and we have broken some tenet of the system?