diff --git a/apis/exoscale/v1/kafka_types.go b/apis/exoscale/v1/kafka_types.go
new file mode 100644
index 00000000..dbcca07a
--- /dev/null
+++ b/apis/exoscale/v1/kafka_types.go
@@ -0,0 +1,115 @@
+package v1
+
+import (
+	"reflect"
+
+	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
+	"github.com/crossplane/crossplane-runtime/pkg/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// KafkaParameters are the configurable fields of a Kafka instance.
+type KafkaParameters struct {
+	Maintenance MaintenanceSpec `json:"maintenance,omitempty"`
+
+	// +kubebuilder:validation:Required
+
+	// Zone is the datacenter identifier in which the instance runs.
+	Zone Zone `json:"zone"`
+
+	DBaaSParameters `json:",inline"`
+
+	// Version is the (minor) version identifier for the instance (e.g. "3.2").
+	Version string `json:"version,omitempty"`
+
+	// KafkaSettings contains additional Kafka settings.
+	KafkaSettings runtime.RawExtension `json:"kafkaSettings,omitempty"`
+}
+
+// KafkaSpec defines the desired state of a Kafka.
+type KafkaSpec struct {
+	xpv1.ResourceSpec `json:",inline"`
+	ForProvider       KafkaParameters `json:"forProvider"`
+}
+
+// KafkaObservation are the observable fields of a Kafka.
+type KafkaObservation struct {
+	Version string `json:"version,omitempty"`
+	// KafkaSettings contains additional Kafka settings as set by the provider.
+	KafkaSettings runtime.RawExtension `json:"kafkaSettings,omitempty"`
+
+	// State of individual service nodes
+	NodeStates []NodeState `json:"nodeStates,omitempty"`
+
+	// Service notifications
+	Notifications []Notification `json:"notifications,omitempty"`
+}
+
+// KafkaStatus represents the observed state of a Kafka instance.
+type KafkaStatus struct {
+	xpv1.ResourceStatus `json:",inline"`
+	AtProvider          KafkaObservation `json:"atProvider,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].reason"
+// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"
+// +kubebuilder:printcolumn:name="Synced",type="string",JSONPath=".status.conditions[?(@.type=='Synced')].status"
+// +kubebuilder:printcolumn:name="External Name",type="string",JSONPath=".metadata.annotations.crossplane\\.io/external-name"
+// +kubebuilder:printcolumn:name="Plan",type="string",JSONPath=".spec.forProvider.size.plan"
+// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
+// +kubebuilder:subresource:status
+// +kubebuilder:resource:scope=Cluster,categories={crossplane,exoscale}
+// +kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafkas,versions=v1,name=kafkas.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1
+
+// Kafka is the API for creating Kafka instances.
+type Kafka struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   KafkaSpec   `json:"spec"`
+	Status KafkaStatus `json:"status,omitempty"`
+}
+
+// GetProviderConfigName returns the name of the ProviderConfig.
+// Returns an empty string if no reference is given.
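+// The connector uses this name to look up the API credentials when it opens
+// the exoscale client for this instance (see Connect in operator/kafkacontroller/controller.go).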
+func (in *Kafka) GetProviderConfigName() string { + if ref := in.GetProviderConfigReference(); ref != nil { + return ref.Name + } + return "" +} + +// GetInstanceName returns the external name of the instance in the following precedence: +// +// .metadata.annotations."crossplane.io/external-name" +// .metadata.name +func (in *Kafka) GetInstanceName() string { + if name := meta.GetExternalName(in); name != "" { + return name + } + return in.Name +} + +// +kubebuilder:object:root=true + +// KafkaList contains a list of Kafka +type KafkaList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Kafka `json:"items"` +} + +// Kafka type metadata. +var ( + KafkaKind = reflect.TypeOf(Kafka{}).Name() + KafkaGroupKind = schema.GroupKind{Group: Group, Kind: KafkaKind}.String() + KafkaKindAPIVersion = KafkaKind + "." + SchemeGroupVersion.String() + KafkaGroupVersionKind = SchemeGroupVersion.WithKind(KafkaKind) +) + +func init() { + SchemeBuilder.Register(&Kafka{}, &KafkaList{}) +} diff --git a/apis/exoscale/v1/zz_generated.deepcopy.go b/apis/exoscale/v1/zz_generated.deepcopy.go index a5f099b3..5f735225 100644 --- a/apis/exoscale/v1/zz_generated.deepcopy.go +++ b/apis/exoscale/v1/zz_generated.deepcopy.go @@ -312,6 +312,145 @@ func (in IPFilter) DeepCopy() IPFilter { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Kafka) DeepCopyInto(out *Kafka) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Kafka. +func (in *Kafka) DeepCopy() *Kafka { + if in == nil { + return nil + } + out := new(Kafka) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Kafka) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaList) DeepCopyInto(out *KafkaList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Kafka, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaList. +func (in *KafkaList) DeepCopy() *KafkaList { + if in == nil { + return nil + } + out := new(KafkaList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *KafkaList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *KafkaObservation) DeepCopyInto(out *KafkaObservation) { + *out = *in + in.KafkaSettings.DeepCopyInto(&out.KafkaSettings) + if in.NodeStates != nil { + in, out := &in.NodeStates, &out.NodeStates + *out = make([]NodeState, len(*in)) + copy(*out, *in) + } + if in.Notifications != nil { + in, out := &in.Notifications, &out.Notifications + *out = make([]Notification, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaObservation. +func (in *KafkaObservation) DeepCopy() *KafkaObservation { + if in == nil { + return nil + } + out := new(KafkaObservation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaParameters) DeepCopyInto(out *KafkaParameters) { + *out = *in + out.Maintenance = in.Maintenance + in.DBaaSParameters.DeepCopyInto(&out.DBaaSParameters) + in.KafkaSettings.DeepCopyInto(&out.KafkaSettings) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaParameters. +func (in *KafkaParameters) DeepCopy() *KafkaParameters { + if in == nil { + return nil + } + out := new(KafkaParameters) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { + *out = *in + in.ResourceSpec.DeepCopyInto(&out.ResourceSpec) + in.ForProvider.DeepCopyInto(&out.ForProvider) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaSpec. +func (in *KafkaSpec) DeepCopy() *KafkaSpec { + if in == nil { + return nil + } + out := new(KafkaSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaStatus) DeepCopyInto(out *KafkaStatus) { + *out = *in + in.ResourceStatus.DeepCopyInto(&out.ResourceStatus) + in.AtProvider.DeepCopyInto(&out.AtProvider) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaStatus. +func (in *KafkaStatus) DeepCopy() *KafkaStatus { + if in == nil { + return nil + } + out := new(KafkaStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MaintenanceSpec) DeepCopyInto(out *MaintenanceSpec) { *out = *in diff --git a/apis/exoscale/v1/zz_generated.managed.go b/apis/exoscale/v1/zz_generated.managed.go index c85ef64f..fc8345b6 100644 --- a/apis/exoscale/v1/zz_generated.managed.go +++ b/apis/exoscale/v1/zz_generated.managed.go @@ -136,6 +136,72 @@ func (mg *IAMKey) SetWriteConnectionSecretToReference(r *xpv1.SecretReference) { mg.Spec.WriteConnectionSecretToReference = r } +// GetCondition of this Kafka. +func (mg *Kafka) GetCondition(ct xpv1.ConditionType) xpv1.Condition { + return mg.Status.GetCondition(ct) +} + +// GetDeletionPolicy of this Kafka. +func (mg *Kafka) GetDeletionPolicy() xpv1.DeletionPolicy { + return mg.Spec.DeletionPolicy +} + +// GetProviderConfigReference of this Kafka. +func (mg *Kafka) GetProviderConfigReference() *xpv1.Reference { + return mg.Spec.ProviderConfigReference +} + +/* +GetProviderReference of this Kafka. +Deprecated: Use GetProviderConfigReference. 
+*/ +func (mg *Kafka) GetProviderReference() *xpv1.Reference { + return mg.Spec.ProviderReference +} + +// GetPublishConnectionDetailsTo of this Kafka. +func (mg *Kafka) GetPublishConnectionDetailsTo() *xpv1.PublishConnectionDetailsTo { + return mg.Spec.PublishConnectionDetailsTo +} + +// GetWriteConnectionSecretToReference of this Kafka. +func (mg *Kafka) GetWriteConnectionSecretToReference() *xpv1.SecretReference { + return mg.Spec.WriteConnectionSecretToReference +} + +// SetConditions of this Kafka. +func (mg *Kafka) SetConditions(c ...xpv1.Condition) { + mg.Status.SetConditions(c...) +} + +// SetDeletionPolicy of this Kafka. +func (mg *Kafka) SetDeletionPolicy(r xpv1.DeletionPolicy) { + mg.Spec.DeletionPolicy = r +} + +// SetProviderConfigReference of this Kafka. +func (mg *Kafka) SetProviderConfigReference(r *xpv1.Reference) { + mg.Spec.ProviderConfigReference = r +} + +/* +SetProviderReference of this Kafka. +Deprecated: Use SetProviderConfigReference. +*/ +func (mg *Kafka) SetProviderReference(r *xpv1.Reference) { + mg.Spec.ProviderReference = r +} + +// SetPublishConnectionDetailsTo of this Kafka. +func (mg *Kafka) SetPublishConnectionDetailsTo(r *xpv1.PublishConnectionDetailsTo) { + mg.Spec.PublishConnectionDetailsTo = r +} + +// SetWriteConnectionSecretToReference of this Kafka. +func (mg *Kafka) SetWriteConnectionSecretToReference(r *xpv1.SecretReference) { + mg.Spec.WriteConnectionSecretToReference = r +} + // GetCondition of this MySQL. func (mg *MySQL) GetCondition(ct xpv1.ConditionType) xpv1.Condition { return mg.Status.GetCondition(ct) diff --git a/apis/exoscale/v1/zz_generated.managedlist.go b/apis/exoscale/v1/zz_generated.managedlist.go index 76511616..24b1381e 100644 --- a/apis/exoscale/v1/zz_generated.managedlist.go +++ b/apis/exoscale/v1/zz_generated.managedlist.go @@ -22,6 +22,15 @@ func (l *IAMKeyList) GetItems() []resource.Managed { return items } +// GetItems of this KafkaList. +func (l *KafkaList) GetItems() []resource.Managed { + items := make([]resource.Managed, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items +} + // GetItems of this MySQLList. 
 func (l *MySQLList) GetItems() []resource.Managed {
 	items := make([]resource.Managed, len(l.Items))
diff --git a/docs/modules/ROOT/examples/exoscale_iamkey.yaml b/docs/modules/ROOT/examples/exoscale_iamkey.yaml
index a89a3477..09172ca8 100644
--- a/docs/modules/ROOT/examples/exoscale_iamkey.yaml
+++ b/docs/modules/ROOT/examples/exoscale_iamkey.yaml
@@ -1,17 +1,17 @@
 apiVersion: exoscale.crossplane.io/v1
 kind: IAMKey
 metadata:
-  name: my-exoscale-iam-key
+  name: iam-key-local-dev
 spec:
   forProvider:
-    keyName: iam-key
+    keyName: iam-key-local-dev
     services:
       sos:
         buckets:
-          - bucket-test-1
+          - bucket-local-dev
     zone: CH-DK-2
   providerConfigRef:
     name: provider-config
   writeConnectionSecretToRef:
-    name: my-exoscale-iam-key
+    name: my-exoscale-user-credentials
     namespace: default
diff --git a/generate_sample.go b/generate_sample.go
index 44032766..c7a01ead 100644
--- a/generate_sample.go
+++ b/generate_sample.go
@@ -41,6 +41,7 @@ func main() {
 	generateMysqlSample()
 	generatePostgresqlSample()
 	generateRedisSample()
+	generateKafkaSample()
 }
 
 func generatePostgresqlSample() {
@@ -268,6 +269,40 @@ func newRedisSample() *exoscalev1.Redis {
 		},
 	}
 }
+func generateKafkaSample() {
+	spec := newKafkaSample()
+	serialize(spec, true)
+}
+
+func newKafkaSample() *exoscalev1.Kafka {
+	return &exoscalev1.Kafka{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: exoscalev1.KafkaGroupVersionKind.GroupVersion().String(),
+			Kind:       exoscalev1.KafkaKind,
+		},
+		ObjectMeta: metav1.ObjectMeta{Name: "kafka-local-dev"},
+		Spec: exoscalev1.KafkaSpec{
+			ResourceSpec: xpv1.ResourceSpec{
+				ProviderConfigReference:          &xpv1.Reference{Name: "provider-config"},
+				WriteConnectionSecretToReference: &xpv1.SecretReference{Name: "kafka-local-dev-details", Namespace: "default"},
+			},
+			ForProvider: exoscalev1.KafkaParameters{
+				Maintenance: exoscalev1.MaintenanceSpec{
+					TimeOfDay: "12:00:00",
+					DayOfWeek: exoscaleoapi.DbaasServiceMaintenanceDowMonday,
+				},
+				Zone: "ch-dk-2",
+				DBaaSParameters: exoscalev1.DBaaSParameters{
+					Size: exoscalev1.SizeSpec{
+						Plan: "startup-2",
+					},
+					IPFilter: exoscalev1.IPFilter{"0.0.0.0/0"},
+				},
+				KafkaSettings: runtime.RawExtension{Raw: []byte(`{"connections_max_idle_ms": 60000}`)},
+			},
+		},
+	}
+}
 
 func failIfError(err error) {
 	if err != nil {
diff --git a/go.mod b/go.mod
index 8f93b1f1..854c4cb6 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/exoscale/egoscale v0.90.1
 	github.com/go-logr/logr v1.2.3
 	github.com/go-logr/zapr v1.2.3
+	github.com/google/go-cmp v0.5.8
 	github.com/hashicorp/go-version v1.6.0
 	github.com/minio/minio-go/v7 v7.0.45
 	github.com/stretchr/testify v1.8.0
@@ -46,7 +47,6 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/google/gnostic v0.6.9 // indirect
-	github.com/google/go-cmp v0.5.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/uuid v1.3.0 // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
diff --git a/operator/kafkacontroller/controller.go b/operator/kafkacontroller/controller.go
new file mode 100644
index 00000000..917cef51
--- /dev/null
+++ b/operator/kafkacontroller/controller.go
@@ -0,0 +1,72 @@
+package kafkacontroller
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/crossplane/crossplane-runtime/pkg/event"
+	"github.com/crossplane/crossplane-runtime/pkg/logging"
+	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
+	"github.com/crossplane/crossplane-runtime/pkg/resource"
+	exoscalesdk "github.com/exoscale/egoscale/v2"
+	"github.com/exoscale/egoscale/v2/oapi"
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/operator/pipelineutil"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type connector struct {
+	kube     client.Client
+	recorder event.Recorder
+}
+
+type connection struct {
+	exo oapi.ClientWithResponsesInterface
+}
+
+// Connect to the exoscale kafka provider.
+func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) {
+	log := ctrl.LoggerFrom(ctx)
+	log.V(1).Info("connecting resource")
+
+	kafkaInstance, ok := mg.(*exoscalev1.Kafka)
+	if !ok {
+		return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg)
+	}
+
+	exo, err := pipelineutil.OpenExoscaleClient(ctx, c.kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", kafkaInstance.Spec.ForProvider.Zone)))
+	if err != nil {
+		return nil, err
+	}
+	return connection{
+		exo: exo.Exoscale,
+	}, nil
+}
+
+// SetupController adds a controller that reconciles kafka resources.
+func SetupController(mgr ctrl.Manager) error {
+	name := strings.ToLower(exoscalev1.KafkaGroupKind)
+	recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor(name))
+
+	cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())}
+
+	r := managed.NewReconciler(mgr,
+		resource.ManagedKind(exoscalev1.KafkaGroupVersionKind),
+		managed.WithExternalConnecter(&connector{
+			kube:     mgr.GetClient(),
+			recorder: recorder,
+		}),
+		managed.WithLogger(logging.NewLogrLogger(mgr.GetLogger().WithValues("controller", name))),
+		managed.WithRecorder(recorder),
+		managed.WithPollInterval(1*time.Minute),
+		managed.WithConnectionPublishers(cps...),
+		managed.WithCreationGracePeriod(30*time.Second))
+
+	return ctrl.NewControllerManagedBy(mgr).
+		Named(name).
+		For(&exoscalev1.Kafka{}).
+		Complete(r)
+}
diff --git a/operator/kafkacontroller/create.go b/operator/kafkacontroller/create.go
new file mode 100644
index 00000000..271059f8
--- /dev/null
+++ b/operator/kafkacontroller/create.go
@@ -0,0 +1,67 @@
+package kafkacontroller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/operator/mapper"
+
+	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
+	"github.com/crossplane/crossplane-runtime/pkg/resource"
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	"github.com/exoscale/egoscale/v2/oapi"
+	controllerruntime "sigs.k8s.io/controller-runtime"
+)
+
+// Create idempotently creates a Kafka instance.
+// It will not return an "already exists" error.
+func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) {
+	log := controllerruntime.LoggerFrom(ctx)
+	log.V(1).Info("creating resource")
+
+	instance, ok := mg.(*exoscalev1.Kafka)
+	if !ok {
+		return managed.ExternalCreation{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg)
+	}
+
+	spec := instance.Spec.ForProvider
+	ipFilter := []string(spec.IPFilter)
+	settings, err := mapper.ToMap(spec.KafkaSettings)
+	if err != nil {
+		return managed.ExternalCreation{}, fmt.Errorf("invalid kafka settings: %w", err)
+	}
+	var version *string
+	if spec.Version != "" {
+		version = &spec.Version
+	}
+
+	body := oapi.CreateDbaasServiceKafkaJSONRequestBody{
+		IpFilter:      &ipFilter,
+		KafkaSettings: &settings,
+		Maintenance: &struct {
+			Dow  oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\""
+			Time string                                             "json:\"time\""
+		}{
+			Dow:  oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek),
+			Time: spec.Maintenance.TimeOfDay.String(),
+		},
+		Plan:                  spec.Size.Plan,
+		Version:               version,
+		TerminationProtection: &spec.TerminationProtection,
+	}
+
+	resp, err := c.exo.CreateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body)
+	if err != nil {
+		if errors.Is(err, exoscaleapi.ErrInvalidRequest) && strings.Contains(err.Error(), "Service name is already taken") {
+			// According to the ExternalClient interface, Create needs to be idempotent.
+			// However, the exoscale client doesn't return typed errors for this case, so we resort to brittle string matching to detect an "already exists" error.
+			return managed.ExternalCreation{}, nil
+		}
+		return managed.ExternalCreation{}, fmt.Errorf("unable to create instance: %w", err)
+	}
+	log.V(2).Info("response", "body", string(resp.Body))
+	return managed.ExternalCreation{}, nil
+}
diff --git a/operator/kafkacontroller/create_test.go b/operator/kafkacontroller/create_test.go
new file mode 100644
index 00000000..d8dac088
--- /dev/null
+++ b/operator/kafkacontroller/create_test.go
@@ -0,0 +1,107 @@
+package kafkacontroller
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	"github.com/exoscale/egoscale/v2/oapi"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/internal/operatortest"
+)
+
+func TestCreate(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "foo",
+		},
+	}
+	instance.Spec.ForProvider.Size.Plan = "business-8"
+	instance.Spec.ForProvider.IPFilter = []string{
+		"0.0.0.0/0",
+	}
+	instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday"
+	instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10"
+	ctx := context.Background()
+
+	createReq := mockCreateKafkaCall(exoMock, "foo", nil)
+
+	assert.NotPanics(t, func() {
+		_, err := c.Create(ctx, &instance)
+		require.NoError(t, err)
+	})
+	if assert.NotNil(t, createReq.IpFilter) {
+		assert.Len(t, *createReq.IpFilter, 1)
+		assert.Equal(t, (*createReq.IpFilter)[0], "0.0.0.0/0")
+	}
+	if assert.NotNil(t, createReq.Plan) {
+		assert.Equal(t, createReq.Plan, "business-8")
+	}
+	if assert.NotNil(t, createReq.Maintenance) {
+		assert.EqualValues(t, createReq.Maintenance.Dow, "monday")
+		assert.Equal(t, createReq.Maintenance.Time, "10:10:10")
+	}
+}
+
+func TestCreate_Idempotent(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "foo",
+		},
+	}
+	instance.Spec.ForProvider.Size.Plan = "business-8"
+	instance.Spec.ForProvider.IPFilter = []string{
+		"0.0.0.0/0",
+	}
+	instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday"
+	instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10"
+
+	_ = mockCreateKafkaCall(exoMock, "foo", fmt.Errorf("%w: Service name is already taken.", exoscaleapi.ErrInvalidRequest))
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		_, err := c.Create(ctx, &instance)
+		require.NoError(t, err)
+	})
+}
+
+func TestCreate_invalidInput(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	ctx := context.Background()
+	assert.NotPanics(t, func() {
+		_, err := c.Create(ctx, nil)
+		assert.Error(t, err)
+	})
+}
+
+func mockCreateKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) *oapi.CreateDbaasServiceKafkaJSONRequestBody {
+	createReq := &oapi.CreateDbaasServiceKafkaJSONRequestBody{}
+
+	m.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name),
+		mock.MatchedBy(func(req oapi.CreateDbaasServiceKafkaJSONRequestBody) bool {
+			*createReq = req
+			return true
+		})).
+		Return(&oapi.CreateDbaasServiceKafkaResponse{Body: []byte{}}, err).
+		Once()
+
+	return createReq
+}
diff --git a/operator/kafkacontroller/delete.go b/operator/kafkacontroller/delete.go
new file mode 100644
index 00000000..0b0349cc
--- /dev/null
+++ b/operator/kafkacontroller/delete.go
@@ -0,0 +1,34 @@
+package kafkacontroller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+
+	"github.com/crossplane/crossplane-runtime/pkg/resource"
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	controllerruntime "sigs.k8s.io/controller-runtime"
+)
+
+// Delete idempotently deletes a kafka instance.
+// It will not return a "not found" error.
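+// A "not found" response (exoscaleapi.ErrNotFound) is treated as a successful
+// deletion, so repeated reconciliation attempts remain idempotent.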
+func (c connection) Delete(ctx context.Context, mg resource.Managed) error {
+	log := controllerruntime.LoggerFrom(ctx)
+	log.V(1).Info("deleting resource")
+
+	instance, ok := mg.(*exoscalev1.Kafka)
+	if !ok {
+		return fmt.Errorf("invalid managed resource type %T for kafka connection", mg)
+	}
+	resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName())
+	if err != nil {
+		if errors.Is(err, exoscaleapi.ErrNotFound) {
+			return nil
+		}
+		return fmt.Errorf("cannot delete kafka instance: %w", err)
+	}
+	log.V(2).Info("response", "body", string(resp.Body))
+	return nil
+}
diff --git a/operator/kafkacontroller/delete_test.go b/operator/kafkacontroller/delete_test.go
new file mode 100644
index 00000000..cb90a9b8
--- /dev/null
+++ b/operator/kafkacontroller/delete_test.go
@@ -0,0 +1,73 @@
+package kafkacontroller
+
+import (
+	"context"
+	"testing"
+
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	"github.com/exoscale/egoscale/v2/oapi"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/internal/operatortest"
+)
+
+func TestDelete(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "buzz",
+		},
+	}
+
+	mockDeleteKafkaCall(exoMock, "buzz", nil)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		err := c.Delete(ctx, &instance)
+		require.NoError(t, err)
+	})
+}
+func TestDelete_invalidInput(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		err := c.Delete(ctx, nil)
+		assert.Error(t, err)
+	})
+}
+func TestDelete_Idempotent(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "buzz",
+		},
+	}
+
+	mockDeleteKafkaCall(exoMock, "buzz", exoscaleapi.ErrNotFound)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		err := c.Delete(ctx, &instance)
+		require.NoError(t, err)
+	})
+}
+
+func mockDeleteKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) {
+	m.On("DeleteDbaasServiceWithResponse", mock.Anything, name).
+		Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, err).
+		Once()
+}
diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go
new file mode 100644
index 00000000..11ae4a73
--- /dev/null
+++ b/operator/kafkacontroller/observe.go
@@ -0,0 +1,188 @@
+package kafkacontroller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+
+	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
+	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
+	"github.com/crossplane/crossplane-runtime/pkg/resource"
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	"github.com/exoscale/egoscale/v2/oapi"
+	"github.com/google/go-cmp/cmp"
+	"k8s.io/utils/pointer"
+	controllerruntime "sigs.k8s.io/controller-runtime"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/operator/mapper"
+)
+
+// Observe the external kafka instance.
+// It returns whether the instance exists and whether it is up-to-date.
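+// Up-to-dateness is determined by mapping the observed service back into
+// KafkaParameters and comparing it to the spec with go-cmp (see diffParameters).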
+// Observe will also update the status to the observed state and return connection details to connect to the instance. +func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("observing resource") + + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return managed.ExternalObservation{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } + + res, err := c.exo.GetDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName())) + if err != nil { + if errors.Is(err, exoscaleapi.ErrNotFound) { + return managed.ExternalObservation{ResourceExists: false}, nil + } + return managed.ExternalObservation{}, err + } + external := res.JSON200 + + instance.Status.AtProvider, err = getObservation(external) + if err != nil { + log.Error(err, "failed to observe kafka instance") + } + + condition, err := getCondition(external) + if err != nil { + log.Error(err, "failed to update kafka condition") + } + instance.SetConditions(condition) + + caRes, err := c.exo.GetDbaasCaCertificateWithResponse(ctx) + if err != nil { + return managed.ExternalObservation{}, fmt.Errorf("cannot retrieve CA certificate: %w", err) + } + ca := "" + if caRes.JSON200 != nil && caRes.JSON200.Certificate != nil { + ca = *caRes.JSON200.Certificate + } + + connDetails, err := getConnectionDetails(external, ca) + if err != nil { + return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) + } + + upToDate, diff := diffParameters(external, instance.Spec.ForProvider) + + return managed.ExternalObservation{ + ResourceExists: true, + ResourceUpToDate: upToDate, + ConnectionDetails: connDetails, + Diff: diff, + }, nil +} + +func getObservation(external *oapi.DbaasServiceKafka) (exoscalev1.KafkaObservation, error) { + notifications, err := mapper.ToNotifications(external.Notifications) + if err != nil { + return exoscalev1.KafkaObservation{}, err + } + settings, err := mapper.ToRawExtension(external.KafkaSettings) + if err != nil { + return exoscalev1.KafkaObservation{}, err + } + + nodeStates := []exoscalev1.NodeState{} + if external.NodeStates != nil { + nodeStates = mapper.ToNodeStates(external.NodeStates) + } + + return exoscalev1.KafkaObservation{ + Version: pointer.StringDeref(external.Version, ""), + KafkaSettings: settings, + NodeStates: nodeStates, + Notifications: notifications, + }, nil +} +func getCondition(external *oapi.DbaasServiceKafka) (xpv1.Condition, error) { + var state oapi.EnumServiceState + if external.State != nil { + state = *external.State + } + switch state { + case oapi.EnumServiceStateRunning: + return exoscalev1.Running(), nil + case oapi.EnumServiceStateRebuilding: + return exoscalev1.Rebuilding(), nil + case oapi.EnumServiceStatePoweroff: + return exoscalev1.PoweredOff(), nil + case oapi.EnumServiceStateRebalancing: + return exoscalev1.Rebalancing(), nil + default: + return xpv1.Condition{}, fmt.Errorf("unknown state %q", state) + } +} +func getConnectionDetails(external *oapi.DbaasServiceKafka, ca string) (map[string][]byte, error) { + if external.ConnectionInfo == nil { + return nil, errors.New("no connection details") + } + nodes := "" + if external.ConnectionInfo.Nodes != nil { + nodes = strings.Join(*external.ConnectionInfo.Nodes, " ") + } + + if external.ConnectionInfo.AccessCert == nil { + return nil, errors.New("no certificate returned") + } + cert := *external.ConnectionInfo.AccessCert + + if 
external.ConnectionInfo.AccessKey == nil {
+		return nil, errors.New("no key returned")
+	}
+	key := *external.ConnectionInfo.AccessKey
+
+	if external.Uri == nil {
+		return nil, errors.New("no URI returned")
+	}
+	uri := *external.Uri
+	host := ""
+	port := ""
+	if external.UriParams != nil {
+		uriParams := *external.UriParams
+		host, _ = uriParams["host"].(string)
+		port, _ = uriParams["port"].(string)
+	}
+
+	return map[string][]byte{
+		"KAFKA_URI":    []byte(uri),
+		"KAFKA_HOST":   []byte(host),
+		"KAFKA_PORT":   []byte(port),
+		"KAFKA_NODES":  []byte(nodes),
+		"service.cert": []byte(cert),
+		"service.key":  []byte(key),
+		"ca.crt":       []byte(ca),
+	}, nil
+}
+
+func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) {
+	actualIPFilter := []string{}
+	if external.IpFilter != nil {
+		actualIPFilter = *external.IpFilter
+	}
+	actualKafkaSettings, err := mapper.ToRawExtension(external.KafkaSettings)
+	if err != nil {
+		return false, err.Error()
+	}
+
+	actual := exoscalev1.KafkaParameters{
+		Maintenance: exoscalev1.MaintenanceSpec{
+			DayOfWeek: external.Maintenance.Dow,
+			TimeOfDay: exoscalev1.TimeOfDay(external.Maintenance.Time),
+		},
+		Zone: expected.Zone,
+		DBaaSParameters: exoscalev1.DBaaSParameters{
+			TerminationProtection: pointer.BoolDeref(external.TerminationProtection, false),
+			Size: exoscalev1.SizeSpec{
+				Plan: external.Plan,
+			},
+			IPFilter: actualIPFilter,
+		},
+		Version:       expected.Version, // We should never mark something as out of date if the versions don't match, as Update can't modify the version anyway.
+		KafkaSettings: actualKafkaSettings,
+	}
+
+	return cmp.Equal(expected, actual), cmp.Diff(expected, actual)
+}
diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go
new file mode 100644
index 00000000..b4798a8f
--- /dev/null
+++ b/operator/kafkacontroller/observe_test.go
@@ -0,0 +1,330 @@
+package kafkacontroller
+
+import (
+	"context"
+	"testing"
+
+	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
+	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
+	exoscaleapi "github.com/exoscale/egoscale/v2/api"
+	"github.com/exoscale/egoscale/v2/oapi"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/internal/operatortest"
+	"github.com/vshn/provider-exoscale/operator/mapper"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/utils/pointer"
+)
+
+func TestObserve_NotExists(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+
+	instance := sampleKafka("foo")
+	mockGetKafkaCall(exoMock, "foo", nil, exoscaleapi.ErrNotFound)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.False(t, res.ResourceExists, "report resource does not exist")
+	})
+}
+
+func TestObserve_UpToDate_ConnectionDetails(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+
+	instance := sampleKafka("foo")
+	found := sampleAPIKafka("foo")
+	found.Uri = pointer.String("foobar.com:21701")
+	found.UriParams = &map[string]interface{}{
+		"host": "foobar.com",
+		"port": "21701",
+	}
+	found.ConnectionInfo.Nodes = &[]string{
+		"10.10.1.1:21701",
+		"10.10.1.2:21701",
+		"10.10.1.3:21701",
+	}
+	found.ConnectionInfo.AccessCert = pointer.String("CERT")
+	found.ConnectionInfo.AccessKey = pointer.String("KEY")
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.True(t, res.ResourceUpToDate, "report resource up-to-date")
+		require.NotNil(t, res.ConnectionDetails)
+		expectedConnDetails := managed.ConnectionDetails{
+			"KAFKA_URI":    []byte("foobar.com:21701"),
+			"KAFKA_HOST":   []byte("foobar.com"),
+			"KAFKA_PORT":   []byte("21701"),
+			"KAFKA_NODES":  []byte("10.10.1.1:21701 10.10.1.2:21701 10.10.1.3:21701"),
+			"service.cert": []byte("CERT"),
+			"service.key":  []byte("KEY"),
+			"ca.crt":       []byte("CA"),
+		}
+		assert.Equal(t, expectedConnDetails, res.ConnectionDetails)
+	})
+}
+
+func TestObserve_UpToDate_Status(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	found := sampleAPIKafka("foo")
+	found.Version = pointer.String("3.2.1")
+	found.NodeStates = &[]oapi.DbaasNodeState{
+		{
+			Name:  "node-1",
+			State: "running",
+		},
+		{
+			Name:  "node-3",
+			State: "leaving",
+		},
+	}
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.True(t, res.ResourceUpToDate, "report resource up-to-date")
+
+		assert.Equal(t, "3.2.1", instance.Status.AtProvider.Version)
+		require.Len(t, instance.Status.AtProvider.NodeStates, 2, "expect 2 node states")
+		assert.Equal(t, "node-1", instance.Status.AtProvider.NodeStates[0].Name)
+		assert.EqualValues(t, "running", instance.Status.AtProvider.NodeStates[0].State)
+		assert.EqualValues(t, "leaving", instance.Status.AtProvider.NodeStates[1].State)
+	})
+}
+
+func TestObserve_UpToDate_Condition_NotReady(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	found := sampleAPIKafka("foo")
+	state := oapi.EnumServiceStateRebalancing
+	found.State = &state
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.True(t, res.ResourceUpToDate, "report resource up-to-date")
+
+		readyState := instance.Status.ConditionedStatus.GetCondition(xpv1.TypeReady)
+
+		assert.Equal(t, corev1.ConditionFalse, readyState.Status)
+		assert.EqualValues(t, "Rebalancing", readyState.Reason)
+	})
+}
+
+func TestObserve_UpToDate_Condition_Ready(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	found := sampleAPIKafka("foo")
+	state := oapi.EnumServiceStateRunning
+	found.State = &state
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.True(t, res.ResourceUpToDate, "report resource up-to-date")
+
+		readyState := instance.Status.ConditionedStatus.GetCondition(xpv1.TypeReady)
+
+		assert.Equal(t, corev1.ConditionTrue, readyState.Status)
+	})
+}
+
+func TestObserve_UpToDate_WithVersion(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	instance.Spec.ForProvider.Version = "3.2"
+	found := sampleAPIKafka("foo")
+	found.Version = pointer.String("3.2.1")
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.True(t, res.ResourceUpToDate, "report resource up-to-date")
+	})
+}
+
+func TestObserve_Outdated(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	found := sampleAPIKafka("foo")
+	found.Maintenance.Dow = "tuesday"
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.False(t, res.ResourceUpToDate, "report resource not up-to-date")
+	})
+}
+
+func TestObserve_Outdated_Settings(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := sampleKafka("foo")
+	setting, _ := mapper.ToRawExtension(&map[string]interface{}{
+		"count": 1,
+		"foo":   "bar",
+	})
+	instance.Spec.ForProvider.KafkaSettings = setting
+	found := sampleAPIKafka("foo")
+	found.KafkaSettings = &map[string]interface{}{
+		"foo":   "bar",
+		"count": 2,
+	}
+
+	mockGetKafkaCall(exoMock, "foo", found, nil)
+	mockCACall(exoMock)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		res, err := c.Observe(ctx, &instance)
+		assert.NoError(t, err)
+		require.NotNil(t, res)
+		assert.True(t, res.ResourceExists, "report resource exists")
+		assert.False(t, res.ResourceUpToDate, "report resource not up-to-date")
+	})
+}
+
+func sampleKafka(name string) exoscalev1.Kafka {
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+	}
+	instance.Spec.ForProvider.Size.Plan = "business-8"
+	instance.Spec.ForProvider.IPFilter = []string{
+		"0.0.0.0/0",
+	}
+	instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday"
+	instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10"
+	instance.Spec.ForProvider.Zone = "ch-dk-2"
+	instance.Spec.ForProvider.KafkaSettings = runtime.RawExtension{Raw: []byte(`{"connections_max_idle_ms":60000}`)}
+	return instance
+}
+
+func sampleAPIKafka(name string) *oapi.DbaasServiceKafka {
+	res := oapi.DbaasServiceKafka{}
+
+	res.Name = oapi.DbaasServiceName(name)
+	res.Plan = "business-8"
+	res.IpFilter = &[]string{"0.0.0.0/0"}
+	res.Maintenance = &oapi.DbaasServiceMaintenance{
+		Dow:  "monday",
+		Time: "10:10:10",
+	}
+	res.KafkaSettings = &map[string]interface{}{
+		"connections_max_idle_ms": 60000,
+	}
+
+	nodes := []string{"194.182.160.164:21701",
+		"159.100.244.100:21701",
+		"159.100.241.65:21701",
+	}
+	res.ConnectionInfo = &struct {
+		AccessCert  *string   "json:\"access-cert,omitempty\""
+		AccessKey   *string   "json:\"access-key,omitempty\""
+		Nodes       *[]string "json:\"nodes,omitempty\""
+		RegistryUri *string   "json:\"registry-uri,omitempty\""
+		RestUri     *string   "json:\"rest-uri,omitempty\""
+	}{
+		AccessCert: pointer.String("SOME ACCESS CERT"),
+		AccessKey:  pointer.String("SOME ACCESS KEY"),
+		Nodes:      &nodes,
+	}
+
+	res.Uri = pointer.String("foo-exoscale-8fa13713-1027-4b9c-bca7-4c14f9ff9928.aivencloud.com")
+	res.UriParams = &map[string]interface{}{}
+
+	res.Version = pointer.String("3.2.1")
+
+	return &res
+}
+
+func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, found *oapi.DbaasServiceKafka, err error) {
+	m.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name)).
+		Return(&oapi.GetDbaasServiceKafkaResponse{
+			Body:    []byte{},
+			JSON200: found,
+		}, err).
+		Once()
+}
+
+func mockCACall(m *operatortest.ClientWithResponsesInterface) {
+	m.On("GetDbaasCaCertificateWithResponse", mock.Anything).
+		Return(&oapi.GetDbaasCaCertificateResponse{
+			JSON200: &struct {
+				Certificate *string "json:\"certificate,omitempty\""
+			}{
+				Certificate: pointer.String("CA"),
+			},
+		}, nil).
+		Once()
+}
diff --git a/operator/kafkacontroller/update.go b/operator/kafkacontroller/update.go
new file mode 100644
index 00000000..c9288dfa
--- /dev/null
+++ b/operator/kafkacontroller/update.go
@@ -0,0 +1,53 @@
+package kafkacontroller
+
+import (
+	"context"
+	"fmt"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/operator/mapper"
+
+	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
+	"github.com/crossplane/crossplane-runtime/pkg/resource"
+	"github.com/exoscale/egoscale/v2/oapi"
+	controllerruntime "sigs.k8s.io/controller-runtime"
+)
+
+// Update the provided kafka instance.
+func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) {
+	log := controllerruntime.LoggerFrom(ctx)
+	log.V(1).Info("updating resource")
+
+	instance, ok := mg.(*exoscalev1.Kafka)
+	if !ok {
+		return managed.ExternalUpdate{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg)
+	}
+
+	spec := instance.Spec.ForProvider
+	ipFilter := []string(spec.IPFilter)
+	settings, err := mapper.ToMap(spec.KafkaSettings)
+	if err != nil {
+		return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka settings: %w", err)
+	}
+
+	body := oapi.UpdateDbaasServiceKafkaJSONRequestBody{
+		IpFilter:      &ipFilter,
+		KafkaSettings: &settings,
+		Maintenance: &struct {
+			Dow  oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\""
+			Time string                                             "json:\"time\""
+		}{
+			Dow:  oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek),
+			Time: spec.Maintenance.TimeOfDay.String(),
+		},
+		Plan:                  &spec.Size.Plan,
+		TerminationProtection: &spec.TerminationProtection,
+	}
+
+	resp, err := c.exo.UpdateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body)
+	if err != nil {
+		return managed.ExternalUpdate{}, fmt.Errorf("unable to update instance: %w", err)
+	}
+	log.V(2).Info("response", "body", string(resp.Body))
+	return managed.ExternalUpdate{}, nil
+}
diff --git a/operator/kafkacontroller/update_test.go b/operator/kafkacontroller/update_test.go
new file mode 100644
index 00000000..2401f8f7
--- /dev/null
+++ b/operator/kafkacontroller/update_test.go
@@ -0,0 +1,81 @@
+package kafkacontroller
+
+import (
+	"context"
+	"testing"
+
+	"github.com/exoscale/egoscale/v2/oapi"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/internal/operatortest"
+)
+
+func TestUpdate(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	instance := exoscalev1.Kafka{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "bar",
+		},
+	}
+	instance.Spec.ForProvider.Size.Plan = "business-4"
+	instance.Spec.ForProvider.IPFilter = []string{
+		"1.0.0.0/8",
+		"2.0.0.0/8",
+	}
+	instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday"
+	instance.Spec.ForProvider.Maintenance.TimeOfDay = "11:11:11"
+
+	updateReq := mockUpdateKafkaCall(exoMock, "bar", nil)
+
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		_, err := c.Update(ctx, &instance)
+		require.NoError(t, err)
+	})
+
+	if assert.NotNil(t, updateReq.IpFilter) {
+		assert.Len(t, *updateReq.IpFilter, 2)
+		assert.Equal(t, (*updateReq.IpFilter)[0], "1.0.0.0/8")
+	}
+	if assert.NotNil(t, updateReq.Plan) {
+		assert.Equal(t, *updateReq.Plan, "business-4")
+	}
+	if assert.NotNil(t, updateReq.Maintenance) {
+		assert.EqualValues(t, updateReq.Maintenance.Dow, "monday")
+		assert.Equal(t, updateReq.Maintenance.Time, "11:11:11")
+	}
+}
+
+func TestUpdate_invalidInput(t *testing.T) {
+	exoMock := &operatortest.ClientWithResponsesInterface{}
+	c := connection{
+		exo: exoMock,
+	}
+	assert.NotPanics(t, func() {
+		ctx := context.Background()
+		_, err := c.Update(ctx, nil)
+		assert.Error(t, err)
+	})
+}
+
+func mockUpdateKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) *oapi.UpdateDbaasServiceKafkaJSONRequestBody {
+	updateReq := &oapi.UpdateDbaasServiceKafkaJSONRequestBody{}
+
+	m.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name),
+		mock.MatchedBy(func(req oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool {
+			*updateReq = req
+			return true
+		})).
+		Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, err).
+		Once()
+
+	return updateReq
+}
diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go
new file mode 100644
index 00000000..f94aefad
--- /dev/null
+++ b/operator/kafkacontroller/webhook.go
@@ -0,0 +1,123 @@
+package kafkacontroller
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+	"github.com/vshn/provider-exoscale/operator/webhook"
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	"github.com/go-logr/logr"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+// SetupWebhook adds a webhook for kafka resources.
+func SetupWebhook(mgr ctrl.Manager) error {
+	return ctrl.NewWebhookManagedBy(mgr).
+		For(&exoscalev1.Kafka{}).
+		WithValidator(&Validator{
+			log: mgr.GetLogger().WithName("webhook").WithName(strings.ToLower(exoscalev1.KafkaKind)),
+		}).
+		Complete()
+}
+
+// Validator validates kafka admission requests.
+type Validator struct {
+	log logr.Logger
+}
+
+// ValidateCreate validates the spec of a created kafka resource.
+func (v *Validator) ValidateCreate(_ context.Context, obj runtime.Object) error {
+	instance, ok := obj.(*exoscalev1.Kafka)
+	if !ok {
+		return fmt.Errorf("invalid managed resource type %T for kafka webhook", obj)
+	}
+	v.log.V(2).WithValues("instance", instance).Info("validate create")
+
+	return validateSpec(instance.Spec.ForProvider)
+}
+
+// ValidateUpdate validates the spec of an updated kafka resource and checks that no immutable field has been modified.
+// The zone is fully immutable, while version changes are validated against the version currently reported by the provider.
+func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) error {
+	newInstance, ok := newObj.(*exoscalev1.Kafka)
+	if !ok {
+		return fmt.Errorf("invalid managed resource type %T for kafka webhook", newObj)
+	}
+	oldInstance, ok := oldObj.(*exoscalev1.Kafka)
+	if !ok {
+		return fmt.Errorf("invalid managed resource type %T for kafka webhook", oldObj)
+	}
+	v.log.V(2).WithValues("old", oldInstance, "new", newInstance).Info("validate update")
+
+	err := validateSpec(newInstance.Spec.ForProvider)
+	if err != nil {
+		return err
+	}
+	return validateImmutable(*oldInstance, *newInstance)
+}
+
+// ValidateDelete validates a delete. Currently, it does not validate anything.
+func (v *Validator) ValidateDelete(_ context.Context, obj runtime.Object) error {
+	v.log.V(2).Info("validate delete (noop)")
+	return nil
+}
+
+func validateSpec(params exoscalev1.KafkaParameters) error {
+	err := validateIpFilter(params)
+	if err != nil {
+		return err
+	}
+	err = validateMaintenanceSchedule(params)
+	if err != nil {
+		return err
+	}
+	return validateKafkaSettings(params)
+}
+
+func validateIpFilter(params exoscalev1.KafkaParameters) error {
+	if len(params.IPFilter) == 0 {
+		return fmt.Errorf("IP filter cannot be empty")
+	}
+	return nil
+}
+
+func validateMaintenanceSchedule(params exoscalev1.KafkaParameters) error {
+	_, _, _, err := params.Maintenance.TimeOfDay.Parse()
+	return err
+}
+
+func validateKafkaSettings(obj exoscalev1.KafkaParameters) error {
+	return webhook.ValidateRawExtension(obj.KafkaSettings)
+}
+
+func validateImmutable(oldInst, newInst exoscalev1.Kafka) error {
+	err := compareZone(oldInst.Spec.ForProvider, newInst.Spec.ForProvider)
+	if err != nil {
+		return err
+	}
+	return compareVersion(oldInst, newInst)
+}
+
+func compareZone(oldParams, newParams exoscalev1.KafkaParameters) error {
+	if oldParams.Zone != newParams.Zone {
+		return fmt.Errorf("field is immutable: %s (old), %s (changed)", oldParams.Zone, newParams.Zone)
+	}
+	return nil
+}
+
+func compareVersion(oldInst, newInst exoscalev1.Kafka) error {
+	if oldInst.Spec.ForProvider.Version == newInst.Spec.ForProvider.Version {
+		return nil
+	}
+	if newInst.Spec.ForProvider.Version == "" {
+		// Setting the version to an empty string should always be fine.
+		return nil
+	}
+	if oldInst.Spec.ForProvider.Version == "" {
+		// Fall back to the reported version if no version was set before.
+		oldInst.Spec.ForProvider.Version = oldInst.Status.AtProvider.Version
+	}
+	return webhook.ValidateVersion(oldInst.Status.AtProvider.Version, oldInst.Spec.ForProvider.Version, newInst.Spec.ForProvider.Version)
+}
diff --git a/operator/kafkacontroller/webhook_test.go b/operator/kafkacontroller/webhook_test.go
new file mode 100644
index 00000000..86ff4cc4
--- /dev/null
+++ b/operator/kafkacontroller/webhook_test.go
@@ -0,0 +1,93 @@
+package kafkacontroller
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+)
+
+func TestWebhook_Create(t *testing.T) {
+	ctx := context.TODO()
+	v := Validator{
+		log: logr.Discard(),
+	}
+
+	base := sampleKafka("foo")
+
+	t.Run("valid", func(t *testing.T) {
+		err := v.ValidateCreate(ctx, &base)
+		assert.NoError(t, err)
+	})
+	t.Run("invalid empty", func(t *testing.T) {
+		err := v.ValidateCreate(ctx, &exoscalev1.Kafka{})
+		assert.Error(t, err)
+	})
+	t.Run("invalid no ipfilter", func(t *testing.T) {
+		inst := base
+		inst.Spec.ForProvider.IPFilter = nil
+		err := v.ValidateCreate(ctx, &inst)
+		
assert.Error(t, err) + }) + t.Run("invalid no time", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.Maintenance.TimeOfDay = "" + err := v.ValidateCreate(ctx, &inst) + assert.Error(t, err) + }) +} + +func TestWebhook_Update(t *testing.T) { + ctx := context.TODO() + v := Validator{ + log: logr.Discard(), + } + + base := sampleKafka("foo") + + t.Run("no change", func(t *testing.T) { + err := v.ValidateUpdate(ctx, &base, &base) + assert.NoError(t, err) + }) + t.Run("valid change", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.IPFilter = []string{"10.10.1.1/24", "10.10.2.1/24"} + err := v.ValidateUpdate(ctx, &base, &inst) + assert.NoError(t, err) + }) + t.Run("remove ipfilter", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.IPFilter = nil + err := v.ValidateUpdate(ctx, &base, &inst) + assert.Error(t, err) + }) + t.Run("change zone", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.Zone = "ch-gva-2" + err := v.ValidateUpdate(ctx, &base, &inst) + assert.Error(t, err) + }) + t.Run("change unsupported version", func(t *testing.T) { + newInst := base + oldInst := base + + oldInst.Status.AtProvider.Version = "3.2.1" + newInst.Spec.ForProvider.Version = "3.3" + + err := v.ValidateUpdate(ctx, &oldInst, &newInst) + assert.Error(t, err) + }) + t.Run("change supported version", func(t *testing.T) { + newInst := base + oldInst := base + + oldInst.Status.AtProvider.Version = "3.2.1" + newInst.Spec.ForProvider.Version = "3.2" + + err := v.ValidateUpdate(ctx, &oldInst, &newInst) + assert.NoError(t, err) + }) + +} diff --git a/operator/operator.go b/operator/operator.go index 5649597c..7d4c6eea 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -4,6 +4,7 @@ import ( "github.com/vshn/provider-exoscale/operator/bucketcontroller" "github.com/vshn/provider-exoscale/operator/configcontroller" "github.com/vshn/provider-exoscale/operator/iamkeycontroller" + "github.com/vshn/provider-exoscale/operator/kafkacontroller" "github.com/vshn/provider-exoscale/operator/mysqlcontroller" "github.com/vshn/provider-exoscale/operator/postgresqlcontroller" "github.com/vshn/provider-exoscale/operator/rediscontroller" @@ -20,6 +21,7 @@ func SetupControllers(mgr ctrl.Manager) error { mysqlcontroller.SetupController, postgresqlcontroller.SetupController, rediscontroller.SetupController, + kafkacontroller.SetupController, } { if err := setup(mgr); err != nil { return err @@ -36,6 +38,7 @@ func SetupWebhooks(mgr ctrl.Manager) error { mysqlcontroller.SetupWebhook, postgresqlcontroller.SetupWebhook, rediscontroller.SetupWebhook, + kafkacontroller.SetupWebhook, } { if err := setup(mgr); err != nil { return err diff --git a/package/crds/exoscale.crossplane.io_kafkas.yaml b/package/crds/exoscale.crossplane.io_kafkas.yaml new file mode 100644 index 00000000..8bfd1081 --- /dev/null +++ b/package/crds/exoscale.crossplane.io_kafkas.yaml @@ -0,0 +1,398 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.10.0 + creationTimestamp: null + name: kafkas.exoscale.crossplane.io +spec: + group: exoscale.crossplane.io + names: + categories: + - crossplane + - exoscale + kind: Kafka + listKind: KafkaList + plural: kafkas + singular: kafka + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .status.conditions[?(@.type=='Ready')].reason + name: State + type: string + - jsonPath: .status.conditions[?(@.type=='Ready')].status + name: Ready + type: string + - jsonPath: 
.status.conditions[?(@.type=='Synced')].status + name: Synced + type: string + - jsonPath: .metadata.annotations.crossplane\.io/external-name + name: External Name + type: string + - jsonPath: .spec.forProvider.size.plan + name: Plan + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1 + schema: + openAPIV3Schema: + description: Kafka is the API for creating Kafka. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: KafkaSpec defines the desired state of a Kafka. + properties: + deletionPolicy: + default: Delete + description: DeletionPolicy specifies what will happen to the underlying + external when this managed resource is deleted - either "Delete" + or "Orphan" the external resource. + enum: + - Orphan + - Delete + type: string + forProvider: + description: KafkaParameters are the configurable fields of a Kafka + instance. + properties: + ipFilter: + description: IPFilter is a list of allowed IPv4 CIDR ranges that + can access the service. If no IP Filter is set, you may not + be able to reach the service. A value of `0.0.0.0/0` will open + the service to all addresses on the public internet. + items: + type: string + type: array + kafkaSettings: + description: KafkaSettings contains additional Kafka settings. + type: object + x-kubernetes-preserve-unknown-fields: true + maintenance: + description: MaintenanceSpec contains settings to control the + maintenance of an instance. + properties: + dayOfWeek: + description: DayOfWeek specifies at which weekday the maintenance + is held place. Allowed values are [monday, tuesday, wednesday, + thursday, friday, saturday, sunday, never] + enum: + - monday + - tuesday + - wednesday + - thursday + - friday + - saturday + - sunday + - never + type: string + timeOfDay: + description: 'TimeOfDay for installing updates in UTC. Format: + "hh:mm:ss".' + pattern: ^([0-1]?[0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9])$ + type: string + type: object + size: + description: Size contains the service capacity settings. + properties: + plan: + type: string + type: object + terminationProtection: + description: TerminationProtection protects against termination + and powering off. + type: boolean + version: + description: Version is the (minor) version identifier for the + instance (e.g. "3.2"). + type: string + zone: + description: Zone is the datacenter identifier in which the instance + runs in. + enum: + - ch-gva-2 + - ch-dk-2 + - de-fra-1 + - de-muc-1 + - at-vie-1 + - bg-sof-1 + type: string + required: + - zone + type: object + providerConfigRef: + default: + name: default + description: ProviderConfigReference specifies how the provider that + will be used to create, observe, update, and delete this managed + resource should be configured. + properties: + name: + description: Name of the referenced object. 
+ type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of this + reference is required. The default is 'Required', which + means the reconcile will fail if the reference cannot be + resolved. 'Optional' means this reference will be a no-op + if it cannot be resolved. + enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will attempt + to resolve the reference only when the corresponding field + is not present. Use 'Always' to resolve the reference on + every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + providerRef: + description: 'ProviderReference specifies the provider that will be + used to create, observe, update, and delete this managed resource. + Deprecated: Please use ProviderConfigReference, i.e. `providerConfigRef`' + properties: + name: + description: Name of the referenced object. + type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of this + reference is required. The default is 'Required', which + means the reconcile will fail if the reference cannot be + resolved. 'Optional' means this reference will be a no-op + if it cannot be resolved. + enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will attempt + to resolve the reference only when the corresponding field + is not present. Use 'Always' to resolve the reference on + every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + publishConnectionDetailsTo: + description: PublishConnectionDetailsTo specifies the connection secret + config which contains a name, metadata and a reference to secret + store config to which any connection details for this managed resource + should be written. Connection details frequently include the endpoint, + username, and password required to connect to the managed resource. + properties: + configRef: + default: + name: default + description: SecretStoreConfigRef specifies which secret store + config should be used for this ConnectionSecret. + properties: + name: + description: Name of the referenced object. + type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of + this reference is required. The default is 'Required', + which means the reconcile will fail if the reference + cannot be resolved. 'Optional' means this reference + will be a no-op if it cannot be resolved. + enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will + attempt to resolve the reference only when the corresponding + field is not present. Use 'Always' to resolve the reference + on every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + metadata: + description: Metadata is the metadata for connection secret. 
+ properties: + annotations: + additionalProperties: + type: string + description: Annotations are the annotations to be added to + connection secret. - For Kubernetes secrets, this will be + used as "metadata.annotations". - It is up to Secret Store + implementation for others store types. + type: object + labels: + additionalProperties: + type: string + description: Labels are the labels/tags to be added to connection + secret. - For Kubernetes secrets, this will be used as "metadata.labels". + - It is up to Secret Store implementation for others store + types. + type: object + type: + description: Type is the SecretType for the connection secret. + - Only valid for Kubernetes Secret Stores. + type: string + type: object + name: + description: Name is the name of the connection secret. + type: string + required: + - name + type: object + writeConnectionSecretToRef: + description: WriteConnectionSecretToReference specifies the namespace + and name of a Secret to which any connection details for this managed + resource should be written. Connection details frequently include + the endpoint, username, and password required to connect to the + managed resource. This field is planned to be replaced in a future + release in favor of PublishConnectionDetailsTo. Currently, both + could be set independently and connection details would be published + to both without affecting each other. + properties: + name: + description: Name of the secret. + type: string + namespace: + description: Namespace of the secret. + type: string + required: + - name + - namespace + type: object + required: + - forProvider + type: object + status: + description: KafkaStatus represents the observed state of a Kafka instance. + properties: + atProvider: + description: KafkaObservation are the observable fields of a Kafka. + properties: + kafkaSettings: + description: KafkaSettings contains additional Kafka settings + as set by the provider. + type: object + x-kubernetes-preserve-unknown-fields: true + nodeStates: + description: State of individual service nodes + items: + description: NodeState describes the state of a service node. + properties: + name: + description: Name of the service node + type: string + role: + description: Role of this node. + type: string + state: + description: State of the service node. + type: string + type: object + type: array + notifications: + description: Service notifications + items: + description: Notification contains a service message. + properties: + level: + description: Level of the notification. + type: string + message: + description: Message contains the notification. + type: string + metadata: + description: Metadata contains additional data. + type: object + x-kubernetes-preserve-unknown-fields: true + type: + description: Type of the notification. + type: string + type: object + type: array + version: + type: string + type: object + conditions: + description: Conditions of the resource. + items: + description: A Condition that may apply to a resource. + properties: + lastTransitionTime: + description: LastTransitionTime is the last time this condition + transitioned from one status to another. + format: date-time + type: string + message: + description: A Message containing details about this condition's + last transition from one status to another, if any. + type: string + reason: + description: A Reason for this condition's last transition from + one status to another. 
+ type: string + status: + description: Status of this condition; is it currently True, + False, or Unknown? + type: string + type: + description: Type of this condition. At most one of each condition + type may apply to a resource at any point in time. + type: string + required: + - lastTransitionTime + - reason + - status + - type + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/package/webhook/manifests.yaml b/package/webhook/manifests.yaml index f254e4c8..ad9e86c8 100644 --- a/package/webhook/manifests.yaml +++ b/package/webhook/manifests.yaml @@ -45,6 +45,26 @@ webhooks: resources: - iamkeys sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-exoscale-crossplane-io-v1-kafka + failurePolicy: Fail + name: kafkas.exoscale.crossplane.io + rules: + - apiGroups: + - exoscale.crossplane.io + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - kafkas + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/samples/exoscale.crossplane.io_kafka.yaml b/samples/exoscale.crossplane.io_kafka.yaml new file mode 100644 index 00000000..bbb1289d --- /dev/null +++ b/samples/exoscale.crossplane.io_kafka.yaml @@ -0,0 +1,25 @@ +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + creationTimestamp: null + name: kafka-local-dev +spec: + forProvider: + ipFilter: + - 0.0.0.0/0 + kafkaSettings: + connections_max_idle_ms: 60000 + maintenance: + dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: kafka-local-dev-details + namespace: default +status: + atProvider: + kafkaSettings: null diff --git a/test/e2e/kafka/00-assert.yaml b/test/e2e/kafka/00-assert.yaml new file mode 100644 index 00000000..e8577df6 --- /dev/null +++ b/test/e2e/kafka/00-assert.yaml @@ -0,0 +1,48 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + annotations: + crossplane.io/external-name: e2e-test-kafka + finalizers: + - finalizer.managedresource.crossplane.io + name: e2e-test-kafka +spec: + deletionPolicy: Delete + forProvider: + ipFilter: + - 0.0.0.0/0 + maintenance: + dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: e2e-test-kafka-details + namespace: default +status: + atProvider: + nodeStates: + - state: running + - state: running + - state: running + conditions: + - status: "True" + - status: "True" +--- +apiVersion: v1 +kind: Secret +type: connection.crossplane.io/v1alpha1 +metadata: + name: e2e-test-kafka-details + namespace: default + ownerReferences: + - apiVersion: exoscale.crossplane.io/v1 + kind: Kafka + name: e2e-test-kafka diff --git a/test/e2e/kafka/00-install.yaml b/test/e2e/kafka/00-install.yaml new file mode 100644 index 00000000..fa751995 --- /dev/null +++ b/test/e2e/kafka/00-install.yaml @@ -0,0 +1,19 @@ +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + name: e2e-test-kafka +spec: + forProvider: + ipFilter: + - 0.0.0.0/0 + maintenance: + dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: e2e-test-kafka-details + namespace: default diff 
--git a/test/e2e/kafka/01-assert.yaml b/test/e2e/kafka/01-assert.yaml new file mode 100644 index 00000000..f1c4f5f2 --- /dev/null +++ b/test/e2e/kafka/01-assert.yaml @@ -0,0 +1,11 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: connect-kafka + namespace: default +status: + conditions: + - type: Complete + status: 'True' + succeeded: 1 + ready: 0 diff --git a/test/e2e/kafka/01-connect.yaml b/test/e2e/kafka/01-connect.yaml new file mode 100644 index 00000000..53c3d83c --- /dev/null +++ b/test/e2e/kafka/01-connect.yaml @@ -0,0 +1,52 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: e2e-test-kafka-config + namespace: default +data: + kaf.config: | + clusters: + - name: test + TLS: + cafile: /.kafka/certs/ca.crt + clientfile: /.kafka/certs/service.cert + clientkeyfile: /.kafka/certs/service.key + insecure: false +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: connect-kafka + namespace: default +spec: + backoffLimit: 5 + template: + metadata: + labels: + e2e-test: kafka + spec: + restartPolicy: Never + containers: + - name: connect + image: golang:1.19 + imagePullPolicy: IfNotPresent + command: + - bash + args: + - -c + - echo -e "Testing create topic...\n" && go install github.com/birdayz/kaf/cmd/kaf@v0.2.3 && kaf -b $KAFKA_URI --config /.kafka/config/kaf.config -c test topic create test + envFrom: + - secretRef: + name: e2e-test-kafka-details + volumeMounts: + - name: certs + mountPath: /.kafka/certs + - name: config + mountPath: /.kafka/config + volumes: + - name: certs + secret: + secretName: e2e-test-kafka-details + - name: config + configMap: + name: e2e-test-kafka-config diff --git a/test/e2e/kafka/02-delete.yaml b/test/e2e/kafka/02-delete.yaml new file mode 100644 index 00000000..35ca6153 --- /dev/null +++ b/test/e2e/kafka/02-delete.yaml @@ -0,0 +1,14 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +delete: + # This will wait until resources are really gone + - apiVersion: batch/v1 + kind: Job + name: connect-kafka + - apiVersion: v1 + kind: Pod + labels: + e2e-test: kafka + - apiVersion: exoscale.crossplane.io/v1 + kind: Kafka + name: e2e-test-kafka
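
Note on the webhook tests earlier in this diff: they build their fixture through a sampleKafka helper that does not appear in any hunk here. For reference, a minimal sketch of what such a helper could look like, assuming the forProvider field names from the CRD above (zone, version, ipFilter, maintenance, size.plan); the values and field access below are illustrative assumptions, not the repository's actual implementation:

package kafkacontroller

import (
	exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
)

// sampleKafka sketches the fixture helper referenced by TestWebhook_Update.
// All values are assumptions chosen to pass the create-time validations the
// tests exercise (zone, version, IP filter, maintenance window, plan); the
// real helper in the repository may differ.
func sampleKafka(name string) exoscalev1.Kafka {
	inst := exoscalev1.Kafka{}
	inst.Name = name
	inst.Spec.ForProvider.Zone = "ch-dk-2"
	inst.Spec.ForProvider.Version = "3.2"
	inst.Spec.ForProvider.IPFilter = []string{"0.0.0.0/0"}
	inst.Spec.ForProvider.Maintenance.DayOfWeek = "monday"
	inst.Spec.ForProvider.Maintenance.TimeOfDay = "12:00:00"
	inst.Spec.ForProvider.Size.Plan = "startup-2"
	return inst
}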
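The 01-connect Job exercises the instance through the kaf CLI. To consume the same connection details programmatically, a rough Go equivalent could look like the following; it assumes the secret keys visible in the e2e config above (the KAFKA_URI environment variable plus the mounted ca.crt, service.cert and service.key files) and picks github.com/segmentio/kafka-go purely for illustration, since this diff prescribes no client library:

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Client certificate and key, mounted from the connection details secret.
	cert, err := tls.LoadX509KeyPair("/.kafka/certs/service.cert", "/.kafka/certs/service.key")
	if err != nil {
		log.Fatal(err)
	}
	// CA bundle from the same secret, used to verify the broker certificate.
	ca, err := os.ReadFile("/.kafka/certs/ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(ca)

	dialer := &kafka.Dialer{
		TLS: &tls.Config{Certificates: []tls.Certificate{cert}, RootCAs: pool},
	}
	// KAFKA_URI is injected via envFrom in the Job above; assumed to be host:port.
	conn, err := dialer.DialContext(context.Background(), "tcp", os.Getenv("KAFKA_URI"))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Same smoke test as the kaf invocation: create a topic named "test".
	if err := conn.CreateTopics(kafka.TopicConfig{Topic: "test", NumPartitions: 1, ReplicationFactor: 1}); err != nil {
		log.Fatal(err)
	}
}

Like the kaf invocation, this performs mutual TLS against the broker and creates a topic, which is enough to prove end-to-end connectivity from inside the cluster.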