Skip to content

Commit

Permalink
Merge pull request #694 from feiskyer/cherry-1
Browse files Browse the repository at this point in the history
cherry-pick of #691: add support for additional public IPs via service annotations
  • Loading branch information
k8s-ci-robot authored Jul 5, 2021
2 parents 6187cf8 + 94361aa commit a72ebf2
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 36 deletions.
5 changes: 5 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ const (
// is `a=b,c=d,...`. After updated, the old user-assigned tags would not be replaced by the new ones.
ServiceAnnotationAzurePIPTags = "service.beta.kubernetes.io/azure-pip-tags"

// ServiceAnnotationAzurePIPTags sets the additional Public IPs (split by comma) besides the service's Public IP configured on LoadBalancer.
// These additional Public IPs would be consumed by kube-proxy to configure the iptables rules on each node. Note they would not be configured
// automatically on Azure LoadBalancer. Instead, they need to be configured manually (e.g. on Azure cross-region LoadBalancer by another operator).
ServiceAnnotationAdditionalPublicIPs = "service.beta.kubernetes.io/azure-additional-public-ips"

// ServiceTagKey is the service key applied for public IP tags.
ServiceTagKey = "service"
// ClusterNameKey is the cluster name key applied for public IP tags.
Expand Down
113 changes: 78 additions & 35 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,22 @@ func (az *Cloud) getServiceLoadBalancerStatus(service *v1.Service, lb *network.L
}

klog.V(2).Infof("getServiceLoadBalancerStatus gets ingress IP %q from frontendIPConfiguration %q for service %q", to.String(lbIP), to.String(ipConfiguration.Name), serviceName)
return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: to.String(lbIP)}}}, &ipConfiguration, nil

// set additional public IPs to LoadBalancerStatus, so that kube-proxy would create their iptables rules.
lbIngress := []v1.LoadBalancerIngress{{IP: to.String(lbIP)}}
additionalIPs, err := getServiceAdditionalPublicIPs(service)
if err != nil {
return &v1.LoadBalancerStatus{Ingress: lbIngress}, &ipConfiguration, err
}
if len(additionalIPs) > 0 {
for _, pip := range additionalIPs {
lbIngress = append(lbIngress, v1.LoadBalancerIngress{
IP: pip,
})
}
}

return &v1.LoadBalancerStatus{Ingress: lbIngress}, &ipConfiguration, nil
}
}

Expand Down Expand Up @@ -2170,6 +2185,16 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
destinationIPAddress = "*"
}

additionalIPs, err := getServiceAdditionalPublicIPs(service)
if err != nil {
return nil, fmt.Errorf("unable to get additional public IPs, error=%v", err)
}

destinationIPAddresses := []string{destinationIPAddress}
if destinationIPAddress != "*" {
destinationIPAddresses = append(destinationIPAddresses, additionalIPs...)
}

sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(service)
if err != nil {
return nil, err
Expand All @@ -2191,13 +2216,13 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
sourceAddressPrefixes = append(sourceAddressPrefixes, serviceTags...)
}

expectedSecurityRules, err := az.getExpectedSecurityRules(wantLb, ports, sourceAddressPrefixes, service, destinationIPAddress, sourceRanges)
expectedSecurityRules, err := az.getExpectedSecurityRules(wantLb, ports, sourceAddressPrefixes, service, destinationIPAddresses, sourceRanges)
if err != nil {
return nil, err
}

// update security rules
dirtySg, updatedRules, err := az.reconcileSecurityRules(sg, service, serviceName, wantLb, expectedSecurityRules, ports, sourceAddressPrefixes, destinationIPAddress)
dirtySg, updatedRules, err := az.reconcileSecurityRules(sg, service, serviceName, wantLb, expectedSecurityRules, ports, sourceAddressPrefixes, destinationIPAddresses)
if err != nil {
return nil, err
}
Expand All @@ -2222,7 +2247,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
return &sg, nil
}

