Fix Race conditions in Targeted Deletion of machines by CA #341

Merged
Changes from 4 commits
Commits (27)
43cffaf
[WIP] fix CA marking machines for deletion
Dec 19, 2024
0d939b0
[WIP] add mutex for machine deployment
Dec 19, 2024
98f20d3
initialise machinedeployment map in mcmManager
Dec 20, 2024
56d80ac
add Refresh method in nodegrp implementation
Dec 20, 2024
f3774f4
address review comments
Dec 24, 2024
9063248
address review comments - part 2
Dec 26, 2024
68d2046
update unit tests, misc code changes
Dec 28, 2024
b2e1c3c
review comments addressed
elankath Jan 16, 2025
c515f39
fixed broken test after refactor
elankath Jan 16, 2025
063f07a
fixed broken test
elankath Jan 16, 2025
945c74c
alternate solution using single annotation for deletion by CA
elankath Jan 28, 2025
b021a3c
fixed use of Go 1.23 functions
elankath Jan 28, 2025
3b02417
fixed test
elankath Jan 28, 2025
22ced54
added godoc for AcquireScalingMutex
elankath Jan 28, 2025
bc85986
correct godoc for machinesMarkedByCAForDeletionAnnotation
elankath Jan 29, 2025
4d33bef
changed to machineutils.TriggerDeletionByMCM
elankath Jan 29, 2025
6ea649b
removed var shadowing, added TODO to godoc for util fns
elankath Jan 29, 2025
f890502
corr var names, clear mutex acquire, releasing logs
elankath Jan 30, 2025
9d1d313
ensured IncreaseSize/DecreaseTargetSize logged delta in mutex acquir…
elankath Jan 30, 2025
b3f9167
review comments addressed: fixed unit test, adjusted log, removed red…
elankath Feb 3, 2025
7d1d0d7
all code review comments addressed
elankath Feb 3, 2025
194b65a
review feedback: unexport belongs, enforce interface
elankath Feb 3, 2025
136ea27
addreseed review comment, refactored added test for computeScaleDownData
elankath Feb 3, 2025
f31884d
review comment: preset capacity for slices
elankath Feb 3, 2025
36b59f1
review feedback: ->computeScaleDownData and revised godoc
elankath Feb 4, 2025
0e21faa
cordon nodes fix for when node is disabled for scaledown
elankath Feb 4, 2025
12e747f
fixed unit test string quoting issue
elankath Feb 4, 2025
111 changes: 73 additions & 38 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -24,8 +24,13 @@ package mcm
import (
"context"
"fmt"
"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"slices"
"strconv"
"strings"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
@@ -67,15 +72,14 @@ const (
// MCMCloudProvider implements the cloud provider interface for machine-controller-manager
// Reference: https://github.com/gardener/machine-controller-manager
type mcmCloudProvider struct {
mcmManager *McmManager
machinedeployments map[types.NamespacedName]*MachineDeployment
resourceLimiter *cloudprovider.ResourceLimiter
mcmManager *McmManager
resourceLimiter *cloudprovider.ResourceLimiter
}

// BuildMcmCloudProvider builds CloudProvider implementation for machine-controller-manager.
func BuildMcmCloudProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) {
if mcmManager.discoveryOpts.StaticDiscoverySpecified() {
return buildStaticallyDiscoveringProvider(mcmManager, mcmManager.discoveryOpts.NodeGroupSpecs, resourceLimiter)
return buildStaticallyDiscoveringProvider(mcmManager, resourceLimiter)
}
return nil, fmt.Errorf("Failed to build an mcm cloud provider: Either node group specs or node group auto discovery spec must be specified")
}
@@ -96,16 +100,10 @@ func BuildMCM(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
return provider
}

func buildStaticallyDiscoveringProvider(mcmManager *McmManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
func buildStaticallyDiscoveringProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
mcm := &mcmCloudProvider{
mcmManager: mcmManager,
machinedeployments: make(map[types.NamespacedName]*MachineDeployment),
resourceLimiter: resourceLimiter,
}
for _, spec := range specs {
if err := mcm.addNodeGroup(spec); err != nil {
return nil, err
}
mcmManager: mcmManager,
resourceLimiter: resourceLimiter,
}
return mcm, nil
}
@@ -116,31 +114,14 @@ func (mcm *mcmCloudProvider) Cleanup() error {
return nil
}

// addNodeGroup adds node group defined in string spec. Format:
// minNodes:maxNodes:namespace.machineDeploymentName
func (mcm *mcmCloudProvider) addNodeGroup(spec string) error {
machinedeployment, err := buildMachineDeploymentFromSpec(spec, mcm.mcmManager)
if err != nil {
return err
}
mcm.addMachineDeployment(machinedeployment)
return nil
}

func (mcm *mcmCloudProvider) addMachineDeployment(machinedeployment *MachineDeployment) {
key := types.NamespacedName{Namespace: machinedeployment.Namespace, Name: machinedeployment.Name}
mcm.machinedeployments[key] = machinedeployment
return
}

func (mcm *mcmCloudProvider) Name() string {
return "machine-controller-manager"
}

// NodeGroups returns all node groups configured for this cloud provider.
func (mcm *mcmCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0, len(mcm.machinedeployments))
for _, machinedeployment := range mcm.machinedeployments {
result := make([]cloudprovider.NodeGroup, 0, len(mcm.mcmManager.machineDeployments))
for _, machinedeployment := range mcm.mcmManager.machineDeployments {
if machinedeployment.maxSize == 0 {
continue
}
@@ -172,7 +153,7 @@ func (mcm *mcmCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.N
}

key := types.NamespacedName{Namespace: md.Namespace, Name: md.Name}
_, isManaged := mcm.machinedeployments[key]
_, isManaged := mcm.mcmManager.machineDeployments[key]
if !isManaged {
klog.V(4).Infof("Skipped node %v, it's not managed by this controller", node.Spec.ProviderID)
return nil, nil
@@ -293,8 +274,9 @@ type MachineDeployment struct {

mcmManager *McmManager

minSize int
maxSize int
scalingMutex sync.Mutex
minSize int
maxSize int
}

// MaxSize returns maximum size of the node group.
@@ -343,6 +325,8 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
@@ -366,6 +350,8 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if delta >= 0 {
return fmt.Errorf("size decrease size must be negative")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
@@ -380,6 +366,54 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
}, "MachineDeployment", "update", machinedeployment.Name)
}

// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment
func (machineDeployment *MachineDeployment) Refresh() error {
machineDeployment.scalingMutex.Lock()

Review comment: Why should a scaling mutex be used in Refresh? The scaling mutex, by its very name, signifies a mutex meant to be taken for scaling events, not when reading/getting machine deployments.

Also, can this result in the next reconcile of RunOnce getting stuck because it cannot go beyond the CloudProvider.Refresh call?

Reply: We need a mutex when running scaling operations or when modifying attributes that directly affect a scaling operation, like the newly introduced annotation. There is no doubt about this, as we HAVE to prevent concurrent scale-ups and scale-downs: our MCM scaling is async, and multiple goroutines can perform scale-downs, leading to non-deterministic behaviour.

You have a point that RunOnce should return an error rather than block if the mutex is already acquired. Will change the code to check whether the mutex is acquired and, if so, return an informative error. Will also test this change.

Reply: Modified to take a TryLock in NodeGroupImpl.Refresh(). This is still a limited fix since it will block CA from changing other NodeGroups, but it won't block goroutines. I don't see any better way with the current CA architecture. One possible improvement is to introduce a mutex with a timeout. If you find a better way, please let me know.
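A minimal sketch of the TryLock guard described in the reply above; the NodeGroupImpl type, its fields, and the error wording here are assumptions for illustration, not the PR's actual code:

```go
package mcm

import (
	"fmt"
	"sync"
)

// NodeGroupImpl stands in for the node group implementation that owns the scaling mutex.
type NodeGroupImpl struct {
	Name         string
	scalingMutex sync.Mutex
}

// Refresh fails fast instead of blocking the autoscaler's RunOnce loop when a
// scaling operation already holds the mutex (sync.Mutex.TryLock, Go 1.18+).
func (ng *NodeGroupImpl) Refresh() error {
	if !ng.scalingMutex.TryLock() {
		return fmt.Errorf("cannot refresh node group %q: scaling mutex held by an in-flight scaling operation", ng.Name)
	}
	defer ng.scalingMutex.Unlock()
	// ... reconcile the deletion annotation and machine priorities here ...
	return nil
}
```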

defer machineDeployment.scalingMutex.Unlock()
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
Review comment (elankath, Dec 26, 2024): This is repeated nearly a dozen times everywhere, including the common error handling: machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name). Move it to a method on mcmManager called GetMachineDeploymentResource which returns a formatted error that can simply be returned, so that the error message is fully consistent across all uses. We already have methods like mcmManager.getMachinesForMachineDeployment, so this matches the existing convention.
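A rough sketch of what the suggested helper could look like; McmManager and machineDeploymentLister come from the surrounding code, while the namespace field and exact error wording are assumptions:

```go
// GetMachineDeploymentResource fetches the MachineDeployment object from the lister
// and wraps lookup failures in one consistent error message for every caller.
func (m *McmManager) GetMachineDeploymentResource(name string) (*v1alpha1.MachineDeployment, error) {
	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(name)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch MachineDeployment object %s/%s: %w", m.namespace, name, err)
	}
	return md, nil
}
```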

if err != nil {
return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err)
}
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(mcd) {
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
return nil

Review comment: Shouldn't we return an error here? We are doing that for the other cases of !isRollingUpdateFinished.

Author reply: I think we should remove this check. Even if a machineDeployment is under rolling update, we should allow the annotation update if needed. wdyt?

}
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)

Review comment: Please move this to a small function mcm.getMachinesMarkedByCAForDeletion(mcd) (machineNames sets.Set[string]) which is unit-testable and can be consumed by both mcm_cloud_provider.go and mcm_manager.go.
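A sketch of the proposed parsing helper; it assumes the generic sets.Set[string] API from a newer apimachinery (a thread further down notes the vendored version may not expose it, and the final code reportedly switched to slices):

```go
// getMachinesMarkedByCAForDeletion parses the comma-separated annotation value on the
// MachineDeployment into the set of machine names marked for deletion by the CA.
func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] {
	names := sets.New[string]()
	value := strings.TrimSpace(mcd.Annotations[machinesMarkedByCAForDeletion])
	if value == "" {
		return names
	}
	for _, name := range strings.Split(value, ",") {
		if name = strings.TrimSpace(name); name != "" {
			names.Insert(name)
		}
	}
	return names
}
```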

machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
return err
}
var incorrectlyMarkedMachines []*Ref

Review comment: I believe we can omit the Ref struct and just use types.NamespacedName, which is already being used anyway in this file.

for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {

Review comment: We already have a utility method isMachineFailedOrTerminating. Is that not suitable, or can it be made suitable?
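For reference, a sketch of what such a utility presumably looks like, derived from the phase check in the diff above (the real helper in mcm_manager.go may differ):

```go
// isMachineFailedOrTerminating reports whether the machine is already failed or
// being terminated, so its priority annotation no longer needs to be reset.
func isMachineFailedOrTerminating(machine *v1alpha1.Machine) bool {
	phase := machine.Status.CurrentStatus.Phase
	return phase == v1alpha1.MachineTerminating || phase == v1alpha1.MachineFailed
}
```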

continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {

Review comment: Out of curiosity, why is this called priorityValueForCandidateMachines? Shouldn't it be defined as const PriorityDeletionValueForCandidateMachine = 1?

Author reply: I think we can rename it to PriorityValueForDeletionCandidateMachine. wdyt?

Review comment: We should also add a comment here to explain what we are doing, so the next person making a patch doesn't scratch their head.

incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
}
}
var updatedMarkedMachines []string

Review comment: updatedMarkedMachines -> updatedMarkedMachineNames. Also please add a comment explaining that this is done to ensure the annotation does not contain non-existent machine names.

for machineName := range markedMachines {
Review comment (elankath, Dec 26, 2024): minor: Technically, you can create allMachineNames and use Set.Intersection here, which is cheaper than a nested for loop, but it's OK.

Author reply: AFAIK k8s.io/apimachinery/pkg/util/sets does not have an Intersection method.

Author reply: Yes, but it is not part of the apimachinery version we currently use. Anyway, we do not need sets; I have replaced them with slices.
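A sketch of the intersection idea from this thread, using a plain map so it does not depend on any particular sets API; the function name and shape are illustrative, not the PR's code:

```go
// intersectMarkedWithExisting keeps only the marked names that still correspond to an
// existing machine, in O(n+m) instead of a nested loop.
func intersectMarkedWithExisting(markedNames []string, machines []*v1alpha1.Machine) []string {
	existing := make(map[string]struct{}, len(machines))
	for _, mc := range machines {
		existing[mc.Name] = struct{}{}
	}
	kept := make([]string, 0, len(markedNames))
	for _, name := range markedNames {
		if _, ok := existing[name]; ok {
			kept = append(kept, name)
		}
	}
	return kept
}
```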

if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool {
return mc.Name == machineName
}) {
updatedMarkedMachines = append(updatedMarkedMachines, machineName)
}
}
clone := mcd.DeepCopy()
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",")

Review comment: This strings.Join to construct the annotation value is repeated elsewhere. Make a utility method called getMarkedForDeletionAnnotationValue(machineNames []string) string.
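A sketch of the suggested utility (trivial, but it keeps serialization in one place); the commented call site is illustrative:

```go
// getMarkedForDeletionAnnotationValue builds the comma-separated annotation value
// from the given machine names, keeping serialization consistent across call sites.
func getMarkedForDeletionAnnotationValue(machineNames []string) string {
	return strings.Join(machineNames, ",")
}

// Illustrative call site, mirroring the line above:
// clone.Annotations[machinesMarkedByCAForDeletion] = getMarkedForDeletionAnnotationValue(updatedMarkedMachines)
```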

ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout)
defer cancelFn()
_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines)
}

// Belongs returns true if the given node belongs to the NodeGroup.
// TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
@@ -541,9 +575,10 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach

func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
return &MachineDeployment{
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
scalingMutex: sync.Mutex{},
Ref: Ref{
Name: name,
Namespace: namespace,