Skip to content

Commit

Permalink
feat: update atm endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhiying Lin committed Nov 13, 2024
1 parent 0cb38e4 commit fbd0e00
Show file tree
Hide file tree
Showing 10 changed files with 943 additions and 151 deletions.
179 changes: 170 additions & 9 deletions pkg/controllers/hub/trafficmanagerbackend/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,10 +52,11 @@ const (
AzureResourceEndpointNamePrefix = "fleet-%s#"

// AzureResourceEndpointNameFormat is the name format of the Azure Traffic Manager Endpoint created by the fleet controller.
// The naming convention of a Traffic Manager Endpoint is fleet-{TrafficManagerBackendUUID}#{ServiceImportName}#{ClusterName}.
// The naming convention of a Traffic Manager Endpoint is {AzureResourceEndpointNamePrefix}{ServiceImportName}#{ClusterName}.
// which is fleet-{TrafficManagerBackendUUID}#{ServiceImportName}#{ClusterName}.
// All the object name length should be restricted to <= 63 characters.
// The endpoint name must contain no more than 260 characters, excluding the following characters "< > * % $ : \ ? + /".
AzureResourceEndpointNameFormat = AzureResourceEndpointNamePrefix + "%s#%s"
AzureResourceEndpointNameFormat = "%s%s#%s"
)

