Skip to content

Commit

Permalink
refactor Service Controller to it's own file (#6395)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabelf5 authored Sep 12, 2024
1 parent 3517036 commit f6f0aaa
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 337 deletions.
82 changes: 0 additions & 82 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,15 +507,6 @@ func (nsi *namespacedInformer) addSecretHandler(handlers cache.ResourceEventHand
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

// addServiceHandler adds the handler for services to the controller
func (nsi *namespacedInformer) addServiceHandler(handlers cache.ResourceEventHandlerFuncs) {
informer := nsi.sharedInformerFactory.Core().V1().Services().Informer()
informer.AddEventHandler(handlers)
nsi.svcLister = informer.GetStore()

nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

// addIngressHandler adds the handler for ingresses to the controller
func (nsi *namespacedInformer) addIngressHandler(handlers cache.ResourceEventHandlerFuncs) {
informer := nsi.sharedInformerFactory.Networking().V1().Ingresses().Informer()
Expand Down Expand Up @@ -1618,79 +1609,6 @@ func (lbc *LoadBalancerController) updateVirtualServerMetrics() {
lbc.metricsCollector.SetVirtualServerRoutes(vsrCount)
}

func (lbc *LoadBalancerController) syncService(task task) {
key := task.Key

var obj interface{}
var exists bool
var err error

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, exists, err = lbc.getNamespacedInformer(ns).svcLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

// First case: the service is the external service for the Ingress Controller
// In that case we need to update the statuses of all resources

if lbc.IsExternalServiceKeyForStatus(key) {
glog.V(3).Infof("Syncing service %v", key)

if !exists {
// service got removed
lbc.statusUpdater.ClearStatusFromExternalService()
} else {
// service added or updated
lbc.statusUpdater.SaveStatusFromExternalService(obj.(*api_v1.Service))
}

if lbc.reportStatusEnabled() {
ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})

glog.V(3).Infof("Updating status for %v Ingresses", len(ingresses))

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(ingresses)
if err != nil {
glog.Errorf("error updating ingress status in syncService: %v", err)
}
}

if lbc.areCustomResourcesEnabled && lbc.reportCustomResourceStatusEnabled() {
virtualServers := lbc.configuration.GetResourcesWithFilter(resourceFilter{VirtualServers: true})

glog.V(3).Infof("Updating status for %v VirtualServers", len(virtualServers))

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(virtualServers)
if err != nil {
glog.V(3).Infof("error updating VirtualServer/VirtualServerRoute status in syncService: %v", err)
}
}

// we don't return here because technically the same service could be used in the second case
}

// Second case: the service is referenced by some resources in the cluster

// it is safe to ignore the error
namespace, name, _ := ParseNamespaceName(key)

resources := lbc.configuration.FindResourcesForService(namespace, name)

if len(resources) == 0 {
return
}
glog.V(3).Infof("Syncing service %v", key)

glog.V(3).Infof("Updating %v resources", len(resources))

resourceExes := lbc.createExtendedResources(resources)

warnings, updateErr := lbc.configurator.AddOrUpdateResources(resourceExes, true)
lbc.updateResourcesStatusAndEvents(resources, warnings, updateErr)
}

// IsExternalServiceForStatus matches the service specified by the external-service cli arg
func (lbc *LoadBalancerController) IsExternalServiceForStatus(svc *api_v1.Service) bool {
return lbc.statusUpdater.namespace == svc.Namespace && lbc.statusUpdater.externalServiceName == svc.Name
Expand Down
104 changes: 0 additions & 104 deletions internal/k8s/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8s
import (
"fmt"
"reflect"
"sort"

"github.com/jinzhu/copier"

Expand Down Expand Up @@ -103,109 +102,6 @@ func createSecretHandlers(lbc *LoadBalancerController) cache.ResourceEventHandle
}
}

// createServiceHandlers builds the handler funcs for services.
//
// In the update handlers below we catch two cases:
// (1) the service is the external service
// (2) the service had a change like a change of the port field of a service port (for such a change Kubernetes doesn't
// update the corresponding endpoints resource, that we monitor as well)
// or a change of the externalName field of an ExternalName service.
//
// In both cases we enqueue the service to be processed by syncService
func createServiceHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*v1.Service)

glog.V(3).Infof("Adding service: %v", svc.Name)
lbc.AddSyncQueue(svc)
},
DeleteFunc: func(obj interface{}) {
svc, isSvc := obj.(*v1.Service)
if !isSvc {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
svc, ok = deletedState.Obj.(*v1.Service)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj)
return
}
}

