feat: drain and volume detachment status conditions #1876

Open · wants to merge 4 commits into base: main
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go

@@ -28,6 +28,8 @@ const (
  	ConditionTypeInitialized = "Initialized"
  	ConditionTypeConsolidatable = "Consolidatable"
  	ConditionTypeDrifted = "Drifted"
+ 	ConditionTypeDrained = "Drained"
+ 	ConditionTypeVolumesDetached = "VolumesDetached"
  	ConditionTypeInstanceTerminating = "InstanceTerminating"
  	ConditionTypeConsistentStateFound = "ConsistentStateFound"
  	ConditionTypeDisruptionReason = "DisruptionReason"
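
For orientation, here is a minimal sketch of how a consumer could read the two new condition types once this change lands. It assumes the `sigs.k8s.io/karpenter` module path and uses the same `StatusConditions().Get(...).IsTrue()` accessors that appear later in this diff; treat it as illustrative, not as code from the PR:

```go
package main

import (
	"fmt"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// reportTerminationProgress inspects the two termination-stage conditions
// introduced by this PR on an already-fetched NodeClaim.
func reportTerminationProgress(nodeClaim *v1.NodeClaim) {
	drained := nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained)
	detached := nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached)
	fmt.Printf("Drained=%v VolumesDetached=%v\n", drained.IsTrue(), detached.IsTrue())
}

func main() {
	// An empty status reports false for both conditions.
	reportTerminationProgress(&v1.NodeClaim{})
}
```
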
184 changes: 127 additions & 57 deletions pkg/controllers/node/termination/controller.go

@@ -29,6 +29,7 @@ import (
  	"k8s.io/apimachinery/pkg/api/errors"
  	"k8s.io/apimachinery/pkg/util/sets"
  	"k8s.io/client-go/util/workqueue"
+ 	"k8s.io/klog/v2"
  	"k8s.io/utils/clock"
  	controllerruntime "sigs.k8s.io/controller-runtime"
  	"sigs.k8s.io/controller-runtime/pkg/builder"

@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou

  func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
  	ctx = injection.WithControllerName(ctx, "node.termination")
+ 	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

  	if !n.GetDeletionTimestamp().IsZero() {
  		return c.finalize(ctx, n)
@@ -92,112 +94,180 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
  		return reconcile.Result{}, nil
  	}

- 	nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
+ 	nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
  	if err != nil {
- 		return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
+ 		// This should not occur. The NodeClaim is required to track details about the termination stage and termination grace
+ 		// period and will not be finalized until after the Node has been terminated by Karpenter. If there are duplicates or
+ 		// the nodeclaim does not exist, this indicates a customer induced error (e.g. removing finalizers or manually
+ 		// creating nodeclaims with matching provider IDs).
+ 		if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {

Review comment (Member): nit: Should we throw a comment over this one that indicates that we don't expect this case to happen and (if it does) then we expect that something has gone wrong and we have broken some tenet of the system?

+ 			return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 		}
+ 		return reconcile.Result{}, err
  	}
+ 	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
+ 	if nodeClaim.DeletionTimestamp.IsZero() {
+ 		if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
+ 			if errors.IsNotFound(err) {
+ 				return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 			}
+ 			return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
+ 		}
+ 	}

- 	if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
- 		return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
+ 	// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are

Review thread (marked resolved by jonathan-innis):
Member: Remind me again: I recall there was a bug and a reason that we moved this up -- something with us getting stuck on the terminationGracePeriod and continually trying to drain even if the instance was already terminated, right?
Member (Author): Since we were only checking this in the drain logic, if we drained but were stuck awaiting volume attachments, we never hit this check and could get stuck indefinitely. I don't think there was any interaction with terminationGracePeriod; if anything, it would save users in that case.

+ 	// no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the
+ 	// instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
+ 	// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
+ 	if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
+ 		if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
+ 			if cloudprovider.IsNodeClaimNotFoundError(err) {
+ 				return reconcile.Result{}, c.removeFinalizer(ctx, node)
+ 			}
+ 			return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
+ 		}
+ 	}

- 	nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
+ 	nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
  	if err != nil {
  		return reconcile.Result{}, err
  	}

  	if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
  		if errors.IsConflict(err) {
  			return reconcile.Result{Requeue: true}, nil
  		}
- 		return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
+ 		return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
  	}
  	if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
  		if !terminator.IsNodeDrainError(err) {
  			return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
  		}
  		c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
- 		// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining
- 		// on nodes that are no longer alive. We do a check on the Ready condition of the node since, even
- 		// though the CloudProvider says the instance is not around, we know that the kubelet process is still running
- 		// if the Node Ready condition is true
- 		// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
- 		if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
- 			if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
- 				if cloudprovider.IsNodeClaimNotFoundError(err) {
- 					return reconcile.Result{}, c.removeFinalizer(ctx, node)
+ 		stored := nodeClaim.DeepCopy()
+ 		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {

Review thread (marked resolved by jmdeal):
Member: Should it be Drained (Unknown) here, since we are in the process of draining but haven't completed our drain logic -- at which point we would mark the status as Drained=true?
Member (Author): My thought is at this point we know it is not drained, since it's in the process of draining. Whereas before we do the check, we don't know if there are any drainable pods on the node, so drained is unknown.
Member: See: #1876 (comment), but I personally disagree with this framing -- personally, I think we should have done InstanceTerminated and then gone from Unknown to True/False as well there. Transitioning from True -> False or False -> True for a terminal status condition is a little odd in general, because it suggests that the process has finished when in fact it hasn't.
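
The disagreement in the thread above is between two condition lifecycles. A minimal sketch of both, using the ConditionSet helpers that appear in this diff (`SetUnknownWithReason` is an assumed helper name alongside the `SetFalse`/`SetTrue` calls the diff actually uses; treat the snippet as illustrative, not as the PR's final behavior):

```go
package main

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// setDrainingCondition contrasts the two framings from the review thread for
// a NodeClaim that has just entered the drain loop.
func setDrainingCondition(nodeClaim *v1.NodeClaim, useUnknown bool) {
	if useUnknown {
		// Reviewer's framing: draining is in flight and its outcome is not
		// yet known, so stay Unknown until a terminal True/False is reached.
		// (SetUnknownWithReason is an assumed helper name.)
		nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeDrained, "Draining", "draining in progress")
		return
	}
	// Author's framing: at this point we affirmatively know the node is not
	// drained, so set False with a "Draining" reason, as the surrounding
	// diff does.
	nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining")
	// On success, both framings converge on:
	// nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
}
```
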

+ 			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+ 				if errors.IsConflict(err) {
+ 					return reconcile.Result{Requeue: true}, nil
+ 				}
- 				return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
+ 				if errors.IsNotFound(err) {
+ 					return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 				}
+ 				return reconcile.Result{}, err
+ 			}
+ 		}

  		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
  	}
- 	NodesDrainedTotal.Inc(map[string]string{
- 		metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
- 	})
+ 	if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
+ 		stored := nodeClaim.DeepCopy()
+ 		_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
+ 		if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+ 			if errors.IsConflict(err) {
+ 				return reconcile.Result{Requeue: true}, nil
+ 			}
+ 			if errors.IsNotFound(err) {
+ 				return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 			}
+ 			return reconcile.Result{}, err
+ 		}
+ 		NodesDrainedTotal.Inc(map[string]string{
+ 			metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
+ 		})
+ 		// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+ 		// operations on the NodeClaim.
+ 		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+ 	}

  	// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
  	// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
  	// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
- 	if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
- 		areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
- 		if err != nil {
- 			return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
- 		}
- 		if !areVolumesDetached {
- 			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
- 		}
- 	}
- 	nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
+ 	pendingVolumeAttachments, err := c.pendingVolumeAttachments(ctx, node)
  	if err != nil {
- 		return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
+ 		return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
  	}
- 	for _, nodeClaim := range nodeClaims {
- 		isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
- 		if err != nil {
- 			// 404 = the nodeClaim no longer exists
- 			if errors.IsNotFound(err) {
- 				continue
+ 	if len(pendingVolumeAttachments) == 0 {
+ 		stored := nodeClaim.DeepCopy()
+ 		if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
+ 			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+ 				if errors.IsConflict(err) {
+ 					return reconcile.Result{Requeue: true}, nil
+ 				}
+ 				if errors.IsNotFound(err) {
+ 					return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 				}
+ 				return reconcile.Result{}, err
+ 			}
- 			// 409 - The nodeClaim exists, but its status has already been modified
- 			if errors.IsConflict(err) {
- 				return reconcile.Result{Requeue: true}, nil
+ 			// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+ 			// operations on the NodeClaim.
+ 			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+ 		}
+ 	} else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
+ 		c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node, pendingVolumeAttachments...))
+ 		stored := nodeClaim.DeepCopy()
+ 		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {

Review thread:
Member: Can this also be set to Unknown, since we are in the process of detaching the volumes and it hasn't hit a terminal state yet?
Member (Author): Similar to Drained, I think it's clearer to set it to False here since we know the volumes aren't detached. Unknown indicates to me that we don't know one way or the other.
Member: I think this is counter to how we have been treating status conditions throughout the project -- we are indicating with status conditions that a process hasn't completed and we don't know whether it's going to succeed or fail. For instance, the Completed status condition for a Job doesn't go into a False state while the job is running; it stays Unknown because we don't know whether the job is going to complete, and then transitions to True/False based on whether it entered a terminal state.
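
For concreteness, the reviewer's Job analogy maps onto VolumesDetached roughly as follows. This sketch reuses the reason strings from the diff; the `SetUnknownWithReason` helper name is assumed, as in the earlier aside:

```go
package main

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// volumesDetachedLifecycle sketches the Unknown-until-terminal convention the
// reviewer describes: Unknown while detachment is in flight, True on success,
// False only for the terminal give-up case.
func volumesDetachedLifecycle(nc *v1.NodeClaim, detached, gracePeriodElapsed bool) {
	switch {
	case detached:
		nc.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached)
	case gracePeriodElapsed:
		// Terminal failure: we stopped waiting for detachment.
		nc.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed")
	default:
		// In flight: outcome not yet known.
		nc.StatusConditions().SetUnknownWithReason(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment")
	}
}
```
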

+ 			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+ 				if errors.IsConflict(err) {
+ 					return reconcile.Result{Requeue: true}, nil
+ 				}
+ 				if errors.IsNotFound(err) {
+ 					return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 				}
+ 				return reconcile.Result{}, err
+ 			}
+ 		}
+ 		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+ 	} else {
+ 		stored := nodeClaim.DeepCopy()
+ 		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {

Review comment (Member): I could see us setting it to False here, since this indicates that we failed to detach the volumes and we had to terminate due to hitting our terminationGracePeriod on the node.

+ 			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+ 				if errors.IsConflict(err) {
+ 					return reconcile.Result{Requeue: true}, nil
+ 				}
+ 				if errors.IsNotFound(err) {
+ 					return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, expected a single associated nodeclaim, %w", err))
+ 				}
+ 				return reconcile.Result{}, err
+ 			}
- 		return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
+ 			// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+ 			// operations on the NodeClaim.
+ 			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+ 		}
- 	if !isInstanceTerminated {
- 		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
- 	}
+ 	}

+ 	isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
+ 	if client.IgnoreNotFound(err) != nil {
+ 		// 409 - The nodeClaim exists, but its status has already been modified
+ 		if errors.IsConflict(err) {
+ 			return reconcile.Result{Requeue: true}, nil
+ 		}
+ 		return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
+ 	}
+ 	if !isInstanceTerminated {
+ 		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+ 	}
  	if err := c.removeFinalizer(ctx, node); err != nil {
  		return reconcile.Result{}, err
  	}
  	return reconcile.Result{}, nil
  }

- func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
- 	for _, nodeClaim := range nodeClaims {
- 		// If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
- 		if nodeClaim.DeletionTimestamp.IsZero() {
- 			if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
- 				return client.IgnoreNotFound(err)
- 			}
- 		}
- 	}
- 	return nil
- }
+ func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool {
+ 	if nodeTerminationTime == nil {
+ 		return false
+ 	}
+ 	return !c.clock.Now().Before(*nodeTerminationTime)
+ }
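
The new helper treats a nil termination time as "never elapses", and because `!Before` is inclusive, `now == deadline` counts as elapsed. A quick self-contained check of that boundary, assuming the `k8s.io/utils/clock/testing` fake clock commonly used in this repo's tests:

```go
package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	start := time.Now()
	clk := clocktesting.NewFakeClock(start)
	deadline := start.Add(30 * time.Second)

	// Mirrors hasTerminationGracePeriodElapsed above.
	elapsed := func(t *time.Time) bool {
		if t == nil {
			return false // no TerminationGracePeriod configured
		}
		return !clk.Now().Before(*t)
	}

	fmt.Println(elapsed(nil))       // false: no deadline at all
	fmt.Println(elapsed(&deadline)) // false: 30s still remain
	clk.Step(30 * time.Second)
	fmt.Println(elapsed(&deadline)) // true: now == deadline is inclusive
}
```
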

- func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
+ func (c *Controller) pendingVolumeAttachments(ctx context.Context, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
  	volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
  	if err != nil {
- 		return false, err
+ 		return nil, err
  	}
  	// Filter out VolumeAttachments associated with not drain-able Pods
  	filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
  	if err != nil {
- 		return false, err
+ 		return nil, err
  	}
- 	return len(filteredVolumeAttachments) == 0, nil
+ 	return filteredVolumeAttachments, nil
  }

  // filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
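
Taken together, the new conditions give observers a coarse state machine for node termination. A hedged sketch of how one might map them to a human-readable stage (stage labels are ours; the condition types come from this PR, and the ordering assumes the drain, then volume detachment, then instance termination flow in finalize above):

```go
package main

import (
	"fmt"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// terminationStage maps the termination-stage conditions onto a coarse label.
// Illustrative only; not part of the PR.
func terminationStage(nc *v1.NodeClaim) string {
	conds := nc.StatusConditions()
	switch {
	case conds.Get(v1.ConditionTypeInstanceTerminating).IsTrue():
		return "awaiting cloud provider instance termination"
	case conds.Get(v1.ConditionTypeVolumesDetached).IsTrue():
		return "volumes detached; terminating instance"
	case conds.Get(v1.ConditionTypeDrained).IsTrue():
		return "drained; awaiting volume detachment"
	default:
		return "draining"
	}
}

func main() {
	fmt.Println(terminationStage(&v1.NodeClaim{})) // "draining" for an empty status
}
```
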