Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ (WIP) Extract the logic to add cluster annotations to the driver interface and add unit and integration tests #750

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion pkg/registration/register/aws_irsa/aws_irsa.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aws_irsa
import (
"context"
"fmt"
operatorv1 "open-cluster-management.io/api/operator/v1"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
Expand All @@ -13,6 +14,10 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"

clusterv1 "open-cluster-management.io/api/cluster/v1"

//operatorv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/registration/register"
)

Expand All @@ -22,7 +27,9 @@ const (
// TLSKeyFile is the name of tls key file in kubeconfigSecret
TLSKeyFile = "tls.key"
// TLSCertFile is the name of the tls cert file in kubeconfigSecret
TLSCertFile = "tls.crt"
TLSCertFile = "tls.crt"
ManagedClusterArn = "managed-cluster-arn"
ManagedClusterIAMRoleSuffix = "managed-cluster-iam-role-suffix"
)

type AWSIRSADriver struct {
Expand Down Expand Up @@ -95,6 +102,24 @@ func (c *AWSIRSADriver) IsHubKubeConfigValid(ctx context.Context, secretOption r
return true, nil
}

func (c *AWSIRSADriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster {
if clusterAnnotations == nil {
clusterAnnotations = map[string]string{}
}
clusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterArn] = managedClusterArn
clusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterIAMRoleSuffix] = managedClusterRoleSuffix
return cluster
}

//func (c *AWSIRSADriver) AddClusterAnnotations(clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) {
// if clusterAnnotations == nil {
// clusterAnnotations = map[string]string{}
// }
//
// clusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterArn] = managedClusterArn
// clusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/"+ManagedClusterIAMRoleSuffix] = managedClusterRoleSuffix
//}

func NewAWSIRSADriver() register.RegisterDriver {
return &AWSIRSADriver{}
}
10 changes: 10 additions & 0 deletions pkg/registration/register/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/client-go/util/keyutil"
"k8s.io/klog/v2"

clusterv1 "open-cluster-management.io/api/cluster/v1"

"open-cluster-management.io/ocm/pkg/registration/register"
)

Expand Down Expand Up @@ -266,6 +268,14 @@ func (c *CSRDriver) IsHubKubeConfigValid(ctx context.Context, secretOption regis
return isCertificateValid(logger, certData, nil)
}