func (az *Cloud) reconcileSecurityRules(sg network.SecurityGroup, service *v1.Service, serviceName string, wantLb bool, expectedSecurityRules []network.SecurityRule, ports []v1.ServicePort, sourceAddressPrefixes []string, destinationIPAddress string) (bool, []network.SecurityRule, error) {
func (az *Cloud) reconcileSecurityRules(sg network.SecurityGroup, service *v1.Service, serviceName string, wantLb bool, expectedSecurityRules []network.SecurityRule, ports []v1.ServicePort, sourceAddressPrefixes []string, destinationIPAddresses []string) (bool, []network.SecurityRule, error) {
dirtySg := false
var updatedRules []network.SecurityRule
if sg.SecurityGroupPropertiesFormat != nil && sg.SecurityGroupPropertiesFormat.SecurityRules != nil {
Expand Down Expand Up @@ -2268,19 +2293,22 @@ func (az *Cloud) reconcileSecurityRules(sg network.SecurityGroup, service *v1.Se
return false, nil, fmt.Errorf("expected to have array of destinations in shared rule for service %s being deleted, but did not", service.Name)
}
existingPrefixes := *sharedRule.DestinationAddressPrefixes
addressIndex, found := findIndex(existingPrefixes, destinationIPAddress)
if !found {
klog.V(4).Infof("Expected to find destination address %s in shared rule %s for service %s being deleted, but did not", destinationIPAddress, sharedRuleName, service.Name)
return false, nil, fmt.Errorf("expected to find destination address %s in shared rule %s for service %s being deleted, but did not", destinationIPAddress, sharedRuleName, service.Name)
}
if len(existingPrefixes) == 1 {
updatedRules = append(updatedRules[:sharedIndex], updatedRules[sharedIndex+1:]...)
} else {
newDestinations := append(existingPrefixes[:addressIndex], existingPrefixes[addressIndex+1:]...)
sharedRule.DestinationAddressPrefixes = &newDestinations
updatedRules[sharedIndex] = sharedRule
for _, destinationIPAddress := range destinationIPAddresses {
addressIndex, found := findIndex(existingPrefixes, destinationIPAddress)
if !found {
klog.Warningf("Expected to find destination address %v in shared rule %s for service %s being deleted, but did not", destinationIPAddress, sharedRuleName, service.Name)
continue
}
if len(existingPrefixes) == 1 {
updatedRules = append(updatedRules[:sharedIndex], updatedRules[sharedIndex+1:]...)
} else {
newDestinations := append(existingPrefixes[:addressIndex], existingPrefixes[addressIndex+1:]...)
sharedRule.DestinationAddressPrefixes = &newDestinations
updatedRules[sharedIndex] = sharedRule
}
dirtySg = true
}
dirtySg = true

}
}
}
Expand Down Expand Up @@ -2328,7 +2356,7 @@ func (az *Cloud) reconcileSecurityRules(sg network.SecurityGroup, service *v1.Se
return dirtySg, updatedRules, nil
}

func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, sourceAddressPrefixes []string, service *v1.Service, destinationIPAddress string, sourceRanges utilnet.IPNetSet) ([]network.SecurityRule, error) {
func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, sourceAddressPrefixes []string, service *v1.Service, destinationIPAddresses []string, sourceRanges utilnet.IPNetSet) ([]network.SecurityRule, error) {
expectedSecurityRules := []network.SecurityRule{}

if wantLb {
Expand All @@ -2342,18 +2370,24 @@ func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, s
for j := range sourceAddressPrefixes {
ix := i*len(sourceAddressPrefixes) + j
securityRuleName := az.getSecurityRuleName(service, port, sourceAddressPrefixes[j])
expectedSecurityRules[ix] = network.SecurityRule{
nsgRule := network.SecurityRule{
Name: to.StringPtr(securityRuleName),
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]),
DestinationAddressPrefix: to.StringPtr(destinationIPAddress),
Access: network.SecurityRuleAccessAllow,
Direction: network.SecurityRuleDirectionInbound,
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]),
Access: network.SecurityRuleAccessAllow,
Direction: network.SecurityRuleDirectionInbound,
},
}
if len(destinationIPAddresses) == 1 {
// continue to use DestinationAddressPrefix to avoid NSG updates for existing rules.
nsgRule.DestinationAddressPrefix = to.StringPtr(destinationIPAddresses[0])
} else {
nsgRule.DestinationAddressPrefixes = to.StringSlicePtr(destinationIPAddresses)
}
expectedSecurityRules[ix] = nsgRule
}
}

