Skip to content

Commit

Permalink
Merge pull request #2689 from sawsa307/refactor-layering
Browse files Browse the repository at this point in the history
Moving from string(zone) to endpointGroupInfo when mapping NEGs, and populate subnet values for nodes in zoneGetter and NEG syncers.
  • Loading branch information
k8s-ci-robot authored Oct 22, 2024
2 parents b67ef1c + 5c2ba6a commit a596e6e
Show file tree
Hide file tree
Showing 21 changed files with 1,032 additions and 845 deletions.
2 changes: 1 addition & 1 deletion pkg/instancegroups/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (m *manager) List(logger klog.Logger) ([]string, error) {
func (m *manager) splitNodesByZone(names []string, logger klog.Logger) map[string][]string {
nodesByZone := map[string][]string{}
for _, name := range names {
zone, err := m.ZoneGetter.ZoneForNode(name, logger)
zone, _, err := m.ZoneGetter.ZoneAndSubnetForNode(name, logger)
if err != nil {
logger.Error(err, "Failed to get zones for instance node, skipping", "name", name)
continue
Expand Down
15 changes: 11 additions & 4 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (

shortSessionAffinityIdleTimeout = int32(20) // 20 sec could be used for regular Session Affinity
longSessionAffinityIdleTimeout = int32(2 * 60) // 2 min or 120 sec for Strong Session Affinity

defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
)

var (
Expand Down Expand Up @@ -323,8 +325,13 @@ func buildContext(vals gce.TestClusterValues) *ingctx.ControllerContext {
}

func newL4NetLBServiceController() *L4NetLBController {
stopCh := make(chan struct{})
vals := gce.DefaultTestClusterValues()
vals.SubnetworkURL = defaultTestSubnetURL
return createL4NetLBServiceController(vals)
}

func createL4NetLBServiceController(vals gce.TestClusterValues) *L4NetLBController {
stopCh := make(chan struct{})
ctx := buildContext(vals)
nodes, err := test.CreateAndInsertNodes(ctx.Cloud, []string{"instance-1", "instance-2"}, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -1663,7 +1670,7 @@ func TestDualStackServiceNeedsUpdate(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()

controller := newL4NetLBServiceController()
controller := createL4NetLBServiceController(gce.DefaultTestClusterValues())
controller.enableDualStack = true
oldSvc := test.NewL4NetLBRBSService(8080)
oldSvc.Spec.IPFamilies = tc.initialIPFamilies
Expand Down Expand Up @@ -1862,7 +1869,7 @@ func TestCreateDeleteDualStackNetLBService(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
controller := newL4NetLBServiceController()
controller := createL4NetLBServiceController(gce.DefaultTestClusterValues())
controller.enableDualStack = true
svc := test.NewL4NetLBRBSService(8080)
svc.Spec.IPFamilies = tc.ipFamilies
Expand Down Expand Up @@ -1907,7 +1914,7 @@ func TestCreateDeleteDualStackNetLBService(t *testing.T) {
}
func TestProcessDualStackNetLBServiceOnUserError(t *testing.T) {
t.Parallel()
controller := newL4NetLBServiceController()
controller := createL4NetLBServiceController(gce.DefaultTestClusterValues())
controller.enableDualStack = true
svc := test.NewL4NetLBRBSService(8080)
svc.Spec.IPFamilies = []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol}
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
labelValue2 = "v2"
negName1 = "neg1"

defaultTestSubnet = "default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/metrics/metricscollector/metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (sm *SyncerMetrics) computeEndpointStateMetrics() (negtypes.StateCountMap,

// CollectDualStackMigrationMetrics will be used by dualstack.Migrator to export
// metrics.
func (sm *SyncerMetrics) CollectDualStackMigrationMetrics(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
func (sm *SyncerMetrics) CollectDualStackMigrationMetrics(key negtypes.NegSyncerKey, committedEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet, migrationCount int) {
sm.updateMigrationStartAndEndTime(key, migrationCount)
sm.updateEndpointsCountPerType(key, committedEndpoints, migrationCount)
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (sm *SyncerMetrics) updateMigrationStartAndEndTime(key negtypes.NegSyncerKe
sm.dualStackMigrationStartTime[key] = sm.clock.Now()
}

func (sm *SyncerMetrics) updateEndpointsCountPerType(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
func (sm *SyncerMetrics) updateEndpointsCountPerType(key negtypes.NegSyncerKey, committedEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet, migrationCount int) {
sm.mu.Lock()
defer sm.mu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions pkg/neg/metrics/metricscollector/metrics_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ func TestUpdateMigrationStartAndEndTime(t *testing.T) {

func TestUpdateEndpointsCountPerType(t *testing.T) {
syncerKey := syncerKey(1)
inputCommittedEndpoints := map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
inputCommittedEndpoints := map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet{
{Zone: "zone1"}: types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "ipv4only-1"}, {IP: "ipv4only-2"}, {IP: "ipv4only-3"},
{IPv6: "ipv6only-1"},
{IP: "dualStack-1a", IPv6: "dualStack-1b"}, {IP: "dualStack-2a", IPv6: "dualStack-2b"},
}...),
"zone2": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{Zone: "zone2"}: types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "ipv4only-4"},
{IP: "dualStack-3a", IPv6: "dualStack-3b"},
}...),
Expand Down
91 changes: 46 additions & 45 deletions pkg/neg/syncers/dualstack/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ type errorStateChecker interface {
}

type MetricsCollector interface {
CollectDualStackMigrationMetrics(key types.NegSyncerKey, committedEndpoints map[string]types.NetworkEndpointSet, migrationCount int)
CollectDualStackMigrationMetrics(key types.NegSyncerKey, committedEndpoints map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet, migrationCount int)
}

func NewMigrator(enableDualStackNEG bool, syncer syncable, syncerKey types.NegSyncerKey, metricsCollector MetricsCollector, errorStateChecker errorStateChecker, logger klog.Logger) *Migrator {
Expand All @@ -130,13 +131,13 @@ func NewMigrator(enableDualStackNEG bool, syncer syncable, syncerKey types.NegSy
// 2. If the migrator is not currently paused, it will also start the
// detachment of a subset of migration-endpoints from a single zone.
//
// The returned string represents the zone for which detachment was started. An
// empty return value signifies that detachment was not started (which is the
// case when there were no migration-endpoints to begin with, or the migrator
// was paused.)
func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string {
// The returned EndpointGroup represents the zone and subnet of NEG for which
// detachment was started on. An empty subnet and zone value signifies that
// detachment was not started (which is the case when there were no
// migration-endpoints to begin with, or the migrator was paused.)
func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet) negtypes.EndpointGroupInfo {
if !d.enableDualStack {
return ""
return negtypes.EndpointGroupInfo{}
}

_, migrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints)
Expand All @@ -147,7 +148,7 @@ func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[
paused := d.isPaused()
if migrationCount == 0 || paused {
d.logger.V(2).Info("Not starting migration detachments", "migrationCount", migrationCount, "paused", paused)
return ""
return negtypes.EndpointGroupInfo{}
}

return d.calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpointsInRemoveSet)
Expand Down Expand Up @@ -243,7 +244,7 @@ func (d *Migrator) isPaused() bool {
// function) AND (2) we are in degraded mode OR the previous successful
// detach was quite recent (as determined by the
// tooLongSincePreviousDetach() function)
func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpoints map[string]types.NetworkEndpointSet) string {
func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpoints map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet) negtypes.EndpointGroupInfo {
addCount := endpointsCount(addEndpoints)
committedCount := endpointsCount(committedEndpoints)
migrationCount := endpointsCount(migrationEndpoints)
Expand All @@ -258,19 +259,19 @@ func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpo
"previousDetach", d.previousDetach,
"previousDetachThreshold", d.previousDetachThreshold,
)
return ""
return negtypes.EndpointGroupInfo{}
}

// Find the zone which has the maximum number of migration-endpoints.
zone, maxZoneEndpointCount := "", 0
for curZone, endpointSet := range migrationEndpoints {
// Find the zone and subnet which has the maximum number of migration-endpoints.
endpointGroupInfo, maxZoneEndpointCount := negtypes.EndpointGroupInfo{}, 0
for curEndpointInfo, endpointSet := range migrationEndpoints {
if endpointSet.Len() > maxZoneEndpointCount {
maxZoneEndpointCount = endpointSet.Len()
zone = curZone
endpointGroupInfo = curEndpointInfo
}
}
if zone == "" {
return ""
if endpointGroupInfo.Zone == "" && endpointGroupInfo.Subnet == "" {
return negtypes.EndpointGroupInfo{}
}

currentlyMigratingCount := int(math.Ceil(float64(committedCount+migrationCount) * d.fractionOfMigratingEndpoints))
Expand All @@ -279,18 +280,18 @@ func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpo
}
d.logger.V(2).Info("Result of migration heuristic calculations", "currentlyMigratingCount", currentlyMigratingCount, "totalMigrationCount", migrationCount)

if removeEndpoints[zone] == nil {
removeEndpoints[zone] = types.NewNetworkEndpointSet()
if removeEndpoints[endpointGroupInfo] == nil {
removeEndpoints[endpointGroupInfo] = types.NewNetworkEndpointSet()
}
for i := 0; i < currentlyMigratingCount; i++ {
endpoint, ok := migrationEndpoints[zone].PopAny()
endpoint, ok := migrationEndpoints[endpointGroupInfo].PopAny()
if !ok {
break
}
removeEndpoints[zone].Insert(endpoint)
removeEndpoints[endpointGroupInfo].Insert(endpoint)
}

return zone
return endpointGroupInfo
}

// Returns true if there are many endpoints waiting to be attached.
Expand All @@ -314,18 +315,18 @@ func (d *Migrator) tooLongSincePreviousDetach() bool {
// modified. The returned value will be two endpoints sets which will contain
// the values that were filtered out from the `addEndpoints` and
// `removeEndpoints` sets respectively.
func findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, map[string]types.NetworkEndpointSet) {
allEndpoints := make(map[string]types.NetworkEndpointSet)
for zone, endpointSet := range addEndpoints {
allEndpoints[zone] = allEndpoints[zone].Union(endpointSet)
func findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet) (map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet, map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet) {
allEndpoints := make(map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet)
for endpointGroupInfo, endpointSet := range addEndpoints {
allEndpoints[endpointGroupInfo] = allEndpoints[endpointGroupInfo].Union(endpointSet)
}
for zone, endpointSet := range removeEndpoints {
allEndpoints[zone] = allEndpoints[zone].Union(endpointSet)
for endpointGroupInfo, endpointSet := range removeEndpoints {
allEndpoints[endpointGroupInfo] = allEndpoints[endpointGroupInfo].Union(endpointSet)
}

migrationEndpointsInAddSet := make(map[string]types.NetworkEndpointSet)
migrationEndpointsInRemoveSet := make(map[string]types.NetworkEndpointSet)
for zone, endpointSet := range allEndpoints {
migrationEndpointsInAddSet := make(map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet)
migrationEndpointsInRemoveSet := make(map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet)
for endpointGroupInfo, endpointSet := range allEndpoints {
for endpoint := range endpointSet {
if endpoint.IP == "" || endpoint.IPv6 == "" {
// Endpoint is not dual-stack so continue.
Expand All @@ -346,42 +347,42 @@ func findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints map[string]ty

isMigrating := false
// Check if endpoint is migrating from dual-stack to single-stack.
isMigrating = isMigrating || moveEndpoint(ipv4Only, addEndpoints, migrationEndpointsInAddSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv6Only, addEndpoints, migrationEndpointsInAddSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv4Only, addEndpoints, migrationEndpointsInAddSet, endpointGroupInfo)
isMigrating = isMigrating || moveEndpoint(ipv6Only, addEndpoints, migrationEndpointsInAddSet, endpointGroupInfo)
// Check if endpoint is migrating from single-stack to dual-stack.
isMigrating = isMigrating || moveEndpoint(ipv4Only, removeEndpoints, migrationEndpointsInRemoveSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv6Only, removeEndpoints, migrationEndpointsInRemoveSet, zone)
isMigrating = isMigrating || moveEndpoint(ipv4Only, removeEndpoints, migrationEndpointsInRemoveSet, endpointGroupInfo)
isMigrating = isMigrating || moveEndpoint(ipv6Only, removeEndpoints, migrationEndpointsInRemoveSet, endpointGroupInfo)

if isMigrating {
moveEndpoint(endpoint, addEndpoints, migrationEndpointsInAddSet, zone)
moveEndpoint(endpoint, removeEndpoints, migrationEndpointsInRemoveSet, zone)
moveEndpoint(endpoint, addEndpoints, migrationEndpointsInAddSet, endpointGroupInfo)
moveEndpoint(endpoint, removeEndpoints, migrationEndpointsInRemoveSet, endpointGroupInfo)
}
}
}

return migrationEndpointsInAddSet, migrationEndpointsInRemoveSet
}

// moveEndpoint deletes endpoint `e` from `source[zone]` and adds it to
// `dest[zone]`. If the move was successful, `true` is returned. A return value
// moveEndpoint deletes endpoint `e` from `source[endpointGroupInfo]` and adds it to
// `dest[endpointGroupInfo]`. If the move was successful, `true` is returned. A return value
// of `false` denotes that nothing was moved and no input variable were
// modified.
func moveEndpoint(e types.NetworkEndpoint, source, dest map[string]types.NetworkEndpointSet, zone string) bool {
func moveEndpoint(e types.NetworkEndpoint, source, dest map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet, endpointGroupInfo negtypes.EndpointGroupInfo) bool {
if source == nil || dest == nil {
return false
}
if source[zone].Has(e) {
source[zone].Delete(e)
if dest[zone] == nil {
dest[zone] = types.NewNetworkEndpointSet()
if source[endpointGroupInfo].Has(e) {
source[endpointGroupInfo].Delete(e)
if dest[endpointGroupInfo] == nil {
dest[endpointGroupInfo] = types.NewNetworkEndpointSet()
}
dest[zone].Insert(e)
dest[endpointGroupInfo].Insert(e)
return true
}
return false
}

func endpointsCount(endpointSets map[string]types.NetworkEndpointSet) int {
func endpointsCount(endpointSets map[negtypes.EndpointGroupInfo]types.NetworkEndpointSet) int {
var count int
for _, endpointSet := range endpointSets {
count += endpointSet.Len()
Expand Down
Loading

0 comments on commit a596e6e

Please sign in to comment.