From c550b770205c1fedd37c16faf4dc11a8f64b480e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?=
Date: Mon, 25 Apr 2022 16:24:54 +0200
Subject: [PATCH] Make NodeDeletionTracker implement ActuationStatus interface

---
 .../deletiontracker/nodedeletiontracker.go    | 143 +++++++++++++-----
 .../core/scaledown/legacy/legacy.go           |  38 ++---
 .../core/scaledown/legacy/legacy_test.go      |  12 +-
 .../core/scaledown/legacy/wrapper.go          |  43 +-----
 .../core/scaledown/scaledown.go               |  11 +-
 cluster-autoscaler/core/static_autoscaler.go  |  14 +-
 .../core/static_autoscaler_test.go            |   4 +-
 .../status/scale_down_status_processor.go     |  13 +-
 cluster-autoscaler/utils/expiring/list.go     |  93 ++++++++++++
 .../utils/expiring/list_test.go               |  94 ++++++++++++
 10 files changed, 337 insertions(+), 128 deletions(-)
 create mode 100644 cluster-autoscaler/utils/expiring/list.go
 create mode 100644 cluster-autoscaler/utils/expiring/list_test.go

diff --git a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
index 0a6fa6f749fe..6a672aebf42e 100644
--- a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
+++ b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go
@@ -18,92 +18,153 @@ package deletiontracker
 
 import (
 	"sync"
+	"time"
 
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
+
+	apiv1 "k8s.io/api/core/v1"
 	klog "k8s.io/klog/v2"
+	"k8s.io/utils/clock"
 )
 
 // NodeDeletionTracker keeps track of node deletions.
-// TODO: extend to implement ActuationStatus interface
 type NodeDeletionTracker struct {
 	sync.Mutex
-	nonEmptyNodeDeleteInProgress bool
-	// A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus
-	// objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that
-	// an error occurred during the deletion process.
-	nodeDeleteResults map[string]status.NodeDeleteResult
 	// A map which keeps track of deletions in progress for nodepools.
 	// Key is a node group id and value is a number of node deletions in progress.
-	deletionsInProgress map[string]int
+	deletionsPerNodeGroup map[string]int
+	// This mapping contains node names of all empty nodes currently undergoing deletion.
+	emptyNodeDeletions map[string]bool
+	// This mapping contains node names of all nodes currently undergoing drain and deletion.
+	drainedNodeDeletions map[string]bool
+	// Clock for checking current time.
+	clock clock.PassiveClock
+	// Helper struct for tracking pod evictions.
+	evictions *expiring.List
+	// How long evictions are considered as recent.
+	evictionsTTL time.Duration
+	// Helper struct for tracking deletion results.
+	deletionResults *expiring.List
+}
+
+type deletionResult struct {
+	nodeName string
+	result   status.NodeDeleteResult
 }
 
 // NewNodeDeletionTracker creates new NodeDeletionTracker.
-func NewNodeDeletionTracker() *NodeDeletionTracker {
+func NewNodeDeletionTracker(podEvictionsTTL time.Duration) *NodeDeletionTracker {
 	return &NodeDeletionTracker{
-		nodeDeleteResults:   make(map[string]status.NodeDeleteResult),
-		deletionsInProgress: make(map[string]int),
+		deletionsPerNodeGroup: make(map[string]int),
+		emptyNodeDeletions:    make(map[string]bool),
+		drainedNodeDeletions:  make(map[string]bool),
+		clock:                 clock.RealClock{},
+		evictions:             expiring.NewList(),
+		evictionsTTL:          podEvictionsTTL,
+		deletionResults:       expiring.NewList(),
 	}
 }
 
-// IsNonEmptyNodeDeleteInProgress returns true if a non empty node is being deleted.
-func (n *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool {
-	n.Lock()
-	defer n.Unlock()
-	return n.nonEmptyNodeDeleteInProgress
-}
-
-// SetNonEmptyNodeDeleteInProgress sets non empty node deletion in progress status.
-func (n *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) {
+// StartDeletion increments node deletion in progress counter for the given nodegroup.
+func (n *NodeDeletionTracker) StartDeletion(nodeGroupId, nodeName string) {
 	n.Lock()
 	defer n.Unlock()
-	n.nonEmptyNodeDeleteInProgress = status
+	n.deletionsPerNodeGroup[nodeGroupId]++
+	n.emptyNodeDeletions[nodeName] = true
 }
 
-// StartDeletion increments node deletion in progress counter for the given nodegroup.
-func (n *NodeDeletionTracker) StartDeletion(nodeGroupId string) {
+// StartDeletionWithDrain is equivalent to StartDeletion, but for counting nodes that are drained first.
+func (n *NodeDeletionTracker) StartDeletionWithDrain(nodeGroupId, nodeName string) {
 	n.Lock()
 	defer n.Unlock()
-	n.deletionsInProgress[nodeGroupId]++
+	n.deletionsPerNodeGroup[nodeGroupId]++
+	n.drainedNodeDeletions[nodeName] = true
 }
 
 // EndDeletion decrements node deletion in progress counter for the given nodegroup.
-func (n *NodeDeletionTracker) EndDeletion(nodeGroupId string) {
+func (n *NodeDeletionTracker) EndDeletion(nodeGroupId, nodeName string, result status.NodeDeleteResult) {
 	n.Lock()
 	defer n.Unlock()
 
-	value, found := n.deletionsInProgress[nodeGroupId]
+	n.deletionResults.RegisterElement(&deletionResult{nodeName, result})
+	value, found := n.deletionsPerNodeGroup[nodeGroupId]
 	if !found {
-		klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus wasn't found", nodeGroupId)
+		klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker wasn't found", nodeGroupId)
 		return
 	}
 	if value <= 0 {
-		klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus isn't greater than 0, counter value is %d", nodeGroupId, value)
+		klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker isn't greater than 0, counter value is %d", nodeGroupId, value)
+	}
+	n.deletionsPerNodeGroup[nodeGroupId]--
+	if n.deletionsPerNodeGroup[nodeGroupId] <= 0 {
+		delete(n.deletionsPerNodeGroup, nodeGroupId)
+	}
+	delete(n.emptyNodeDeletions, nodeName)
+	delete(n.drainedNodeDeletions, nodeName)
+}
+
+// DeletionsInProgress returns two lists of node names currently undergoing deletion: empty nodes and drained nodes.
+func (n *NodeDeletionTracker) DeletionsInProgress() ([]string, []string) {
+	n.Lock()
+	defer n.Unlock()
+	return mapKeysSlice(n.emptyNodeDeletions), mapKeysSlice(n.drainedNodeDeletions)
+}
+
+func mapKeysSlice(m map[string]bool) []string {
+	s := make([]string, len(m))
+	i := 0
+	for k := range m {
+		s[i] = k
+		i++
 	}
-	n.deletionsInProgress[nodeGroupId]--
-	if n.deletionsInProgress[nodeGroupId] <= 0 {
-		delete(n.deletionsInProgress, nodeGroupId)
+	return s
+}
+
+// RegisterEviction stores information about a pod that was recently evicted.
+func (n *NodeDeletionTracker) RegisterEviction(pod *apiv1.Pod) {
+	n.Lock()
+	defer n.Unlock()
+	n.evictions.RegisterElement(pod)
+}
+
+// RecentEvictions returns a list of pods that were recently evicted by Cluster Autoscaler.
+func (n *NodeDeletionTracker) RecentEvictions() []*apiv1.Pod {
+	n.Lock()
+	defer n.Unlock()
+	n.evictions.DropNotNewerThan(n.clock.Now().Add(-n.evictionsTTL))
+	els := n.evictions.ToSlice()
+	pods := make([]*apiv1.Pod, 0, len(els))
+	for _, el := range els {
+		pods = append(pods, el.(*apiv1.Pod))
 	}
+	return pods
 }
 
-// GetDeletionsInProgress returns the number of deletions in progress for the given node group.
-func (n *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int {
+// DeletionsCount returns the number of deletions in progress for the given node group.
+func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int {
 	n.Lock()
 	defer n.Unlock()
-	return n.deletionsInProgress[nodeGroupId]
+	return n.deletionsPerNodeGroup[nodeGroupId]
 }
 
-// AddNodeDeleteResult adds a node delete result to the result map.
-func (n *NodeDeletionTracker) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) {
+// DeletionResults returns deletion results in a map form, along with the timestamp of last result.
+func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
 	n.Lock()
 	defer n.Unlock()
-	n.nodeDeleteResults[nodeName] = result
+	els, ts := n.deletionResults.ToSliceWithTimestamp()
+	drs := make(map[string]status.NodeDeleteResult)
+	for _, el := range els {
+		dr := el.(*deletionResult)
+		drs[dr.nodeName] = dr.result
+	}
+	return drs, ts
 }
 
-// GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one.
-func (n *NodeDeletionTracker) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult {
+// ClearResultsNotNewerThan iterates over existing deletion results and keeps
+// only the ones that are newer than the provided timestamp.
+func (n *NodeDeletionTracker) ClearResultsNotNewerThan(t time.Time) {
 	n.Lock()
 	defer n.Unlock()
-	results := n.nodeDeleteResults
-	n.nodeDeleteResults = make(map[string]status.NodeDeleteResult)
-	return results
+	n.deletionResults.DropNotNewerThan(t)
 }
diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go
index 4275fc6093e0..b7f04e1605fc 100644
--- a/cluster-autoscaler/core/scaledown/legacy/legacy.go
+++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -295,7 +295,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
 		nodeUtilizationMap:     make(map[string]utilization.Info),
 		usageTracker:           simulator.NewUsageTracker(),
 		unneededNodesList:      make([]*apiv1.Node, 0),
-		nodeDeletionTracker:    deletiontracker.NewNodeDeletionTracker(),
+		nodeDeletionTracker:    deletiontracker.NewNodeDeletionTracker(0 * time.Second),
 		unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
 	}
 }
@@ -637,11 +637,6 @@ func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
 	return ns
 }
 
-// IsNonEmptyNodeDeleteInProgress returns true if any nodes are being deleted.
-func (sd *ScaleDown) IsNonEmptyNodeDeleteInProgress() bool {
-	return sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()
-}
-
 // markSimulationError indicates a simulation error by clearing relevant scale
 // down state and returning an appropriate error.
 func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
@@ -692,8 +687,8 @@ func (sd *ScaleDown) TryToScaleDown(
 	currentTime time.Time,
 	pdbs []*policyv1.PodDisruptionBudget,
 ) (*status.ScaleDownStatus, errors.AutoscalerError) {
-
-	scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
+	ndr, ts := sd.nodeDeletionTracker.DeletionResults()
+	scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: ndr, NodeDeleteResultsAsOf: ts}
 	nodeDeletionDuration := time.Duration(0)
 	findNodesToRemoveDuration := time.Duration(0)
 	defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
@@ -790,7 +785,7 @@ func (sd *ScaleDown) TryToScaleDown(
 			continue
 		}
 
-		deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
+		deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
 		if size-deletionsInProgress <= nodeGroup.MinSize() {
 			klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
 			sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
@@ -890,19 +885,16 @@ func (sd *ScaleDown) TryToScaleDown(
 
 	// Starting deletion.
 	nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
-	sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)
+	nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
+	if !found {
+		return scaleDownStatus, errors.NewAutoscalerError(errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)
+	}
+	sd.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), toRemove.Node.Name)
 
 	go func() {
 		// Finishing the delete process once this goroutine is over.
 		var result status.NodeDeleteResult
-		defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }()
-		defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false)
-		nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
-		if !found {
-			result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(
-				errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)}
-			return
-		}
+		defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), toRemove.Node.Name, result) }()
 		result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
 		if result.ResultType != status.NodeDeleteOk {
 			klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
@@ -968,7 +960,7 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
 			klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err)
 			continue
 		}
-		deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
+		deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
 		available = size - nodeGroup.MinSize() - deletionsInProgress
 		if available < 0 {
 			available = 0
@@ -1016,10 +1008,9 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodesToRemove []simulator.Nod
 		}
 		deletedNodes = append(deletedNodes, empty.Node)
 		go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
-			sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
-			defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
+			sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name)
 			var result status.NodeDeleteResult
-			defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(nodeToDelete.Name, result) }()
+			defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name, result) }()
 			var deleteErr errors.AutoscalerError
 
 			// If we fail to delete the node we want to remove delete taint
@@ -1110,9 +1101,6 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
 		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
 	}
 
-	sd.nodeDeletionTracker.StartDeletion(nodeGroup.Id())
-	defer sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id())
-
 	// If we fail to evict all the pods from the node we want to remove delete taint
 	defer func() {
 		if !deleteSuccessful {
diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
index 45b5a05dfa33..189036007270 100644
--- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
+++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -1220,7 +1220,8 @@ func TestScaleDown(t *testing.T) {
 
 func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
 	for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
-		if !sd.IsNonEmptyNodeDeleteInProgress() {
+		_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
+		if len(drained) == 0 {
 			return
 		}
 	}
@@ -1530,9 +1531,9 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) {
 }
 
 func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) {
-	nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker()
-	nodeDeletionTracker.StartDeletion("ng1")
-	nodeDeletionTracker.StartDeletion("ng1")
+	nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
+	nodeDeletionTracker.StartDeletion("ng1", "n1")
+	nodeDeletionTracker.StartDeletion("ng1", "n2")
 	options := defaultScaleDownOptions
 	config := &ScaleTestConfig{
 		Nodes: []NodeConfig{
@@ -1622,7 +1623,6 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
 	autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
 	assert.NoError(t, autoscalererr)
 	scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
-	assert.False(t, scaleDown.IsNonEmptyNodeDeleteInProgress())
 	assert.NoError(t, err)
 
 	var expectedScaleDownResult status.ScaleDownResult
@@ -1652,6 +1652,8 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
 
 	assert.Equal(t, expectedScaleDownCount, len(deleted))
 	assert.Subset(t, config.ExpectedScaleDowns, deleted)
+	_, nonEmptyDeletions := scaleDown.nodeDeletionTracker.DeletionsInProgress()
+	assert.Equal(t, 0, len(nonEmptyDeletions))
 }
 
 func TestNoScaleDownUnready(t *testing.T) {
diff --git a/cluster-autoscaler/core/scaledown/legacy/wrapper.go b/cluster-autoscaler/core/scaledown/legacy/wrapper.go
index eef2b85a45ec..74409a34da02 100644
--- a/cluster-autoscaler/core/scaledown/legacy/wrapper.go
+++ b/cluster-autoscaler/core/scaledown/legacy/wrapper.go
@@ -20,7 +20,6 @@ import (
 	"time"
 
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
-	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -89,49 +88,11 @@ func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, current
 func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
 	// TODO: snapshot information from the tracker instead of keeping live
 	// updated object.
-	return &actuationStatus{
-		ndt: p.sd.nodeDeletionTracker,
-	}
+	return p.sd.nodeDeletionTracker
 }
 
 // ClearResultsNotNewerThan clears old node deletion results kept by the
 // Actuator.
 func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
-	// TODO: implement this once results are not cleared while being
-	// fetched.
-}
-
-type actuationStatus struct {
-	ndt *deletiontracker.NodeDeletionTracker
-}
-
-// DeletionsInProgress returns node names of currently deleted nodes.
-// Current implementation is not aware of the actual nodes names, so it returns
-// a fake node name instead.
-// TODO: Return real node names
-func (a *actuationStatus) DeletionsInProgress() []string {
-	if a.ndt.IsNonEmptyNodeDeleteInProgress() {
-		return []string{"fake-node-name"}
-	}
-	return nil
-}
-
-// DeletionsCount returns total number of ongoing deletions in a given node
-// group.
-func (a *actuationStatus) DeletionsCount(nodeGroupId string) int {
-	return a.ndt.GetDeletionsInProgress(nodeGroupId)
-}
-
-// RecentEvictions should return a list of recently evicted pods. Since legacy
-// scale down logic only drains at most one node at a time, this safeguard is
-// not really needed there, so we can just return an empty list.
-func (a *actuationStatus) RecentEvictions() []*apiv1.Pod {
-	return nil
-}
-
-// DeletionResults returns a map of recent node deletion results.
-func (a *actuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
-	// TODO: update nodeDeletionTracker so it doesn't get & clear in the
-	// same step.
-	return a.ndt.GetAndClearNodeDeleteResults()
+	p.sd.nodeDeletionTracker.ClearResultsNotNewerThan(t)
 }
diff --git a/cluster-autoscaler/core/scaledown/scaledown.go b/cluster-autoscaler/core/scaledown/scaledown.go
index 291e70db9ce5..5ae82c8d2b88 100644
--- a/cluster-autoscaler/core/scaledown/scaledown.go
+++ b/cluster-autoscaler/core/scaledown/scaledown.go
@@ -67,9 +67,10 @@ type Actuator interface {
 
 // ActuationStatus is used for feeding Actuator status back into Planner
 type ActuationStatus interface {
-	// DeletionsInProgress returns a list of nodes that are currently
-	// undergoing deletion.
-	DeletionsInProgress() (nodeNames []string)
+	// DeletionsInProgress returns two lists of node names that are
+	// currently undergoing deletion, for empty and non-empty (i.e. drained)
+	// nodes separately.
+	DeletionsInProgress() (empty, drained []string)
 	// DeletionsCount returns total number of ongoing deletions in a given
 	// node group.
 	DeletionsCount(nodeGroupId string) int
@@ -80,5 +81,7 @@ type ActuationStatus interface {
 	// DeletionResults returns a map of recent node deletion results, keyed
 	// by the node name. Note: if node deletion was scheduled more than
 	// once, only the latest result will be present.
-	DeletionResults() map[string]status.NodeDeleteResult
+	// The timestamp returned as the second value indicates the time at
+	// which the last result was collected.
+	DeletionResults() (map[string]status.NodeDeleteResult, time.Time)
 }
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 934a39975b79..074a45ff014f 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -523,22 +523,23 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
 		a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
 		a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
-	// TODO(x13n): Move deletionsInProgress > 0 condition to the legacy scaledown implementation.
-	deletionsInProgress := len(actuationStatus.DeletionsInProgress())
+	// TODO(x13n): Move nonEmptyDeletionsCount > 0 condition to the legacy scaledown implementation.
+	_, nonEmptyDeletionsInProgress := actuationStatus.DeletionsInProgress()
+	nonEmptyDeletionsCount := len(nonEmptyDeletionsInProgress)
 	// In dry run only utilization is updated
-	calculateUnneededOnly := scaleDownInCooldown || deletionsInProgress > 0
+	calculateUnneededOnly := scaleDownInCooldown || nonEmptyDeletionsCount > 0
 
 	klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
 		"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "+
-		"deletionsInProgress=%v scaleDownInCooldown=%v",
+		"nonEmptyDeletionsCount=%v scaleDownInCooldown=%v",
 		calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime,
 		a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
-		deletionsInProgress, scaleDownInCooldown)
+		nonEmptyDeletionsCount, scaleDownInCooldown)
 	metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
 
 	if scaleDownInCooldown {
 		scaleDownStatus.Result = status.ScaleDownInCooldown
-	} else if deletionsInProgress > 0 {
+	} else if nonEmptyDeletionsCount > 0 {
 		scaleDownStatus.Result = status.ScaleDownInProgress
 	} else {
 		klog.V(4).Infof("Starting scale down")
@@ -554,6 +555,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 		metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
 		empty, needDrain := a.scaleDownPlanner.NodesToDelete()
 		scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
+		a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
 		metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
 		metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
 
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index 67c7878dc1e3..566980223f28 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -1331,7 +1331,9 @@ func nodeNames(ns []*apiv1.Node) []string {
 
 func waitForDeleteToFinish(t *testing.T, sda scaledown.Actuator) {
 	for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
-		if len(sda.CheckStatus().DeletionsInProgress()) == 0 {
+		_, dip := sda.CheckStatus().DeletionsInProgress()
+		klog.Infof("Non empty deletions in progress: %v", dip)
+		if len(dip) == 0 {
 			return
 		}
 	}
diff --git a/cluster-autoscaler/processors/status/scale_down_status_processor.go b/cluster-autoscaler/processors/status/scale_down_status_processor.go
index 9bd359fa2507..ab98fc5dd09d 100644
--- a/cluster-autoscaler/processors/status/scale_down_status_processor.go
+++ b/cluster-autoscaler/processors/status/scale_down_status_processor.go
@@ -17,6 +17,8 @@ limitations under the License.
 package status
 
 import (
+	"time"
+
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -28,11 +30,12 @@ import (
 
 // ScaleDownStatus represents the state of scale down.
 type ScaleDownStatus struct {
-	Result            ScaleDownResult
-	ScaledDownNodes   []*ScaleDownNode
-	UnremovableNodes  []*UnremovableNode
-	RemovedNodeGroups []cloudprovider.NodeGroup
-	NodeDeleteResults map[string]NodeDeleteResult
+	Result                ScaleDownResult
+	ScaledDownNodes       []*ScaleDownNode
+	UnremovableNodes      []*UnremovableNode
+	RemovedNodeGroups     []cloudprovider.NodeGroup
+	NodeDeleteResults     map[string]NodeDeleteResult
+	NodeDeleteResultsAsOf time.Time
 }
 
 // SetUnremovableNodesInfo sets the status of nodes that were found to be unremovable.
diff --git a/cluster-autoscaler/utils/expiring/list.go b/cluster-autoscaler/utils/expiring/list.go
new file mode 100644
index 000000000000..f18e99ef7aad
--- /dev/null
+++ b/cluster-autoscaler/utils/expiring/list.go
@@ -0,0 +1,93 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package expiring
+
+import (
+	"container/list"
+	"time"
+
+	"k8s.io/utils/clock"
+)
+
+type elementWithTimestamp struct {
+	value interface{}
+	added time.Time
+}
+
+// List tracks elements along with their creation times.
+// This is essentially a linked list with timestamp on each entry, allowing
+// dropping old entries. This struct is not thread safe.
+// TODO(x13n): Migrate to generics once supported by Go stdlib (container/list
+// in particular).
+type List struct {
+	lst   list.List
+	clock clock.PassiveClock
+}
+
+// NewList creates a new expiring list.
+func NewList() *List {
+	return newListWithClock(clock.RealClock{})
+}
+
+// Warning: This object doesn't support time travel. Subsequent calls to
+// clock.Now are expected to return non-decreasing time values.
+func newListWithClock(clock clock.PassiveClock) *List {
+	return &List{
+		clock: clock,
+	}
+}
+
+// ToSlice converts the underlying list of elements into a slice.
+func (e *List) ToSlice() []interface{} {
+	p := e.lst.Front()
+	ps := make([]interface{}, 0, e.lst.Len())
+	for p != nil {
+		ps = append(ps, p.Value.(*elementWithTimestamp).value)
+		p = p.Next()
+	}
+	return ps
+}
+
+// ToSliceWithTimestamp is identical to ToSlice, but additionally returns the
+// timestamp of the newest entry (or current time if there are no entries).
+func (e *List) ToSliceWithTimestamp() ([]interface{}, time.Time) {
+	if e.lst.Len() == 0 {
+		return nil, e.clock.Now()
+	}
+	return e.ToSlice(), e.lst.Back().Value.(*elementWithTimestamp).added
+}
+
+// RegisterElement adds a new element to the list.
+func (e *List) RegisterElement(elem interface{}) {
+	e.lst.PushBack(&elementWithTimestamp{elem, e.clock.Now()})
+}
+
+// DropNotNewerThan removes all elements of the list that are older or exactly
+// as old as the provided time.
+func (e *List) DropNotNewerThan(expiry time.Time) {
+	p := e.lst.Front()
+	for p != nil {
+		if p.Value.(*elementWithTimestamp).added.After(expiry) {
+			// First not-expired element on the list, skip checking
+			// the rest.
+			return
+		}
+		d := p
+		p = p.Next()
+		e.lst.Remove(d)
+	}
+}
diff --git a/cluster-autoscaler/utils/expiring/list_test.go b/cluster-autoscaler/utils/expiring/list_test.go
new file mode 100644
index 000000000000..87a6c4849dad
--- /dev/null
+++ b/cluster-autoscaler/utils/expiring/list_test.go
@@ -0,0 +1,94 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package expiring
+
+import (
+	"testing"
+	"testing/quick"
+	"time"
+
+	klog "k8s.io/klog/v2"
+	clock "k8s.io/utils/clock/testing"
+)
+
+func TestToSlice(t *testing.T) {
+	if err := quick.Check(identityCheck, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+func identityCheck(list []int) bool {
+	l := NewList()
+	l.registerElementsFrom(list)
+	return l.equals(list)
+}
+
+func TestDropNotNewer(t *testing.T) {
+	if err := quick.Check(dropChecks, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+func dropChecks(l1, l2, l3 []int) bool {
+	t0 := time.Now()
+	c := clock.NewFakePassiveClock(t0)
+	t1, t2 := t0.Add(1*time.Minute), t0.Add(2*time.Minute)
+	l := newListWithClock(c)
+	l.registerElementsFrom(l1)
+	c.SetTime(t1)
+	l.registerElementsFrom(l2)
+	c.SetTime(t2)
+	if !l.equals(append(l1, l2...)) {
+		return false
+	}
+	l.DropNotNewerThan(t0)
+	if !l.equals(l2) {
+		return false
+	}
+	l.registerElementsFrom(l3)
+	if !l.equals(append(l2, l3...)) {
+		return false
+	}
+	l.DropNotNewerThan(t1)
+	if !l.equals(l3) {
+		return false
+	}
+	l.DropNotNewerThan(t2)
+	return len(l.ToSlice()) == 0
+}
+
+func (e *List) registerElementsFrom(list []int) {
+	for _, i := range list {
+		e.RegisterElement(i)
+	}
+}
+
+func (e *List) equals(want []int) bool {
+	got := e.ToSlice()
+	if len(got) != len(want) {
+		klog.Errorf("len(%v) != len(%v)", got, want)
+		return false
+	}
+	for i, g := range got {
+		w := want[i]
+		if g.(int) != w {
+			klog.Errorf("%v != %v (difference at index %v)", got, want, i)
+			return false
+		}
+	}
+	return true
+}
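
Usage sketch for the new expiring.List (illustrative only, not part of the diff; the ExampleList name and its placement next to list_test.go in package expiring are hypothetical). It exercises the same pattern RecentEvictions relies on: register entries under a non-decreasing clock, then drop the expired prefix.

package expiring

import (
	"fmt"
	"time"

	clock "k8s.io/utils/clock/testing"
)

func ExampleList() {
	t0 := time.Now()
	c := clock.NewFakePassiveClock(t0)
	l := newListWithClock(c)

	// Entries carry the clock time at registration, so the list stays
	// ordered by timestamp as long as the clock never goes backwards.
	l.RegisterElement("evicted-pod-1")
	c.SetTime(t0.Add(2 * time.Minute))
	l.RegisterElement("evicted-pod-2")

	// Drop everything registered at or before t0+1m. Because the list is
	// timestamp-ordered, DropNotNewerThan can stop at the first entry that
	// is strictly newer than the cutoff.
	l.DropNotNewerThan(t0.Add(1 * time.Minute))
	for _, el := range l.ToSlice() {
		fmt.Println(el.(string))
	}
	// Output:
	// evicted-pod-2
}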
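The property the commit title promises can be pinned down with a standard compile-time assertion; a hypothetical snippet (not included in the patch), using only identifiers introduced above:

package legacy

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
)

// Fails to compile if NodeDeletionTracker ever stops satisfying
// ActuationStatus, i.e. if DeletionsInProgress, DeletionsCount,
// RecentEvictions or DeletionResults drifts from the interface in
// scaledown.go.
var _ scaledown.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)

This is exactly what lets CheckStatus in wrapper.go hand out the tracker directly in place of the deleted actuationStatus adapter.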
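The NodeDeleteResultsAsOf timestamp is what replaces the racy GetAndClearNodeDeleteResults: RunOnce reads results first and then clears no further than what it has already seen. A sketch of that consumer pattern (hypothetical driver code, not from the patch):

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

func main() {
	ndt := deletiontracker.NewNodeDeletionTracker(5 * time.Minute)
	ndt.StartDeletion("ng1", "node-1")
	ndt.EndDeletion("ng1", "node-1", status.NodeDeleteResult{ResultType: status.NodeDeleteOk})

	// Read the accumulated results together with the timestamp of the
	// newest entry...
	results, asOf := ndt.DeletionResults()
	fmt.Println(results, asOf)

	// ...then clear only up to that timestamp. A result registered at a
	// later clock time survives the clear and is returned by the next
	// read, instead of being lost in a get-and-clear window.
	ndt.ClearResultsNotNewerThan(asOf)
}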