Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate Service with LB ID and use for lookup #245

Merged
merged 7 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Changed

* Annotate Service objects by load-balancer UUIDs to enable free LB renames and improve the DO API consumption performance (@timoreimann)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this got merged into the v0.1.16 section

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch -- submitted #249 to fix.

* Update Kubernetes dependencies to 1.15.0 (@timoreimann)
* Set default LB health check protocol to TCP if not specified (@snormore)
* Default to HTTP for sticky sessions if no protocol is defined (@snormore)
Expand Down
2 changes: 1 addition & 1 deletion cloud-controller-manager/do/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
clientset := clientBuilder.ClientOrDie("do-shared-informers")
sharedInformer := informers.NewSharedInformerFactory(clientset, 0)

res := NewResourcesController(c.resources, sharedInformer.Core().V1().Services(), clientset, c.client)
res := NewResourcesController(c.resources, sharedInformer.Core().V1().Services(), clientset)

sharedInformer.Start(nil)
sharedInformer.WaitForCacheSync(nil)
Expand Down
3 changes: 2 additions & 1 deletion cloud-controller-manager/do/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package do

import (
"context"
"errors"
"fmt"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -71,7 +72,7 @@ func allLoadBalancerList(ctx context.Context, client *godo.Client) ([]godo.LoadB
}

if resp == nil {
return nil, fmt.Errorf("load balancers list request returned no response")
return nil, errors.New("load balancers list request returned no response")
}

list = append(list, lbs...)
Expand Down
14 changes: 7 additions & 7 deletions cloud-controller-manager/do/droplets.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newInstances(resources *resources, region string) cloudprovider.Instances {
// When nodeName identifies more than one droplet, only the first will be
// considered.
func (i *instances) NodeAddresses(ctx context.Context, nodeName types.NodeName) ([]v1.NodeAddress, error) {
droplet, err := dropletByName(ctx, i.resources.client, nodeName)
droplet, err := dropletByName(ctx, i.resources.gclient, nodeName)
if err != nil {
return nil, err
}
Expand All @@ -69,7 +69,7 @@ func (i *instances) NodeAddressesByProviderID(ctx context.Context, providerID st
return nil, err
}

droplet, err := dropletByID(ctx, i.resources.client, id)
droplet, err := dropletByID(ctx, i.resources.gclient, id)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (i *instances) ExternalID(ctx context.Context, nodeName types.NodeName) (st

// InstanceID returns the cloud provider ID of the droplet identified by nodeName.
func (i *instances) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
droplet, err := dropletByName(ctx, i.resources.client, nodeName)
droplet, err := dropletByName(ctx, i.resources.gclient, nodeName)
if err != nil {
return "", err
}
Expand All @@ -98,7 +98,7 @@ func (i *instances) InstanceID(ctx context.Context, nodeName types.NodeName) (st

// InstanceType returns the type of the droplet identified by name.
func (i *instances) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
droplet, err := dropletByName(ctx, i.resources.client, name)
droplet, err := dropletByName(ctx, i.resources.gclient, name)
if err != nil {
return "", err
}
Expand All @@ -113,7 +113,7 @@ func (i *instances) InstanceTypeByProviderID(ctx context.Context, providerID str
return "", err
}

droplet, err := dropletByID(ctx, i.resources.client, id)
droplet, err := dropletByID(ctx, i.resources.gclient, id)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (i *instances) InstanceExistsByProviderID(ctx context.Context, providerID s
return false, err
}

_, err = dropletByID(ctx, i.resources.client, id)
_, err = dropletByID(ctx, i.resources.gclient, id)
if err == nil {
return true, nil
}
Expand All @@ -166,7 +166,7 @@ func (i *instances) InstanceShutdownByProviderID(ctx context.Context, providerID
return false, fmt.Errorf("error getting droplet ID from provider ID %q: %s", providerID, err)
}

droplet, err := dropletByID(ctx, i.resources.client, dropletID)
droplet, err := dropletByID(ctx, i.resources.gclient, dropletID)
if err != nil {
return false, fmt.Errorf("error getting droplet %q by ID: %s", dropletID, err)
}
Expand Down
12 changes: 6 additions & 6 deletions cloud-controller-manager/do/droplets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestNodeAddresses(t *testing.T) {
return droplets, resp, nil
}

res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

expectedAddresses := []v1.NodeAddress{
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestNodeAddressesByProviderID(t *testing.T) {
resp := newFakeOKResponse()
return droplet, resp, nil
}
res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

expectedAddresses := []v1.NodeAddress{
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestInstanceID(t *testing.T) {
return droplets, resp, nil
}

res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

id, err := instances.InstanceID(context.TODO(), "test-droplet")
Expand All @@ -259,7 +259,7 @@ func TestInstanceType(t *testing.T) {
return droplets, resp, nil
}

res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

instanceType, err := instances.InstanceType(context.TODO(), "test-droplet")
Expand All @@ -280,7 +280,7 @@ func Test_InstanceShutdownByProviderID(t *testing.T) {
return droplet, resp, nil
}

res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

shutdown, err := instances.InstanceShutdownByProviderID(context.TODO(), "digitalocean://123")
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestDropletMatching(t *testing.T) {
return droplets, resp, nil
}

res := &resources{client: newFakeClient(fake)}
res := &resources{gclient: newFakeClient(fake)}
instances := newInstances(res, "nyc1")

addresses, err := instances.NodeAddresses(context.Background(), types.NodeName(test.nodeName))
Expand Down
171 changes: 124 additions & 47 deletions cloud-controller-manager/do/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
)

const (
// annoDOLoadBalancerID is the annotation specifying the load-balancer ID
// used to enable fast retrievals of load-balancers from the API by UUID.
annoDOLoadBalancerID = "kubernetes.digitalocean.com/load-balancer-id"

// annDOProtocol is the annotation used to specify the default protocol
// for DO load balancers. For ports specified in annDOTLSPorts, this protocol
// is overwritten to https. Options are tcp, http and https. Defaults to tcp.
Expand Down Expand Up @@ -172,14 +176,12 @@ func newLoadBalancers(resources *resources, client *godo.Client, region string)
//
// GetLoadBalancer will not modify service.
func (l *loadBalancers) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
lbName := l.GetLoadBalancerName(ctx, clusterName, service)
lb, found := l.resources.LoadBalancerByName(lbName)
if !found {
return nil, false, nil
}

if lb.Status != lbStatusActive {
return nil, true, fmt.Errorf("load-balancer not active, currently %s", lb.Status)
lb, err := l.retrieveAndAnnotateLoadBalancer(ctx, service)
if err != nil {
if err == errLBNotFound {
return nil, false, nil
}
return nil, false, err
}

return &v1.LoadBalancerStatus{
Expand All @@ -206,45 +208,50 @@ func getDefaultLoadBalancerName(service *v1.Service) string {
//
// EnsureLoadBalancer will not modify service or nodes.
func (l *loadBalancers) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
lbStatus, exists, err := l.GetLoadBalancer(ctx, clusterName, service)
lbRequest, err := l.buildLoadBalancerRequest(service, nodes)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to build load-balancer request: %s", err)
}
if !exists {
lbRequest, err := l.buildLoadBalancerRequest(service, nodes)

var lb *godo.LoadBalancer
lb, err = l.retrieveAndAnnotateLoadBalancer(ctx, service)
switch err {
case nil:
// LB existing
lbID := lb.ID
lb, _, err = l.client.LoadBalancers.Update(ctx, lb.ID, lbRequest)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to update load-balancer with ID %s: %s", lbID, err)
}

lb, _, err := l.client.LoadBalancers.Create(ctx, lbRequest)
case errLBNotFound:
// LB missing
lb, _, err = l.client.LoadBalancers.Create(ctx, lbRequest)
if err != nil {
return nil, err
}
l.resources.AddLoadBalancer(*lb)
if lb.Status != lbStatusActive {
return nil, fmt.Errorf("load-balancer not active, currently %s", lb.Status)
return nil, fmt.Errorf("failed to create load-balancer: %s", err)
}

return &v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{
{
IP: lb.IP,
},
},
}, nil
}
err := l.ensureLoadBalancerIDAnnot(service, lb.ID)
if err != nil {
return nil, fmt.Errorf("failed to add load-balancer ID annotation to service %s/%s: %s", service.Namespace, service.Name, err)
}

err = l.UpdateLoadBalancer(ctx, clusterName, service, nodes)
if err != nil {
default:
// unrecoverable LB retrieval error
return nil, err
}

lbStatus, exists, err = l.GetLoadBalancer(ctx, clusterName, service)
if err != nil {
return nil, err
if lb.Status != lbStatusActive {
return nil, fmt.Errorf("load-balancer is not yet active (current status: %s)", lb.Status)
}

return lbStatus, nil
return &v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{
{
IP: lb.IP,
},
},
}, nil
}

// UpdateLoadBalancer updates the load balancer for service to balance across
Expand All @@ -254,20 +261,19 @@ func (l *loadBalancers) EnsureLoadBalancer(ctx context.Context, clusterName stri
func (l *loadBalancers) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
lbRequest, err := l.buildLoadBalancerRequest(service, nodes)
if err != nil {
return err
return fmt.Errorf("failed to build load-balancer request: %s", err)
}

lbName := l.GetLoadBalancerName(ctx, clusterName, service)
lb, found := l.resources.LoadBalancerByName(lbName)
if !found {
return errLBNotFound
lb, err := l.retrieveAndAnnotateLoadBalancer(ctx, service)
if err != nil {
return err
}

lb, _, err = l.client.LoadBalancers.Update(ctx, lb.ID, lbRequest)
_, _, err = l.client.LoadBalancers.Update(ctx, lb.ID, lbRequest)
if err != nil {
return err
return fmt.Errorf("failed to update load-balancer with ID %s: %s", lb.ID, err)
}
l.resources.AddLoadBalancer(*lb)

return nil
}

Expand All @@ -277,25 +283,92 @@ func (l *loadBalancers) UpdateLoadBalancer(ctx context.Context, clusterName stri
//
// EnsureLoadBalancerDeleted will not modify service.
func (l *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
lbName := l.GetLoadBalancerName(ctx, clusterName, service)
lb, found := l.resources.LoadBalancerByName(lbName)
if !found {
return nil
// Not calling retrieveAndAnnotateLoadBalancer to save a potential PATCH API
// call: the load-balancer is destined to be removed anyway.
lb, err := l.retrieveLoadBalancer(ctx, service)
if err != nil {
if err == errLBNotFound {
return nil
}
return err
}

resp, err := l.client.LoadBalancers.Delete(ctx, lb.ID)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
l.resources.DeleteLoadBalancer(*lb)
return nil
}
return fmt.Errorf("failed to delete load-balancer: %s", err)
}

l.resources.DeleteLoadBalancer(*lb)
return nil
}

func (l *loadBalancers) retrieveAndAnnotateLoadBalancer(ctx context.Context, service *v1.Service) (*godo.LoadBalancer, error) {
lb, err := l.retrieveLoadBalancer(ctx, service)
if err != nil {
// Return bare error to easily compare for errLBNotFound. Converting to
// a full error type doesn't seem worth it.
return nil, err
}

if err := l.ensureLoadBalancerIDAnnot(service, lb.ID); err != nil {
return nil, fmt.Errorf("failed to add load-balancer ID annotation to service %s/%s: %s", service.Namespace, service.Name, err)
}

return lb, nil
}

func (l *loadBalancers) retrieveLoadBalancer(ctx context.Context, service *v1.Service) (*godo.LoadBalancer, error) {
if id := getLoadBalancerID(service); id != "" {
klog.V(2).Infof("Looking up load-balancer for service %s/%s by ID %s", service.Namespace, service.Name, id)
lb, resp, err := l.client.LoadBalancers.Get(ctx, id)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
return nil, errLBNotFound
}
return nil, fmt.Errorf("failed to get load-balancer by ID %s: %s", id, err)
}

return lb, nil
}

// Retrieve by exhaustive iteration.
lbName := getDefaultLoadBalancerName(service)
klog.V(2).Infof("Looking up load-balancer for service %s/%s by name %s", service.Namespace, service.Name, lbName)
return l.lbByName(ctx, lbName)
}

func (l *loadBalancers) ensureLoadBalancerIDAnnot(service *v1.Service, lbID string) error {
if val := getLoadBalancerID(service); val != "" {
return nil
}

// Make a copy so we don't mutate the shared informer cache from the cloud
// provider framework.
updated := service.DeepCopy()
updated.ObjectMeta.Annotations[annoDOLoadBalancerID] = lbID

return patchService(l.resources.kclient, service, updated)
}

// lbByName gets a DigitalOcean Load Balancer by name. The returned error will
// be lbNotFound if the load balancer does not exist.
func (l *loadBalancers) lbByName(ctx context.Context, name string) (*godo.LoadBalancer, error) {
lbs, err := allLoadBalancerList(ctx, l.client)
if err != nil {
return nil, err
}

for _, lb := range lbs {
if lb.Name == name {
return &lb, nil
}
}

return nil, errLBNotFound
}

// nodesToDropletID returns a []int containing ids of all droplets identified by name in nodes.
//
// Node names are assumed to match droplet names.
Expand Down Expand Up @@ -766,3 +839,7 @@ func getEnableProxyProtocol(service *v1.Service) (bool, error) {

return enableProxyProtocol, nil
}

func getLoadBalancerID(service *v1.Service) string {
return service.ObjectMeta.Annotations[annoDOLoadBalancerID]
}
Loading