Support for Nodes with Empty Zone
Nodes can have an empty zone if one has not been assigned yet.

The zone was retrieved from the providerID by matching the following regular expression and taking the second capture group: ```var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$`)```

For a node with an empty zone, this produced an error.
To allow an empty zone to match, the regular expression has been updated to (a short sketch of the new matching behavior follows these notes):

```var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)```

After that, I updated all the places where the code checked for this error so that they also handle an empty zone.

I also updated the Instance Group Manager so that it does not remove nodes that do not yet have a zone assigned.
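
For clarity, here is a minimal, self-contained sketch of the updated matching behavior (not part of this commit; the project ID, zone, and node name are made up). With the zone group changed from `+` to `*`, a providerID whose zone segment is empty still matches, and the second capture group comes back as the empty string:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the updated providerIDRE: the zone group uses `*`
// instead of `+`, so an empty zone segment still matches.
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)

func main() {
	for _, providerID := range []string{
		"gce://my-project/us-central1-a/my-node", // zone already assigned
		"gce://my-project//my-node",              // zone not assigned yet
	} {
		matches := providerIDRE.FindStringSubmatch(providerID)
		if len(matches) != 4 {
			fmt.Printf("%q: no match\n", providerID)
			continue
		}
		// matches[2] is the zone; it is "" when the node has no zone yet.
		fmt.Printf("%q: zone=%q\n", providerID, matches[2])
	}
}
```

Callers then treat that empty string (exposed as `zonegetter.EmptyZone`) as "zone not assigned yet" rather than as an error, as the diff below shows.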
08volt committed Oct 16, 2024
1 parent 1a2a781 commit 012b702
Showing 6 changed files with 136 additions and 23 deletions.
17 changes: 16 additions & 1 deletion pkg/instancegroups/manager.go
@@ -310,7 +310,17 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
// https://github.com/kubernetes/cloud-provider-gcp/blob/fca628cb3bf9267def0abb509eaae87d2d4040f3/providers/gce/gce_loadbalancer_internal.go#L606C1-L675C1
// the m.maxIGSize should be set to 1000 as is in the cloud-provider-gcp.
zonedNodes := m.splitNodesByZone(nodes, iglogger)
iglogger.Info(fmt.Sprintf("Syncing nodes: %d nodes over %d zones", len(nodes), len(zonedNodes)))

emptyZoneNodesNames := sets.NewString(zonedNodes[zonegetter.EmptyZone]...)
if len(emptyZoneNodesNames) > 0 {
iglogger.Info(fmt.Sprintf("%d nodes have empty zone: %v. They will not be removed from instance group as long as zone is missing", len(emptyZoneNodesNames), emptyZoneNodesNames))
}

for zone, kubeNodesFromZone := range zonedNodes {
if zone == zonegetter.EmptyZone {
continue // skip ensuring instance group for empty zone
}
igName := m.namer.InstanceGroup()
if len(kubeNodesFromZone) > m.maxIGSize {
sortedKubeNodesFromZone := sets.NewString(kubeNodesFromZone...).List()
@@ -335,7 +345,12 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
gceNodes.Insert(instance)
}

removeNodes := gceNodes.Difference(kubeNodes).List()
removalCandidates := gceNodes.Difference(kubeNodes)
iglogger.V(2).Info("Nodes that are removal candidates", "removalCandidates", events.TruncatedStringList(removalCandidates.List()))

removeNodes := removalCandidates.Difference(emptyZoneNodesNames).List() // Do not remove nodes whose zone label has not been assigned yet
iglogger.V(2).Info("Removing nodes (after ignoring nodes without zone assigned)", "removeNodes", events.TruncatedStringList(removeNodes))

addNodes := kubeNodes.Difference(gceNodes).List()

iglogger.V(2).Info("Removing nodes", "removeNodes", events.TruncatedStringList(removeNodes))
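As a side note (not part of the commit), the exclusion of empty-zone nodes above is plain set arithmetic on the string sets from `k8s.io/apimachinery`; a small standalone sketch with made-up node names:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Hypothetical state: instances currently in the GCE instance group,
	// Kubernetes nodes reported for this zone, and nodes whose zone
	// label has not been assigned yet.
	gceNodes := sets.NewString("n1", "n2", "n3")
	kubeNodes := sets.NewString("n3")
	emptyZoneNodesNames := sets.NewString("n2")

	// Same shape as manager.Sync: removal candidates are instances that
	// are in the instance group but not in the Kubernetes node list...
	removalCandidates := gceNodes.Difference(kubeNodes)
	// ...and nodes still waiting for a zone are excluded from removal.
	removeNodes := removalCandidates.Difference(emptyZoneNodesNames).List()

	fmt.Println(removeNodes) // [n1]; n2 is kept because its zone is still empty
}
```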
104 changes: 101 additions & 3 deletions pkg/instancegroups/manager_test.go
@@ -18,13 +18,14 @@ package instancegroups

import (
"fmt"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/utils"
"net/http"
"strings"
"testing"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/utils"

"google.golang.org/api/googleapi"
"k8s.io/klog/v2"

@@ -62,6 +63,103 @@ func newNodePool(f Provider, maxIGSize int) Manager {
return pool
}

func getNodeNames(nodes map[string]string) []string {
names := make([]string, 0)
for name := range nodes {
names = append(names, name)
}
return names
}

func TestNodePoolSyncWithEmptyZone(t *testing.T) {

testCases := []struct {
desc string
instanceGroupVMs []string // VMs present before the sync, all in defaultTestZone
kubeNodes map[string]string // map of node:zone during the sync "ensure instance group"
wantedInstanceGroupVMs []string // VMs that should be there at the end of the updates
}{
{
desc: "Both nodes have zone during update do not get deleted",
instanceGroupVMs: []string{"n1", "n2"},
kubeNodes: map[string]string{
"n1": defaultTestZone,
"n2": defaultTestZone,
},
wantedInstanceGroupVMs: []string{"n1", "n2"},
},
{
desc: "Create node when zone ready and do not delete node when zone empty",
instanceGroupVMs: []string{"n1"},
kubeNodes: map[string]string{
"n1": "",
"n2": defaultTestZone,
},
wantedInstanceGroupVMs: []string{"n1", "n2"},
},
{
desc: "Do not delete nodes if zone is empty but delete if node not there",
instanceGroupVMs: []string{"n1", "n2", "n3"},
kubeNodes: map[string]string{
"n2": "",
"n3": defaultTestZone,
},
wantedInstanceGroupVMs: []string{"n2", "n3"},
},
{
desc: "Do not create one Node without zone assigned",
instanceGroupVMs: []string{"n1"},
kubeNodes: map[string]string{
"n1": defaultTestZone,
"n2": "",
},
wantedInstanceGroupVMs: []string{"n1"},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// create fake gce node pool with nodes in instanceGroupVMs
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
zonesToIGs := map[string]IGsToInstances{
defaultTestZone: {
ig: sets.NewString(tc.instanceGroupVMs...),
},
}
fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, 10)

// assign zones to nodes in kubeNodes
pool := newNodePool(fakeGCEInstanceGroups, 10)
for name, zone := range tc.kubeNodes {
manager := pool.(*manager)
zonegetter.AddFakeNodes(manager.ZoneGetter, zone, name)
}

// run sync step
nodeNames := getNodeNames(tc.kubeNodes)
err := pool.Sync(nodeNames, klog.TODO())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", nodeNames, err)
}

instancesList, err := fakeGCEInstanceGroups.ListInstancesInInstanceGroup(ig.Name, defaultTestZone, allInstances)
if err != nil {
t.Fatalf("fakeGCEInstanceGroups.ListInstancesInInstanceGroup(%s, %s, %s) returned error %v, want nil", ig.Name, defaultTestZone, allInstances, err)
}
instances, err := test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("test.InstancesListToNameSet(%v) returned error %v, want nil", ig, err)
}

// check nodes are exactly the ones we expect to have in the instance group after the sync
wantedIGVMsSet := sets.NewString(tc.wantedInstanceGroupVMs...)
if !wantedIGVMsSet.Equal(instances) {
t.Errorf("Expected kubeNodeSet = %v is not equal to instance set = %v", wantedIGVMsSet, instances)
}
})
}
}

func TestNodePoolSync(t *testing.T) {
maxIGSize := 1000

4 changes: 2 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
@@ -107,7 +107,7 @@ func (l *LocalL4EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsDat
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == zonegetter.EmptyZone {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
@@ -183,7 +183,7 @@ func (l *ClusterL4EndpointsCalculator) CalculateEndpoints(_ []types.EndpointsDat
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == zonegetter.EmptyZone {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
14 changes: 8 additions & 6 deletions pkg/neg/syncers/utils.go
@@ -471,12 +471,14 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
continue
}
zone, getZoneErr := zoneGetter.ZoneForNode(nodeName, logger)
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
if getZoneErr != nil || zone == zonegetter.EmptyZone {
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
}
}
epLogger.Error(getZoneErr, "Endpoint's corresponding node does not have valid zone information, skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeNotFound]++
8 changes: 3 additions & 5 deletions pkg/utils/zonegetter/zone_getter.go
@@ -48,6 +48,7 @@ const (
AllNodesFilter = Filter("AllNodesFilter")
CandidateNodesFilter = Filter("CandidateNodesFilter")
CandidateAndUnreadyNodesFilter = Filter("CandidateAndUnreadyNodesFilter")
EmptyZone = ""
)

var ErrProviderIDNotFound = errors.New("providerID not found")
@@ -58,7 +59,7 @@ var ErrNodePodCIDRNotSet = errors.New("Node does not have PodCIDR set")

// providerIDRE is the regex to process providerID.
// A providerID is built out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$`)
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)

// ZoneGetter manages lookups for GCE instances to zones.
type ZoneGetter struct {
@@ -161,7 +162,7 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err
zones := sets.String{}
for _, n := range nodes {
zone, err := getZone(n)
if err != nil {
if err != nil || zone == EmptyZone {
filterLogger.Error(err, "Failed to get zone from providerID", "nodeName", n.Name)
continue
}
@@ -325,9 +326,6 @@ func getZone(node *api_v1.Node) (string, error) {
if len(matches) != 4 {
return "", fmt.Errorf("%w: providerID %q of node %s is not valid", ErrSplitProviderID, node.Spec.ProviderID, node.Name)
}
if matches[2] == "" {
return "", fmt.Errorf("%w: node %s has an empty zone", ErrSplitProviderID, node.Name)
}
return matches[2], nil
}

12 changes: 6 additions & 6 deletions pkg/utils/zonegetter/zone_getter_test.go
@@ -67,7 +67,7 @@ func TestListZones(t *testing.T) {
t.Errorf("For test case %q with onlyIncludeDefaultSubnetNodes = %v, got %d zones, want %d zones", tc.desc, enableMultiSubnetCluster, len(zones), tc.expectLen)
}
for _, zone := range zones {
if zone == "" {
if zone == EmptyZone {
t.Errorf("For test case %q with onlyIncludeDefaultSubnetNodes = %v, got an empty zone,", tc.desc, enableMultiSubnetCluster)
}
}
@@ -112,7 +112,7 @@ func TestListZonesMultipleSubnets(t *testing.T) {
t.Errorf("For test case %q with multi subnet cluster enabled, got %d zones, want %d zones", tc.desc, len(zones), tc.expectLen)
}
for _, zone := range zones {
if zone == "" {
if zone == EmptyZone {
t.Errorf("For test case %q with multi subnet cluster enabled, got an empty zone,", tc.desc)
}
}
@@ -239,8 +239,8 @@ func TestZoneForNode(t *testing.T) {
{
desc: "Node with empty zone in providerID",
nodeName: "instance-empty-zone-providerID",
expectZone: "",
expectErr: ErrSplitProviderID,
expectZone: EmptyZone,
expectErr: nil,
},
}
for _, tc := range testCases {
@@ -355,8 +355,8 @@ func TestGetZone(t *testing.T) {
ProviderID: "gce://foo-project//bar-node",
},
},
expectZone: "",
expectErr: ErrSplitProviderID,
expectZone: EmptyZone,
expectErr: nil,
},
} {
zone, err := getZone(&tc.node)