Expand All @@ -2370,24 +2404,30 @@ func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, s
return nil, err
}
securityRuleName := az.getSecurityRuleName(service, port, "deny_all")
expectedSecurityRules = append(expectedSecurityRules, network.SecurityRule{
nsgRule := network.SecurityRule{
Name: to.StringPtr(securityRuleName),
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
SourceAddressPrefix: to.StringPtr("*"),
DestinationAddressPrefix: to.StringPtr(destinationIPAddress),
Access: network.SecurityRuleAccessDeny,
Direction: network.SecurityRuleDirectionInbound,
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
SourceAddressPrefix: to.StringPtr("*"),
Access: network.SecurityRuleAccessDeny,
Direction: network.SecurityRuleDirectionInbound,
},
})
}
if len(destinationIPAddresses) == 1 {
// continue to use DestinationAddressPrefix to avoid NSG updates for existing rules.
nsgRule.DestinationAddressPrefix = to.StringPtr(destinationIPAddresses[0])
} else {
nsgRule.DestinationAddressPrefixes = to.StringSlicePtr(destinationIPAddresses)
}
expectedSecurityRules = append(expectedSecurityRules, nsgRule)
}
}
}

for _, r := range expectedSecurityRules {
klog.V(10).Infof("Expecting security rule for %s: %s:%s -> %s:%s", service.Name, *r.SourceAddressPrefix, *r.SourcePortRange, *r.DestinationAddressPrefix, *r.DestinationPortRange)
klog.V(10).Infof("Expecting security rule for %s: %s:%s -> %v %v :%s", service.Name, to.String(r.SourceAddressPrefix), to.String(r.SourcePortRange), to.String(r.DestinationAddressPrefix), to.StringSlice(r.DestinationAddressPrefixes), to.String(r.DestinationPortRange))
}
return expectedSecurityRules, nil
}
Expand Down Expand Up @@ -2895,6 +2935,9 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b
if !strings.EqualFold(to.String(existingRule.DestinationAddressPrefix), to.String(rule.DestinationAddressPrefix)) {
continue
}
if !reflect.DeepEqual(to.StringSlice(existingRule.DestinationAddressPrefixes), to.StringSlice(rule.DestinationAddressPrefixes)) {
continue
}
}
if existingRule.Access != rule.Access {
continue
Expand Down
32 changes: 31 additions & 1 deletion pkg/provider/azure_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,36 @@ func TestReconcileSecurityGroup(t *testing.T) {
},
},
},
{
desc: "reconcileSecurityGroup shall create sgs with correct destinationPrefix with additional public IPs",
service: getTestService("test1", v1.ProtocolTCP, map[string]string{consts.ServiceAnnotationAdditionalPublicIPs: "2.3.4.5"}, true, 80),
existingSgs: map[string]network.SecurityGroup{"nsg": {
Name: to.StringPtr("nsg"),
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{},
}},
lbIP: to.StringPtr("1.2.3.4"),
wantLb: true,
expectedSg: &network.SecurityGroup{
Name: to.StringPtr("nsg"),
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{
SecurityRules: &[]network.SecurityRule{
{
Name: to.StringPtr("atest1-TCP-80-Internet"),
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: network.SecurityRuleProtocol("Tcp"),
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr("80"),
SourceAddressPrefix: to.StringPtr("Internet"),
DestinationAddressPrefixes: to.StringSlicePtr([]string{"1.2.3.4", "2.3.4.5"}),
Access: network.SecurityRuleAccess("Allow"),
Priority: to.Int32Ptr(500),
Direction: network.SecurityRuleDirection("Inbound"),
},
},
},
},
},
},
{
desc: "reconcileSecurityGroup shall not create unwanted security rules if there is service tags",
service: getTestService("test1", v1.ProtocolTCP, map[string]string{consts.ServiceAnnotationAllowedServiceTag: "tag"}, true, 80),
Expand All @@ -2639,7 +2669,7 @@ func TestReconcileSecurityGroup(t *testing.T) {
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
SourceAddressPrefix: to.StringPtr("prefix"),
SourcePortRange: to.StringPtr("range"),
DestinationAddressPrefix: to.StringPtr("desPrefix"),
DestinationAddressPrefix: to.StringPtr("destPrefix"),
DestinationPortRange: to.StringPtr("desRange"),
},
},
Expand Down
28 changes: 28 additions & 0 deletions pkg/provider/azure_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package provider