glog.V(3).Infof("Removing service: %v", svc.Name)
lbc.AddSyncQueue(svc)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
curSvc := cur.(*v1.Service)
if lbc.IsExternalServiceForStatus(curSvc) {
lbc.AddSyncQueue(curSvc)
return
}
oldSvc := old.(*v1.Service)
if hasServiceChanges(oldSvc, curSvc) {
glog.V(3).Infof("Service %v changed, syncing", curSvc.Name)
lbc.AddSyncQueue(curSvc)
}
}
},
}
}

type portSort []v1.ServicePort

func (a portSort) Len() int {
return len(a)
}

func (a portSort) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}

func (a portSort) Less(i, j int) bool {
if a[i].Name == a[j].Name {
return a[i].Port < a[j].Port
}
return a[i].Name < a[j].Name
}

// hasServicedChanged checks if the service has changed based on custom rules we define (eg. port).
func hasServiceChanges(oldSvc, curSvc *v1.Service) bool {
if hasServicePortChanges(oldSvc.Spec.Ports, curSvc.Spec.Ports) {
return true
}
if hasServiceExternalNameChanges(oldSvc, curSvc) {
return true
}
return false
}

// hasServiceExternalNameChanges only compares Service.Spec.Externalname for Type ExternalName services.
func hasServiceExternalNameChanges(oldSvc, curSvc *v1.Service) bool {
return curSvc.Spec.Type == v1.ServiceTypeExternalName && oldSvc.Spec.ExternalName != curSvc.Spec.ExternalName
}

// hasServicePortChanges only compares ServicePort.Name and .Port.
func hasServicePortChanges(oldServicePorts []v1.ServicePort, curServicePorts []v1.ServicePort) bool {
if len(oldServicePorts) != len(curServicePorts) {
return true
}

sort.Sort(portSort(oldServicePorts))
sort.Sort(portSort(curServicePorts))

for i := range oldServicePorts {
if oldServicePorts[i].Port != curServicePorts[i].Port ||
oldServicePorts[i].Name != curServicePorts[i].Name {
return true
}
}
return false
}

func createVirtualServerHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down
151 changes: 0 additions & 151 deletions internal/k8s/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,160 +4,9 @@ import (
"errors"
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/intstr"
)

func TestHasServicePortChanges(t *testing.T) {
t.Parallel()
cases := []struct {
a []v1.ServicePort
b []v1.ServicePort
result bool
reason string
}{
{
[]v1.ServicePort{},
[]v1.ServicePort{},
false,
"Empty should report no changes",
},
{
[]v1.ServicePort{{
Port: 80,
}},
[]v1.ServicePort{{
Port: 8080,
}},
true,
"Different Ports",
},
{
[]v1.ServicePort{{
Port: 80,
}},
[]v1.ServicePort{{
Port: 80,
}},
false,
"Same Ports",
},
{
[]v1.ServicePort{{
Name: "asdf",
Port: 80,
}},
[]v1.ServicePort{{
Name: "asdf",
Port: 80,
}},
false,
"Same Port and Name",
},
{
[]v1.ServicePort{{
Name: "foo",
Port: 80,
}},
[]v1.ServicePort{{
Name: "bar",
Port: 80,
}},
true,
"Different Name same Port",
},
{
[]v1.ServicePort{{
Name: "foo",
Port: 8080,
}},
[]v1.ServicePort{{
Name: "bar",
Port: 80,
}},
true,
"Different Name different Port",
},
{
[]v1.ServicePort{{
Name: "foo",
}},
[]v1.ServicePort{{
Name: "fooo",
}},
true,
"Very similar Name",
},
{
[]v1.ServicePort{{
Name: "asdf",
Port: 80,
TargetPort: intstr.IntOrString{
IntVal: 80,
},
}},
[]v1.ServicePort{{
Name: "asdf",
Port: 80,
TargetPort: intstr.IntOrString{
IntVal: 8080,
},
}},
false,
"TargetPort should be ignored",
},
{
[]v1.ServicePort{{
Name: "foo",
}, {
Name: "bar",
}},
[]v1.ServicePort{{
Name: "foo",
}, {
Name: "bar",
}},
false,
"Multiple same names",
},
{
[]v1.ServicePort{{
Name: "foo",
}, {
Name: "bar",
}},
[]v1.ServicePort{{
Name: "foo",
}, {
Name: "bars",
}},
true,
"Multiple different names",
},
{
[]v1.ServicePort{{
Name: "foo",
}, {
Port: 80,
}},
[]v1.ServicePort{{
Port: 80,
}, {
Name: "foo",
}},
false,
"Some names some ports",
},
}

for _, c := range cases {
if c.result != hasServicePortChanges(c.a, c.b) {
t.Errorf("hasServicePortChanges returned %v, but expected %v for %q case", c.result, !c.result, c.reason)
}
}
}

func TestAreResourcesDifferent(t *testing.T) {
t.Parallel()
tests := []struct {
Expand Down
Loading

0 comments on commit f6f0aaa

Please sign in to comment.