Skip to content

Commit

Permalink
feat: add support for metallb shared addresses
Browse files Browse the repository at this point in the history
Add support for shared IP addresses with MetalLB in the CRD
configuration.

Signed-off-by: Tim Jones <[email protected]>
  • Loading branch information
TimJones committed Nov 7, 2024
1 parent 13e4c7c commit 78005b1
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 36 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,6 @@ If `MetalLB` management is enabled, then CCM does the following.
* If there is no other service, delete all CCM managed `bgpeers` and the default `bgpadvertisement`
* delete the Elastic IP reservation from Equinix Metal

**NOTE:** (IP Address sharing)[https://metallb.universe.tf/usage/#ip-address-sharing] is not yet supported in Cloud Provider Equinix Metal.

CCM itself does **not** install/deploy the load-balancer and it may exists before enable it. This can be deployed by the administrator separately, using the manifest provided in the releases page, or in any other manner. Not having metallb installed but enabled in the CCM configuration will end up allowing you to continue deploying kubernetes services, but the external ip assignment will remain pending, making it useless.

In order to instruct metallb which IPs to announce and from where, CCM takes direct responsibility for managing the
Expand Down
55 changes: 48 additions & 7 deletions metal/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strconv"
"strings"

"golang.org/x/exp/slices"

"github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers"
"github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers/empty"
"github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers/kubevip"
Expand Down Expand Up @@ -255,16 +257,32 @@ func (l *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterNa
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: no IP reservation found for %s, nothing to delete", svcName)
return nil
}
// delete the reservation
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.ID)
if _, err := l.client.ProjectIPs.Remove(ipReservation.ID); err != nil {
return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.String(), err)
}
// remove it from any implementation-specific parts
svcIPCidr = fmt.Sprintf("%s/%d", ipReservation.Address, ipReservation.CIDR)
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s entry %s", svcName, svcIPCidr)
if err := l.implementor.RemoveService(ctx, service.Namespace, service.Name, svcIPCidr); err != nil {
return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err)
if errors.Is(err, metallb.ErrIPStillInUse) {
// IP is still in use by another service, just remove this service tag
klog.V(2).Info("EnsureLoadBalancerDeleted(): remove: not removing IP, still in use")
// TODO: Update go and update to use a simple DeleteFunc:
// tags := slices.DeleteFunc(ipReservation.Tags, func(s string) bool {
// return s == svcTag
// }
idx := slices.Index(ipReservation.Tags, svcTag)
tags := slices.Delete(ipReservation.Tags, idx, idx+1)
update := packngo.IPAddressUpdateRequest{Tags: &tags}
if _, _, err = l.client.ProjectIPs.Update(ipReservation.ID, &update, &packngo.GetOptions{}); err != nil {
return fmt.Errorf("failed to update IP removing old service tag: %w", err)
}
return nil
} else {
return fmt.Errorf("error removing IP from configmap for %s: %w", svcName, err)
}
}
// delete the reservation
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: for %s EIP ID %s", svcName, ipReservation.ID)
if _, err := l.client.ProjectIPs.Remove(ipReservation.ID); err != nil {
return fmt.Errorf("failed to remove IP address reservation %s from project: %w", ipReservation.String(), err)
}
klog.V(2).Infof("EnsureLoadBalancerDeleted(): remove: removed service %s from implementation", svcName)
return nil
Expand Down Expand Up @@ -493,7 +511,30 @@ func (l *loadBalancers) addService(ctx context.Context, svc *v1.Service, ips []p
})
}

return svcIPCidr, l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n)
if err = l.implementor.AddService(ctx, svc.Namespace, svc.Name, svcIPCidr, n); err != nil {
return svcIPCidr, err
}