func (c *CSRDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster {
return cluster
}

//// AddClusterAnnotations noop for CSR driver
//func (c *CSRDriver) AddClusterAnnotations(clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) {
//}

func NewCSRDriver() register.RegisterDriver {
return &CSRDriver{}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/registration/register/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type RegisterDriver interface {
// InformerHandler returns informer of the related object. If no object needs to be watched, the func could
// return nil, nil.
InformerHandler(option any) (cache.SharedIndexInformer, factory.EventFilterFunc)

// ManagedClusterDecorator is to change managed cluster metadata or spec during registration process.
ManagedClusterDecorator(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster

//// AddClusterAnnotations adds cluster annotations for non-CSR drivers
//AddClusterAnnotations(clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string)
}

// Approvers is the inteface that each driver should implement on hub side. The hub controller will use this driver
Expand Down
11 changes: 10 additions & 1 deletion pkg/registration/register/secret_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

clusterv1 "open-cluster-management.io/api/cluster/v1"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
Expand Down Expand Up @@ -133,7 +135,7 @@ func TestSync(t *testing.T) {
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
syncCtx := testingcommon.NewFakeSyncContext(t, "test")
kubeClient := kubefake.NewSimpleClientset(c.secrets...)
kubeClient := kubefake.NewClientset(c.secrets...)
c.option.ManagementCoreClient = kubeClient.CoreV1()
informerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
c.option.ManagementSecretInformer = informerFactory.Core().V1().Secrets().Informer()
Expand Down Expand Up @@ -167,6 +169,9 @@ type fakeDriver struct {
cond *metav1.Condition
}

func (f *fakeDriver) AddClusterAnnotations(clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) {
}

func newFakeDriver(secret *corev1.Secret, cond *metav1.Condition, err error) *fakeDriver {
return &fakeDriver{
secret: secret,
Expand Down Expand Up @@ -195,3 +200,7 @@ func (f *fakeDriver) Process(
func (f *fakeDriver) InformerHandler(_ any) (cache.SharedIndexInformer, factory.EventFilterFunc) {
return nil, nil
}

func (f *fakeDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster {
return cluster
}
103 changes: 55 additions & 48 deletions pkg/registration/spoke/registration/creating_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,26 @@ var (
CreatingControllerSyncInterval = 60 * time.Minute
)

type ManagedClusterDecorator func(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster

// managedClusterCreatingController creates a ManagedCluster on hub cluster during the spoke agent bootstrap phase
type managedClusterCreatingController struct {
clusterName string
spokeExternalServerURLs []string
spokeCABundle []byte
clusterAnnotations map[string]string
hubClusterClient clientset.Interface
clusterName string
clusterDecorators []ManagedClusterDecorator
hubClusterClient clientset.Interface
}

// NewManagedClusterCreatingController creates a new managedClusterCreatingController on the managed cluster.
func NewManagedClusterCreatingController(
clusterName string, spokeExternalServerURLs []string, annotations map[string]string,
spokeCABundle []byte,
clusterName string,
decorators []ManagedClusterDecorator,
hubClusterClient clientset.Interface,
recorder events.Recorder) factory.Controller {

c := &managedClusterCreatingController{
clusterName: clusterName,
spokeExternalServerURLs: spokeExternalServerURLs,
spokeCABundle: spokeCABundle,
clusterAnnotations: commonhelpers.FilterClusterAnnotations(annotations),
hubClusterClient: hubClusterClient,
clusterName: clusterName,
hubClusterClient: hubClusterClient,
clusterDecorators: decorators,
}

return factory.New().
Expand All @@ -69,20 +67,12 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac
if errors.IsNotFound(err) {
managedCluster := &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.clusterName,
Annotations: c.clusterAnnotations,
Name: c.clusterName,
},
}

if len(c.spokeExternalServerURLs) != 0 {
var managedClusterClientConfigs []clusterv1.ClientConfig
for _, serverURL := range c.spokeExternalServerURLs {
managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: c.spokeCABundle,
})
}
managedCluster.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs
for _, decorator := range c.clusterDecorators {
managedCluster = decorator(managedCluster, nil, "", "")
}

_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Create(ctx, managedCluster, metav1.CreateOptions{})
Expand All @@ -94,37 +84,17 @@ func (c *managedClusterCreatingController) sync(ctx context.Context, syncCtx fac
return nil
}

// do not update ManagedClusterClientConfigs in ManagedCluster if spokeExternalServerURLs is empty
if len(c.spokeExternalServerURLs) == 0 {
return nil
managedCluster := existingCluster.DeepCopy()
for _, decorator := range c.clusterDecorators {
managedCluster = decorator(managedCluster, nil, "", "")
}

// merge ClientConfig
managedClusterClientConfigs := existingCluster.Spec.ManagedClusterClientConfigs
for _, serverURL := range c.spokeExternalServerURLs {
isIncludeByExisting := false
for _, existingClientConfig := range existingCluster.Spec.ManagedClusterClientConfigs {
if serverURL == existingClientConfig.URL {
isIncludeByExisting = true
break
}
}

if !isIncludeByExisting {
managedClusterClientConfigs = append(managedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: c.spokeCABundle,
})
}
}
if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedClusterClientConfigs) {
if len(existingCluster.Spec.ManagedClusterClientConfigs) == len(managedCluster.Spec.ManagedClusterClientConfigs) {
return nil
}

// update ManagedClusterClientConfigs in ManagedCluster
clusterCopy := existingCluster.DeepCopy()
clusterCopy.Spec.ManagedClusterClientConfigs = managedClusterClientConfigs
_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, clusterCopy, metav1.UpdateOptions{})
_, err = c.hubClusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{})
// ManagedClusterClientConfigs in ManagedCluster is only allowed updated during bootstrap.
// After bootstrap secret expired, an unauthorized error will be got, skip it
if skipUnauthorizedError(err) != nil {
Expand All @@ -141,3 +111,40 @@ func skipUnauthorizedError(err error) error {

return err
}

func AnnotationDecorator(annotations map[string]string) ManagedClusterDecorator {
return func(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster {
filteredAnnotations := commonhelpers.FilterClusterAnnotations(annotations)
if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
for key, value := range filteredAnnotations {
cluster.Annotations[key] = value
}
return cluster
}
}

// ClientConfigDecorator merge ClientConfig
func ClientConfigDecorator(externalServerURLs []string, caBundle []byte) ManagedClusterDecorator {
return func(cluster *clusterv1.ManagedCluster, clusterAnnotations map[string]string, managedClusterArn string, managedClusterRoleSuffix string) *clusterv1.ManagedCluster {
for _, serverURL := range externalServerURLs {
isIncludeByExisting := false
for _, existingClientConfig := range cluster.Spec.ManagedClusterClientConfigs {
if serverURL == existingClientConfig.URL {
isIncludeByExisting = true
break
}
}

if !isIncludeByExisting {
cluster.Spec.ManagedClusterClientConfigs = append(
cluster.Spec.ManagedClusterClientConfigs, clusterv1.ClientConfig{
URL: serverURL,
CABundle: caBundle,
})
}
}
return cluster
}
}
13 changes: 7 additions & 6 deletions pkg/registration/spoke/registration/creating_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ func TestCreateSpokeCluster(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
ctrl := managedClusterCreatingController{
clusterName: testinghelpers.TestManagedClusterName,
spokeExternalServerURLs: []string{testSpokeExternalServerUrl},
spokeCABundle: []byte("testcabundle"),
hubClusterClient: clusterClient,
clusterAnnotations: map[string]string{
"agent.open-cluster-management.io/test": "true",
clusterName: testinghelpers.TestManagedClusterName,
clusterDecorators: []ManagedClusterDecorator{
AnnotationDecorator(map[string]string{
"agent.open-cluster-management.io/test": "true",
}),
ClientConfigDecorator([]string{testSpokeExternalServerUrl}, []byte("testcabundle")),
},
hubClusterClient: clusterClient,
}

syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
Expand Down
27 changes: 10 additions & 17 deletions pkg/registration/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"
operatorv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/common/helpers"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
Expand Down Expand Up @@ -191,24 +190,14 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,

// initiate registration driver
var registerDriver register.RegisterDriver
if o.registrationOption.RegistrationAuth == AwsIrsaAuthType {
// TODO: may consider add additional validations
if o.registrationOption.HubClusterArn != "" {
registerDriver = awsIrsa.NewAWSIRSADriver()
if o.registrationOption.ClusterAnnotations == nil {
o.registrationOption.ClusterAnnotations = map[string]string{}
}
o.registrationOption.ClusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/managed-cluster-arn"] = o.registrationOption.ManagedClusterArn
o.registrationOption.ClusterAnnotations[operatorv1.ClusterAnnotationsKeyPrefix+"/managed-cluster-iam-role-suffix"] =
o.registrationOption.ManagedClusterRoleSuffix

} else {
panic("A valid EKS Hub Cluster ARN is required with awsirsa based authentication")
}
var registrationOption = o.registrationOption
if registrationOption.RegistrationAuth == AwsIrsaAuthType {
registerDriver = awsIrsa.NewAWSIRSADriver()
} else {
registerDriver = csr.NewCSRDriver()
}

//registerDriver.AddClusterAnnotations(registrationOption.ClusterAnnotations, registrationOption.ManagedClusterArn, registrationOption.ManagedClusterRoleSuffix)
o.driver = registerDriver

// get spoke cluster CA bundle
Expand Down Expand Up @@ -254,8 +243,12 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,

// start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster
spokeClusterCreatingController := registration.NewManagedClusterCreatingController(
o.agentOptions.SpokeClusterName, o.registrationOption.SpokeExternalServerURLs, o.registrationOption.ClusterAnnotations,
spokeClusterCABundle,
o.agentOptions.SpokeClusterName,
[]registration.ManagedClusterDecorator{
registration.AnnotationDecorator(o.registrationOption.ClusterAnnotations),
registration.ClientConfigDecorator(o.registrationOption.SpokeExternalServerURLs, spokeClusterCABundle),
o.driver.ManagedClusterDecorator,
},
bootstrapClusterClient,
recorder,
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/registration/spoke/spokeagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func init() {
func TestValidate(t *testing.T) {
defaultCompletedOptions := NewSpokeAgentOptions()
defaultCompletedOptions.BootstrapKubeconfig = "/spoke/bootstrap/kubeconfig"
awsCompletedOptionsHubArnMissing := *defaultCompletedOptions
awsCompletedOptionsHubArnMissing.RegistrationAuth = AwsIrsaAuthType
awsDefaultCompletedOptions := awsCompletedOptionsHubArnMissing
awsDefaultCompletedOptions.HubClusterArn = "arn:aws:eks:us-west-2:123456789012:cluster/hub-cluster1"

cases := []struct {
name string
Expand Down Expand Up @@ -78,6 +82,16 @@ func TestValidate(t *testing.T) {
options: defaultCompletedOptions,
expectedErr: "",
},
{
name: "default completed options for aws flow",
options: &awsDefaultCompletedOptions,
expectedErr: "",
},
{
name: "default completed options without HubClusterArn for aws flow",
options: &awsCompletedOptionsHubArnMissing,
expectedErr: "EksHubClusterArn cannot be empty if RegistrationAuth is awsirsa",
},
{
name: "default completed options",
options: &SpokeAgentOptions{
Expand Down
Loading
Loading