var (
Expand Down Expand Up @@ -181,7 +184,7 @@ func (r *Reconciler) cleanupEndpoints(ctx context.Context, backend *fleetnetv1al
endpoint := atmProfile.Properties.Endpoints[i]
if endpoint.Name == nil {
err := controller.NewUnexpectedBehaviorError(errors.New("azure Traffic Manager endpoint name is nil"))
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "azureEndpoint", endpoint)
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "atmEndpoint", endpoint)
continue
}
// Traffic manager endpoint name is case-insensitive.
Expand All @@ -191,21 +194,21 @@ func (r *Reconciler) cleanupEndpoints(ctx context.Context, backend *fleetnetv1al
errs.Go(func() error {
if _, err := r.EndpointsClient.Delete(cctx, r.ResourceGroupName, atmProfileName, armtrafficmanager.EndpointTypeAzureEndpoints, *endpoint.Name, nil); err != nil {
if azureerrors.IsNotFound(err) {
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return nil
}
klog.ErrorS(err, "Failed to delete the endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.ErrorS(err, "Failed to delete the endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return err
}
klog.V(2).InfoS("Deleted Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.V(2).InfoS("Deleted Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return nil
})
}
return errs.Wait()
}

func isEndpointOwnedByBackend(backend *fleetnetv1alpha1.TrafficManagerBackend, endpoint string) bool {
return strings.HasPrefix(strings.ToLower(endpoint), generateAzureTrafficManagerEndpointNamePrefixFunc(backend))
return strings.HasPrefix(endpoint, generateAzureTrafficManagerEndpointNamePrefixFunc(backend))
}

func (r *Reconciler) handleUpdate(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend) (ctrl.Result, error) {
Expand Down Expand Up @@ -240,10 +243,33 @@ func (r *Reconciler) handleUpdate(ctx context.Context, backend *fleetnetv1alpha1
klog.V(2).InfoS("Found the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "clusters", serviceImport.Status.Clusters)

desiredEndpointsMaps, invalidServicesMaps, err := r.validateExportedServiceForServiceImport(ctx, backend, serviceImport)
if err != nil {
if err != nil || (desiredEndpointsMaps == nil && invalidServicesMaps == nil) {
// We don't need to requeue not found internalServiceExport(err == nil and desiredEndpointsMaps == nil && invalidServicesMaps)
// as when the serviceImport is updated, the controller will be re-triggered again.
// The controller will retry when err is not nil.
return ctrl.Result{}, err
}
klog.V(2).InfoS("Found the exported services behind the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "numberOfDesiredEndpoints", len(desiredEndpointsMaps), "numberOfInvalidServices", len(invalidServicesMaps))

acceptedEndpoints, err := r.updateTrafficManagerEndpoints(ctx, backend, atmProfile, desiredEndpointsMaps)
if err != nil {
return ctrl.Result{}, err
}
if len(invalidServicesMaps) > 0 {
for clusterID, invalidServiceErr := range invalidServicesMaps {
message := fmt.Sprintf("%v service(s) exported from clusters cannot be exposed as the Azure Traffic Manager, for example, service exported from %v is invalid: %v", len(invalidServicesMaps), clusterID, invalidServiceErr)
setFalseCondition(backend, acceptedEndpoints, message)
// Here we only populate the message with the first invalid exported service.
// Note, the loop of the invalidServicesMaps is not deterministic.
break
}
} else {
setTrueCondition(backend, acceptedEndpoints)
}
klog.V(2).InfoS("Updated Traffic Manager endpoints for the serviceImport and updating the condition", "trafficManagerBackend", backendKObj, "status", backend.Status)
if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -367,6 +393,18 @@ func setUnknownCondition(backend *fleetnetv1alpha1.TrafficManagerBackend, messag
meta.SetStatusCondition(&backend.Status.Conditions, cond)
}

func setTrueCondition(backend *fleetnetv1alpha1.TrafficManagerBackend, acceptedEndpoints []fleetnetv1alpha1.TrafficManagerEndpointStatus) {
cond := metav1.Condition{
Type: string(fleetnetv1alpha1.TrafficManagerBackendConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: backend.Generation,
Reason: string(fleetnetv1alpha1.TrafficManagerBackendReasonAccepted),
Message: fmt.Sprintf("%v service(s) exported from clusters have been accepted as Traffic Manager endpoints", len(acceptedEndpoints)),
}
backend.Status.Endpoints = acceptedEndpoints
meta.SetStatusCondition(&backend.Status.Conditions, cond)
}

func (r *Reconciler) updateTrafficManagerBackendStatus(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend) error {
backendKObj := klog.KObj(backend)
if err := r.Client.Status().Update(ctx, backend); err != nil {
Expand Down Expand Up @@ -460,7 +498,7 @@ func isValidTrafficManagerEndpoint(export *fleetnetv1alpha1.InternalServiceExpor
}

func generateAzureTrafficManagerEndpoint(backend *fleetnetv1alpha1.TrafficManagerBackend, service *fleetnetv1alpha1.InternalServiceExport) armtrafficmanager.Endpoint {
endpointName := fmt.Sprintf(AzureResourceEndpointNameFormat, backend.UID, backend.Spec.Backend, service.Spec.ServiceReference.ClusterID)
endpointName := fmt.Sprintf(AzureResourceEndpointNameFormat, generateAzureTrafficManagerEndpointNamePrefixFunc(backend), backend.Spec.Backend.Name, service.Spec.ServiceReference.ClusterID)
return armtrafficmanager.Endpoint{
Name: &endpointName,
Type: ptr.To(string(armtrafficmanager.EndpointTypeAzureEndpoints)),
Expand All @@ -471,6 +509,129 @@ func generateAzureTrafficManagerEndpoint(backend *fleetnetv1alpha1.TrafficManage
}
}

func buildAcceptedEndpointStatus(endpoint *armtrafficmanager.Endpoint, cluster *fleetnetv1alpha1.ClusterStatus) fleetnetv1alpha1.TrafficManagerEndpointStatus {
return fleetnetv1alpha1.TrafficManagerEndpointStatus{
Name: strings.ToLower(*endpoint.Name), // name is case-insensitive
Target: endpoint.Properties.Target,
Weight: endpoint.Properties.Weight,
Cluster: cluster,
}
}

func (r *Reconciler) createOrUpdateTrafficManagerEndpoint(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend, profile *armtrafficmanager.Profile, endpoint *armtrafficmanager.Endpoint) (*armtrafficmanager.Endpoint, error) {
backendKObj := klog.KObj(backend)
var responseError *azcore.ResponseError
res, updateErr := r.EndpointsClient.CreateOrUpdate(ctx, r.ResourceGroupName, *profile.Name, armtrafficmanager.EndpointTypeAzureEndpoints, *endpoint.Name, *endpoint, nil)
if updateErr != nil {
if !errors.As(updateErr, &responseError) {
klog.ErrorS(updateErr, "Failed to send the createOrUpdate request", "trafficManagerBackend", backendKObj, "atmProfile", *profile.Name, "atmEndpoint", *endpoint.Name)
return nil, updateErr
}
var cond metav1.Condition
if azureerrors.IsClientError(updateErr) && !azureerrors.IsThrottled(updateErr) {
cond = metav1.Condition{
Type: string(fleetnetv1alpha1.TrafficManagerBackendReasonInvalid),
Status: metav1.ConditionFalse,
ObservedGeneration: backend.Generation,
Reason: string(fleetnetv1alpha1.TrafficManagerProfileReasonInvalid),
Message: fmt.Sprintf("Invalid Traffic Manager endpoint: %v", updateErr),
}
meta.SetStatusCondition(&backend.Status.Conditions, cond)
return nil, r.updateTrafficManagerBackendStatus(ctx, backend) // requeue won't help until the exported services are updated
} else if updateErr != nil {
setUnknownCondition(backend, fmt.Sprintf("Failed to create or update %q for %q: %v", *endpoint.Name, *profile.Name, updateErr))
if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
return nil, err
}
return nil, updateErr
}
}
klog.V(2).InfoS("Created or updated Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", *endpoint.Name)
return &res.Endpoint, nil
}

// compareAzureTrafficManagerEndpoint compares only few fields of the current and desired Azure Traffic Manager endpoints
// by ignoring others.
// The desired endpoint is built by the controllers and all the required fields should not be nil.
func compareAzureTrafficManagerEndpoint(current, desired armtrafficmanager.Endpoint) bool {
if current.Type == nil || *current.Type != *desired.Type {
return false
}
if current.Properties == nil || current.Properties.TargetResourceID == nil || current.Properties.Weight == nil || current.Properties.EndpointStatus == nil {
return false
}
return *current.Properties.TargetResourceID == *desired.Properties.TargetResourceID &&
*current.Properties.Weight == *desired.Properties.Weight &&
*current.Properties.EndpointStatus == *desired.Properties.EndpointStatus
}

func (r *Reconciler) updateTrafficManagerEndpoints(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend, profile *armtrafficmanager.Profile, desiredEndpoints map[string]desiredEndpoint) ([]fleetnetv1alpha1.TrafficManagerEndpointStatus, error) {
backendKObj := klog.KObj(backend)
acceptedEndpoints := make([]fleetnetv1alpha1.TrafficManagerEndpointStatus, 0, len(desiredEndpoints))
for _, endpoint := range profile.Properties.Endpoints {
if endpoint.Name == nil {
err := controller.NewUnexpectedBehaviorError(errors.New("azure Traffic Manager endpoint name is nil"))
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "atmEndpoint", endpoint)
continue
}

endpointName := strings.ToLower(*endpoint.Name) // resource name are case-insensitive
if !isEndpointOwnedByBackend(backend, endpointName) {
continue // skipping the endpoint which is owned by this backend
}

desired, ok := desiredEndpoints[endpointName]
if !ok {
klog.V(2).InfoS("Deleting the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
if _, deleteErr := r.EndpointsClient.Delete(ctx, r.ResourceGroupName, *profile.Name, armtrafficmanager.EndpointTypeAzureEndpoints, *endpoint.Name, nil); deleteErr != nil {
if azureerrors.IsNotFound(deleteErr) {
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
continue
}
klog.ErrorS(deleteErr, "Failed to delete the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
setUnknownCondition(backend, fmt.Sprintf("Failed to cleanup the existing %q for %q: %v", endpointName, *profile.Name, deleteErr))
if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
return nil, err
}
return nil, deleteErr
}
klog.V(2).InfoS("Deleted the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
continue
}
if compareAzureTrafficManagerEndpoint(*endpoint, desired.Endpoint) {
klog.V(2).InfoS("Skipping updating the existing Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
delete(desiredEndpoints, endpointName) // no need to update the existing endpoint
acceptedEndpoints = append(acceptedEndpoints, buildAcceptedEndpointStatus(endpoint, &desired.Cluster))
continue
}
endpoint.Type = desired.Endpoint.Type
if endpoint.Properties == nil {
endpoint.Properties = desired.Endpoint.Properties
} else {
endpoint.Properties.TargetResourceID = desired.Endpoint.Properties.TargetResourceID
endpoint.Properties.Weight = desired.Endpoint.Properties.Weight
endpoint.Properties.EndpointStatus = desired.Endpoint.Properties.EndpointStatus
}
klog.V(2).InfoS("Updating the existing Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpoint)
updatedEndpoint, err := r.createOrUpdateTrafficManagerEndpoint(ctx, backend, profile, endpoint)
if err != nil {
return nil, err
}
delete(desiredEndpoints, endpointName)
acceptedEndpoints = append(acceptedEndpoints, buildAcceptedEndpointStatus(updatedEndpoint, &desired.Cluster))
}
// The remaining endpoints in the desiredEndpoints should be created.
for _, endpoint := range desiredEndpoints {
klog.V(2).InfoS("Creating new Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpoint)
updatedEndpoint, err := r.createOrUpdateTrafficManagerEndpoint(ctx, backend, profile, &endpoint.Endpoint)
if err != nil {
return nil, err
}
acceptedEndpoints = append(acceptedEndpoints, buildAcceptedEndpointStatus(updatedEndpoint, &endpoint.Cluster))
}
return acceptedEndpoints, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// set up an index for efficient trafficManagerBackend lookup
Expand Down
Loading

0 comments on commit fbd0e00

Please sign in to comment.