if ipReservation == nil {
// Need to ensure the service tag is on the IP for shared IP Services
klog.V(2).Infof("service tag %s not found on IP %s, adding", svcTag, svcIP)
ips, _, err := l.client.ProjectIPs.List(l.project, &packngo.ListOptions{})
if err != nil {
return svcIPCidr, fmt.Errorf("failed to list project IPs: %w", err)
}
for _, ip := range ips {
if ip.Address == svcIP && ip.CIDR == cidr {
tags := append(ip.Tags, svcTag)
update := packngo.IPAddressUpdateRequest{Tags: &tags}
if _, _, err = l.client.ProjectIPs.Update(ip.ID, &update, &packngo.GetOptions{}); err != nil {
return svcIPCidr, fmt.Errorf("failed to update IP with new service tag: %w", err)
}
break
}
}
}

return svcIPCidr, nil
}

func (l *loadBalancers) retrieveIPByTag(ctx context.Context, svc *v1.Service, ips []packngo.IPAddressReservation, tag string) (string, error) {
Expand Down
5 changes: 5 additions & 0 deletions metal/loadbalancers/metallb/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,10 @@ func (m *CMConfigurer) RemoveAddressPoolByAddress(ctx context.Context, addr stri
return nil
}

// RemoveFromAddressPool remove service from a pool by name. If the matching pool is not found, do not change anything
func (m *CMConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error {
return nil
}

// RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything
func (m *CMConfigurer) RemoveAddressPool(ctx context.Context, pool string) error { return nil }
139 changes: 115 additions & 24 deletions metal/loadbalancers/metallb/cr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ import (
"strings"

metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
defaultBgpAdvertisement = "equinix-metal-bgp-adv"
cpemLabelKey = "cloud-provider"
cpemLabelValue = "equinix-metal"
svcLabelKeyPrefix = "service-"
svcLabelValuePrefix = "namespace-"
defaultBgpAdvertisement = "equinix-metal-bgp-adv"
cpemLabelKey = "cloud-provider"
cpemLabelValue = "equinix-metal"
svcLabelKeyPrefix = "service-"
svcLabelValuePrefix = "namespace-"
svcAnnotationSharedPrefix = "shared-"
metallbAnnotationSharedIP = "metallb.universe.tf/allow-shared-ip" // Not exported as a const from metallb package :(
)

type CRDConfigurer struct {
Expand Down Expand Up @@ -149,36 +153,60 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv

addIPAddr := convertToIPAddr(*add, m.namespace, svcNamespace, svcName)

svc := corev1.Service{}
if err = m.client.Get(ctx, client.ObjectKey{Namespace: svcNamespace, Name: svcName}, &svc); err != nil {
return false, fmt.Errorf("unable to retrieve service: %w", err)
}

// go through the pools and see if we have one that matches
// - if same service name return false
//
// TODO (ocobleseqx)
// - Metallb allows ip address sharing for services, so we need to find a way to share a pool
// EnsureLoadBalancerDeleted filters ips by service tags, so when ip is specified and already exists
// it must be updted to include the new serviceNamespace/service
for _, o := range olds.Items {
var updateLabels, updateAddresses bool
var updateLabels, updateAddresses, updateAnnotations bool
// if same name check services labels
if o.GetName() == addIPAddr.GetName() {
for k := range o.GetLabels() {
if strings.HasPrefix(k, svcLabelKeyPrefix) {
osvc := strings.TrimPrefix(k, svcLabelKeyPrefix)
if osvc == svcName {
// already exists
// if service label and key matches
if o.Labels[serviceLabelKey(svcName)] == serviceLabelValue(svcNamespace) {
// if is shared and service exsits in shared annotation
if k, ok := svc.Annotations[metallbAnnotationSharedIP]; ok {
if containsSharedService(o.Annotations[sharedAnnotationKey(k)], svcNamespace, svcName) {
// already exists, and in shared annotation
return false, nil
} else {
updateAnnotations = true
}
} else {
// already exists, and not shared
return false, nil
}
// if we got here, none matched exactly, update labels
updateLabels = true
}
// if we got here, none matched exactly, update labels
updateLabels = true
}
for _, addr := range addIPAddr.Spec.Addresses {
if slices.Contains(o.Spec.Addresses, addr) {
updateAddresses = true
break

// If we already need to update the annotations, then this is the owning service and it's just adding a shared-ip annotation
if !updateAnnotations {
// Otherwise we need to check that the IP is new or can be shared
for _, addr := range addIPAddr.Spec.Addresses {
if slices.Contains(o.Spec.Addresses, addr) {
// Check the Service is configured to share the IP
sharedIpKey, ok := svc.Annotations[metallbAnnotationSharedIP]
if !ok {
return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and no %s annotation found", addr, metallbAnnotationSharedIP)
}

// Check the shared IP key matches the pool annotation
if _, ok := o.Annotations[sharedAnnotationKey(sharedIpKey)]; !ok {
return false, fmt.Errorf("unable to configure IPAddressPool: requested ip %s already in use and %s annotation does not match", addr, metallbAnnotationSharedIP)
}

updateAnnotations = true
updateAddresses = true
break
}
}
}
if updateLabels || updateAddresses {

if updateLabels || updateAddresses || updateAnnotations {
// update pool
patch := client.MergeFrom(o.DeepCopy())
if updateLabels {
Expand All @@ -189,7 +217,19 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv
addresses := append(o.Spec.Addresses, addIPAddr.Spec.Addresses...)
slices.Sort(addresses)
o.Spec.Addresses = slices.Compact(addresses)
o.Spec.Addresses = addresses
}
if updateAnnotations {
sharedIpKey := sharedAnnotationKey(svc.Annotations[metallbAnnotationSharedIP])
if sharedSvcs, ok := o.Annotations[sharedIpKey]; !ok {
// Safer way to set annotations in case the annotation map itself is nil
o.SetAnnotations(map[string]string{sharedIpKey: sharedServiceName(svcNamespace, svcName)})
} else {
sharedSvcs := strings.Split(sharedSvcs, ",")
sharedSvcs = append(sharedSvcs, sharedServiceName(svcNamespace, svcName))
slices.Sort(sharedSvcs)
sharedSvcs = slices.Compact(sharedSvcs)
o.Annotations[sharedIpKey] = strings.Join(sharedSvcs, ",")
}
}
err := m.client.Patch(ctx, &o, patch)
if err != nil {
Expand Down Expand Up @@ -236,6 +276,57 @@ func (m *CRDConfigurer) AddAddressPool(ctx context.Context, add *AddressPool, sv
return true, nil
}

// RemoveFromAddressPool removes a service from a pool by name. If the matching pool is not found, do not change anything
func (m *CRDConfigurer) RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error {
if svcNamespace == "" || svcName == "" {
return nil
}

olds, err := m.listIPAddressPools(ctx)
if err != nil {
return err
}

// go through the pools and see if we have a match
pool := poolName(svcNamespace, svcName)
for _, o := range olds.Items {
if slices.ContainsFunc(maps.Keys(o.GetAnnotations()), func(s string) bool {
return strings.HasPrefix(s, svcAnnotationSharedPrefix) && containsSharedService(o.Annotations[s], svcNamespace, svcName)
}) {
// If there are more services sharing this pool, we only need to remove this service from the annotation
for k, v := range o.GetAnnotations() {
if strings.HasPrefix(k, svcAnnotationSharedPrefix) && containsSharedService(v, svcNamespace, svcName) {
// TODO: Update go and update to use a simple DeleteFunc:
// svcList := slices.DeleteFunc(strings.Split(v, ","), func(s string) bool {
// return s == sharedServiceName(svcNamespace, svcName)
// }
svcList := strings.Split(v, ",")
idx := slices.Index(svcList, sharedServiceName(svcNamespace, svcName))
svcList = slices.Delete(svcList, idx, idx+1)
if len(svcList) == 0 {
// No other shared services with this key
return m.RemoveAddressPool(ctx, o.GetName())
} else {
patch := client.MergeFrom(o.DeepCopy())
delete(o.Labels, serviceLabelKey(svcName))
o.Annotations[k] = strings.Join(svcList, ",")
if m.client.Patch(ctx, &o, patch); err != nil {
return fmt.Errorf("unable to update IPAddressPool %s: %w", o.GetName(), err)
}
// Other Services still use this IP
return ErrIPStillInUse
}

}
}
} else if o.GetName() == pool {
// Not shared, so just delete the pool
return m.RemoveAddressPool(ctx, pool)
}
}
return nil
}

// RemoveAddressPool removes a pool by name. If the matching pool does not exist, do not change anything
func (m *CRDConfigurer) RemoveAddressPool(ctx context.Context, pool string) error {
if pool == "" {
Expand Down
15 changes: 15 additions & 0 deletions metal/loadbalancers/metallb/cr_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"
"regexp"
"sort"
"strings"
"time"

metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
"golang.org/x/exp/slices"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -24,6 +26,19 @@ func serviceLabelValue(svcNamespace string) string {
return svcLabelValuePrefix + svcNamespace
}

func sharedAnnotationKey(sharedKey string) string {
return svcAnnotationSharedPrefix + sharedKey
}

func sharedServiceName(svcNamespace, svcName string) string {
return fmt.Sprintf("%s.%s", svcNamespace, svcName)
}

func containsSharedService(poolAnnotationValue, svcNamespace, svcName string) bool {
svcList := strings.Split(poolAnnotationValue, ",")
return slices.Contains(svcList, sharedServiceName(svcNamespace, svcName))
}

func convertToIPAddr(addr AddressPool, namespace, svcNamespace, svcName string) metallbv1beta1.IPAddressPool {
ip := metallbv1beta1.IPAddressPool{
Spec: metallbv1beta1.IPAddressPoolSpec{
Expand Down
14 changes: 11 additions & 3 deletions metal/loadbalancers/metallb/metallb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package metallb

import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/equinix/cloud-provider-equinix-metal/metal/loadbalancers"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -43,6 +45,9 @@ type Configurer interface {
// Returns if anything changed
AddAddressPool(ctx context.Context, add *AddressPool, svcNamespace, svcName string) (bool, error)

// RemoveFromAddressPool remove service from a pool by name. If the matching pool if not found, do not change anything
RemoveFromAddressPool(ctx context.Context, svcNamespace, svcName string) error

// RemoveAddressPool remove a pool by name. If the matching pool does not exist, do not change anything
RemoveAddressPool(ctx context.Context, pool string) error

Expand All @@ -61,6 +66,8 @@ type LB struct {
var (
_ loadbalancers.LB = (*LB)(nil)
crdConfiguration = false

ErrIPStillInUse = errors.New("ip address still in use")
)

// func NewLB(k8sclient kubernetes.Interface, k8sApiextensionsClientset *k8sapiextensionsclient.Clientset, config string) *LB {
Expand Down Expand Up @@ -101,6 +108,7 @@ func NewLB(k8sclient kubernetes.Interface, config string, featureFlags url.Value
if crdConfiguration {
scheme := runtime.NewScheme()
_ = metallbv1beta1.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
cl, err := client.New(clientconfig.GetConfigOrDie(), client.Options{Scheme: scheme})
if err != nil {
panic(err)
Expand Down Expand Up @@ -261,9 +269,9 @@ func updateIP(ctx context.Context, config Configurer, addr, svcNamespace, svcNam
return fmt.Errorf("error removing IP: %w", err)
}
} else {
if err := config.RemoveAddressPool(ctx, name); err != nil {
klog.V(2).Infof("error removing IPAddressPool: %v", err)
return fmt.Errorf("error removing IPAddressPool: %w", err)
if err := config.RemoveFromAddressPool(ctx, svcNamespace, svcName); err != nil {
klog.V(2).Infof("error removing from IPAddressPool: %v", err)
return fmt.Errorf("error removing from IPAddressPool: %w", err)
}
}
}
Expand Down

0 comments on commit 78005b1

Please sign in to comment.