Skip to content

HIVE-2849: Migrate AWS SDK to v2 #2695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
11 changes: 6 additions & 5 deletions contrib/pkg/awsprivatelink/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"os/user"
"path/filepath"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"

configv1 "github.com/openshift/api/config/v1"
hivev1 "github.com/openshift/hive/apis/hive/v1"
Expand Down Expand Up @@ -119,10 +120,10 @@ func (o *enableOptions) Run(cmd *cobra.Command, args []string) error {
// Get active cluster's VPC, filtering by infra-id
targetTagKey := "kubernetes.io/cluster/" + o.infraId
describeVPCsOutput, err := o.awsClients.DescribeVpcs(&ec2.DescribeVpcsInput{
Filters: []*ec2.Filter{
Filters: []ec2types.Filter{
{
Name: aws.String("tag-key"),
Values: []*string{aws.String(targetTagKey)},
Values: []string{targetTagKey},
},
},
})
Expand All @@ -135,7 +136,7 @@ func (o *enableOptions) Run(cmd *cobra.Command, args []string) error {
if len(describeVPCsOutput.Vpcs) > 1 {
log.Fatalf("Multiple VPCs found with tag key %s, cannot determine VPC of the active cluster", targetTagKey)
}
vpcID := *describeVPCsOutput.Vpcs[0].VpcId
vpcID := aws.ToString(describeVPCsOutput.Vpcs[0].VpcId)
log.Debugf("Found VPC ID = %v for the active cluster", vpcID)

hiveNS := operatorutils.GetHiveNamespace(hiveConfig)
Expand Down
90 changes: 44 additions & 46 deletions contrib/pkg/awsprivatelink/endpointvpc/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"reflect"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"

hivev1 "github.com/openshift/hive/apis/hive/v1"
"github.com/openshift/hive/contrib/pkg/awsprivatelink/common"
Expand Down Expand Up @@ -108,20 +108,20 @@ func (o *endpointVPCAddOptions) Complete(cmd *cobra.Command, args []string) erro
func (o *endpointVPCAddOptions) Validate(cmd *cobra.Command, args []string) error {
// Check if the endpoint VPC exists
if _, err := o.endpointVpcClients.DescribeVpcs(&ec2.DescribeVpcsInput{
VpcIds: []*string{aws.String(o.endpointVpcId)},
VpcIds: []string{o.endpointVpcId},
}); err != nil {
log.WithError(err).Fatal("Failed to describe endpoint VPC")
}

// Check if the endpoint subnets belong to the endpoint VPC
err := o.endpointVpcClients.DescribeSubnetsPages(
&ec2.DescribeSubnetsInput{
SubnetIds: aws.StringSlice(o.endpointSubnetIds),
SubnetIds: o.endpointSubnetIds,
},
func(page *ec2.DescribeSubnetsOutput, lastPage bool) bool {
for _, subnet := range page.Subnets {
if *subnet.VpcId != o.endpointVpcId {
log.Fatalf("Subnet %v does not belong to the endpoint VPC", *subnet.SubnetId)
if aws.ToString(subnet.VpcId) != o.endpointVpcId {
log.WithField("subnet", aws.ToString(subnet.SubnetId)).Fatal("Subnet does not belong to the endpoint VPC")
}
}
return !lastPage
Expand All @@ -136,7 +136,7 @@ func (o *endpointVPCAddOptions) Validate(cmd *cobra.Command, args []string) erro

func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
// Get default SG of the endpoint VPC
endpointVPCDefaultSG, err := awsutils.GetDefaultSGOfVpc(o.endpointVpcClients, aws.String(o.endpointVpcId))
endpointVPCDefaultSG, err := awsutils.GetDefaultSGOfVpc(o.endpointVpcClients, o.endpointVpcId)
if err != nil {
log.WithError(err).Fatal("Failed to get default SG of the endpoint VPC")
}
Expand All @@ -163,36 +163,36 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
vpcPeeringConnectionId := acceptVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId
associatedVpcCIDR := acceptVpcPeeringConnectionOutput.VpcPeeringConnection.RequesterVpcInfo.CidrBlock
endpointVpcCIDR := acceptVpcPeeringConnectionOutput.VpcPeeringConnection.AccepterVpcInfo.CidrBlock
log.Debugf("Found associated VPC CIDR = %v, endpoint VPC CIDR = %v", *associatedVpcCIDR, *endpointVpcCIDR)
log.WithFields(log.Fields{
"associated": aws.ToString(associatedVpcCIDR),
"endpoint": aws.ToString(endpointVpcCIDR),
}).Debug("Found associated CIDRs")

// Update route tables
log.Info("Adding route to private route tables of the associated VPC")
if err = addRouteToRouteTables(
associatedVpcClients,
aws.String(associatedVpcId),
associatedVpcId,
endpointVpcCIDR,
vpcPeeringConnectionId,
&ec2.Filter{Name: aws.String("tag:Name"), Values: []*string{aws.String("*private*")}},
ec2types.Filter{Name: aws.String("tag:Name"), Values: []string{"*private*"}},
); err != nil {
log.WithError(err).Fatal("Failed to add route to private route tables of the associated VPC")
}

log.Info("Adding route to route tables of the endpoint subnets")
if err = addRouteToRouteTables(
o.endpointVpcClients,
aws.String(o.endpointVpcId),
o.endpointVpcId,
associatedVpcCIDR,
vpcPeeringConnectionId,
&ec2.Filter{Name: aws.String("association.subnet-id"), Values: aws.StringSlice(o.endpointSubnetIds)},
ec2types.Filter{Name: aws.String("association.subnet-id"), Values: o.endpointSubnetIds},
); err != nil {
log.WithError(err).Fatal("Failed to add route to route tables of the endpoint subnets")
}

// Update SGs
associatedVpcWorkerSG, err := awsutils.GetWorkerSGFromVpcId(
associatedVpcClients,
aws.String(associatedVpcId),
)
associatedVpcWorkerSG, err := awsutils.GetWorkerSGFromVpcId(associatedVpcClients, associatedVpcId)
if err != nil {
log.WithError(err).Fatal("Failed to get worker SG of the associated VPC")
}
Expand All @@ -210,10 +210,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from worker SG of associated VPC %s", associatedVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
if awsclient.ErrCodeEquals(err, "InvalidPermission.Duplicate") {
log.Warnf("Traffic from the associated VPC's worker SG to the endpoint VPC's default SG is already authorized")
default:
} else {
log.WithError(err).Fatal("Failed to authorize traffic from the associated VPC's worker SG to the endpoint VPC's default SG")
}
}
Expand All @@ -226,10 +225,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from default SG of endpoint VPC %s", o.endpointVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
if awsclient.ErrCodeEquals(err, "InvalidPermission.Duplicate") {
log.Warnf("Traffic from the endpoint VPC's default SG to the associated VPC's worker SG is already authorized")
default:
} else {
log.WithError(err).Fatal("Failed to authorize traffic from the endpoint VPC's default SG to the associated VPC's worker SG")
}
}
Expand All @@ -244,10 +242,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from CIDR block of associated VPC %s", associatedVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
if awsclient.ErrCodeEquals(err, "InvalidPermission.Duplicate") {
log.Warnf("Traffic from the associated VPC's CIDR block to the endpoint VPC's default SG is already authorized")
default:
} else {
log.WithError(err).Fatal("Failed to authorize traffic from the associated VPC's CIDR block to the endpoint VPC's default SG")
}
}
Expand All @@ -260,10 +257,9 @@ func (o *endpointVPCAddOptions) Run(cmd *cobra.Command, args []string) error {
aws.String(fmt.Sprintf("Access from CIDR block of endpoint VPC %s", o.endpointVpcId)),
); err != nil {
// Proceed if ingress already authorized, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "InvalidPermission.Duplicate":
if awsclient.ErrCodeEquals(err, "InvalidPermission.Duplicate") {
log.Warnf("Traffic from the endpoint VPC's CIDR block to the associated VPC's worker SG is already authorized")
default:
} else {
log.WithError(err).Fatal("Failed to authorize traffic from the endpoint VPC's CIDR block to the associated VPC's worker SG")
}
}
Expand All @@ -283,13 +279,13 @@ func (o *endpointVPCAddOptions) addEndpointVpcToHiveConfig() {
var endpointSubnets []hivev1.AWSPrivateLinkSubnet
if err := o.endpointVpcClients.DescribeSubnetsPages(
&ec2.DescribeSubnetsInput{
SubnetIds: aws.StringSlice(o.endpointSubnetIds),
SubnetIds: o.endpointSubnetIds,
},
func(page *ec2.DescribeSubnetsOutput, lastPage bool) bool {
for _, subnet := range page.Subnets {
endpointSubnet := hivev1.AWSPrivateLinkSubnet{
SubnetID: *subnet.SubnetId,
AvailabilityZone: *subnet.AvailabilityZone,
SubnetID: aws.ToString(subnet.SubnetId),
AvailabilityZone: aws.ToString(subnet.AvailabilityZone),
}
endpointSubnets = append(endpointSubnets, endpointSubnet)
}
Expand Down Expand Up @@ -335,13 +331,13 @@ func (o *endpointVPCAddOptions) addEndpointVpcToHiveConfig() {

func addRouteToRouteTables(
vpcClients awsclient.Client,
vpcId, peerCIDR, VpcPeeringConnectionId *string,
additionalFiltersForRouteTables ...*ec2.Filter,
vpcId string, peerCIDR, VpcPeeringConnectionId *string,
additionalFiltersForRouteTables ...ec2types.Filter,
) error {
filters := append([]*ec2.Filter{
filters := append([]ec2types.Filter{
{
Name: aws.String("vpc-id"),
Values: []*string{vpcId},
Values: []string{vpcId},
},
}, additionalFiltersForRouteTables...)

Expand All @@ -358,14 +354,13 @@ func addRouteToRouteTables(
})
if err != nil {
// Proceed if route already exists, fail otherwise
switch aerr, ok := err.(awserr.Error); {
case ok && aerr.Code() == "RouteAlreadyExists":
log.Warnf("Route already exists in route table %v", *routeTable.RouteTableId)
default:
log.WithError(err).Fatalf("Failed to create route for route table %v", *routeTable.RouteTableId)
if awsclient.ErrCodeEquals(err, "RouteAlreadyExists") {
log.WithField("routeTableId", aws.ToString(routeTable.RouteTableId)).Warn("Route already exists in route table")
} else {
log.WithField("routeTableId", aws.ToString(routeTable.RouteTableId)).WithError(err).Fatal("Failed to create route for route table")
}
} else {
log.Debugf("Route added to route table %v", *routeTable.RouteTableId)
log.WithField("routeTableId", aws.ToString(routeTable.RouteTableId)).Debug("Route added to route table")
}
}

Expand All @@ -389,23 +384,26 @@ func setupVpcPeeringConnection(
if err != nil {
return nil, err
}
log.Debugf("VPC peering connection %v requested", *createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)

pcid := aws.ToString(createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)
log.WithField("VpcPeeringConnectionId", pcid).Debug("VPC peering connection requested")

err = endpointVpcClients.WaitUntilVpcPeeringConnectionExists(&ec2.DescribeVpcPeeringConnectionsInput{
VpcPeeringConnectionIds: []*string{createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId},
VpcPeeringConnectionIds: []string{pcid},
})
if err != nil {
return nil, err
}
log.Debugf("VPC peering connection %v exists", *createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)
log.WithField("VpcPeeringConnectionId", pcid).Debug("VPC peering connection exists")

acceptVpcPeeringConnectionOutput, err := endpointVpcClients.AcceptVpcPeeringConnection(&ec2.AcceptVpcPeeringConnectionInput{
VpcPeeringConnectionId: createVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId,
})
if err != nil {
return nil, err
}
log.Debugf("VPC peering connection %v accepted", *acceptVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)
log.WithField("VpcPeeringConnectionId", aws.ToString(acceptVpcPeeringConnectionOutput.VpcPeeringConnection.VpcPeeringConnectionId)).
Debugf("VPC peering connection accepted")

return acceptVpcPeeringConnectionOutput, nil
}
Loading