import (
"context"
"fmt"
"net"
"strings"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/Azure/go-autorest/autorest/to"
Expand Down Expand Up @@ -174,3 +177,28 @@ func (az *Cloud) getVMSetNamesSharingPrimarySLB() sets.String {

return sets.NewString(vmSetNames...)
}

func getServiceAdditionalPublicIPs(service *v1.Service) ([]string, error) {
if service == nil {
return nil, nil
}

result := []string{}
if val, ok := service.Annotations[consts.ServiceAnnotationAdditionalPublicIPs]; ok {
pips := strings.Split(strings.TrimSpace(val), ",")
for _, pip := range pips {
ip := strings.TrimSpace(pip)
if ip == "" {
continue // skip empty string
}

if net.ParseIP(ip) == nil {
return nil, fmt.Errorf("%s is not a valid IP address", ip)
}

result = append(result, ip)
}
}

return result, nil
}
76 changes: 76 additions & 0 deletions pkg/provider/azure_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ limitations under the License.
package provider

import (
"fmt"
"testing"
"time"

"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/cloud-provider-azure/pkg/consts"
)

func TestSimpleLockEntry(t *testing.T) {
Expand Down Expand Up @@ -179,3 +183,75 @@ func TestReconcileTags(t *testing.T) {
})
}
}

func TestGetServiceAdditionalPublicIPs(t *testing.T) {
for _, testCase := range []struct {
description string
service *v1.Service
expectedIPs []string
expectedError error
}{
{
description: "nil service should return empty IP list",
},
{
description: "service without annotation should return empty IP list",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
},
expectedIPs: []string{},
},
{
description: "service without annotation should return empty IP list",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
consts.ServiceAnnotationAdditionalPublicIPs: "",
},
},
},
expectedIPs: []string{},
},
{
description: "service with one IP in annotation should return expected IPs",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
consts.ServiceAnnotationAdditionalPublicIPs: "1.2.3.4 ",
},
},
},
expectedIPs: []string{"1.2.3.4"},
},
{
description: "service with multiple IPs in annotation should return expected IPs",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
consts.ServiceAnnotationAdditionalPublicIPs: "1.2.3.4, 2.3.4.5 ",
},
},
},
expectedIPs: []string{"1.2.3.4", "2.3.4.5"},
},
{
description: "service with wrong IP in annotation should report an error",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
consts.ServiceAnnotationAdditionalPublicIPs: "invalid",
},
},
},
expectedError: fmt.Errorf("invalid is not a valid IP address"),
},
} {
t.Run(testCase.description, func(t *testing.T) {
ips, err := getServiceAdditionalPublicIPs(testCase.service)
assert.Equal(t, testCase.expectedIPs, ips)
assert.Equal(t, testCase.expectedError, err)
})
}
}

0 comments on commit a72ebf2

Please sign in to comment.