diff --git a/pkg/resource/domain/hooks.go b/pkg/resource/domain/hooks.go index ea17cf2..612aa17 100644 --- a/pkg/resource/domain/hooks.go +++ b/pkg/resource/domain/hooks.go @@ -16,6 +16,8 @@ package domain import ( "context" "errors" + "fmt" + "strings" "github.com/aws-controllers-k8s/opensearchservice-controller/apis/v1alpha1" ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1" @@ -35,6 +37,7 @@ var ( errors.New("domain is currently processing changes, cannot be modified or deleted"), ackrequeue.DefaultRequeueAfterDuration, ) + noAutoTuneInstances = []string{"t2", "t3"} ) // domainProcessing returns true if the supplied domain is in a state of @@ -46,6 +49,35 @@ func domainProcessing(r *resource) bool { return *r.ko.Status.Processing } +func isAutoTuneOptionReady(autoTuneOption *svcsdktypes.AutoTuneOptionsOutput) (bool, error) { + switch autoTuneOption.State { + case svcsdktypes.AutoTuneStateEnabled, svcsdktypes.AutoTuneStateDisabled: + return true, nil + + case svcsdktypes.AutoTuneStateError: + if autoTuneOption.ErrorMessage != nil { + return false, fmt.Errorf("error: %s", *autoTuneOption.ErrorMessage) + } + return false, fmt.Errorf("there is an error when updating AutoTuneOptions") + + default: + return false, nil + } +} + +// isAutoTuneSupported returns true if instance type supports AutoTune +// https://docs.aws.amazon.com/opensearch-service/latest/developerguide/supported-instance-types.html +func isAutoTuneSupported(r *resource) bool { + if r.ko.Spec.ClusterConfig != nil && r.ko.Spec.ClusterConfig.InstanceType != nil { + for _, v := range noAutoTuneInstances { + if strings.HasPrefix(*r.ko.Spec.ClusterConfig.InstanceType, v) { + return false + } + } + } + return true +} + func (rm *resourceManager) customUpdateDomain(ctx context.Context, desired, latest *resource, delta *ackcompare.Delta) (updated *resource, err error) { rlog := ackrtlog.FromContext(ctx) @@ -185,6 +217,7 @@ func (rm *resourceManager) customUpdateDomain(ctx context.Context, desired, late } ko.Spec.AutoTuneOptions = &v1alpha1.AutoTuneOptionsInput{ DesiredState: aws.String(string(resp.DomainConfig.AutoTuneOptions.Options.DesiredState)), + UseOffPeakWindow: resp.DomainConfig.AutoTuneOptions.Options.UseOffPeakWindow, MaintenanceSchedules: maintSchedules, } } else { @@ -205,11 +238,18 @@ func (rm *resourceManager) customUpdateDomain(ctx context.Context, desired, late } } ko.Spec.ClusterConfig = &v1alpha1.ClusterConfig{ - ColdStorageOptions: csOptions, - DedicatedMasterEnabled: resp.DomainConfig.ClusterConfig.Options.DedicatedMasterEnabled, - WarmEnabled: resp.DomainConfig.ClusterConfig.Options.WarmEnabled, - ZoneAwarenessConfig: zaConfig, - ZoneAwarenessEnabled: resp.DomainConfig.ClusterConfig.Options.ZoneAwarenessEnabled, + ColdStorageOptions: csOptions, + DedicatedMasterCount: int64OrNil(resp.DomainConfig.ClusterConfig.Options.DedicatedMasterCount), + DedicatedMasterEnabled: resp.DomainConfig.ClusterConfig.Options.DedicatedMasterEnabled, + DedicatedMasterType: aws.String(string(resp.DomainConfig.ClusterConfig.Options.DedicatedMasterType)), + InstanceCount: int64OrNil(resp.DomainConfig.ClusterConfig.Options.InstanceCount), + InstanceType: aws.String(string(resp.DomainConfig.ClusterConfig.Options.InstanceType)), + WarmCount: int64OrNil(resp.DomainConfig.ClusterConfig.Options.WarmCount), + WarmEnabled: resp.DomainConfig.ClusterConfig.Options.WarmEnabled, + WarmType: aws.String(string(resp.DomainConfig.ClusterConfig.Options.WarmType)), + ZoneAwarenessConfig: zaConfig, + ZoneAwarenessEnabled: resp.DomainConfig.ClusterConfig.Options.ZoneAwarenessEnabled, + MultiAZWithStandbyEnabled: resp.DomainConfig.ClusterConfig.Options.MultiAZWithStandbyEnabled, } if resp.DomainConfig.ClusterConfig.Options.DedicatedMasterCount != nil { ko.Spec.ClusterConfig.DedicatedMasterCount = aws.Int64(int64(*resp.DomainConfig.ClusterConfig.Options.DedicatedMasterCount)) @@ -285,6 +325,11 @@ func (rm *resourceManager) customUpdateDomain(ctx context.Context, desired, late } else { ko.Spec.EngineVersion = nil } + if resp.DomainConfig.IPAddressType != nil { + ko.Spec.IPAddressType = aws.String(string(resp.DomainConfig.IPAddressType.Options)) + } else { + ko.Spec.IPAddressType = nil + } if resp.DomainConfig.NodeToNodeEncryptionOptions != nil { ko.Spec.NodeToNodeEncryptionOptions = &v1alpha1.NodeToNodeEncryptionOptions{ Enabled: resp.DomainConfig.NodeToNodeEncryptionOptions.Options.Enabled, @@ -292,6 +337,41 @@ func (rm *resourceManager) customUpdateDomain(ctx context.Context, desired, late } else { ko.Spec.NodeToNodeEncryptionOptions = nil } + if resp.DomainConfig.SoftwareUpdateOptions != nil { + ko.Spec.SoftwareUpdateOptions = &v1alpha1.SoftwareUpdateOptions{ + AutoSoftwareUpdateEnabled: resp.DomainConfig.SoftwareUpdateOptions.Options.AutoSoftwareUpdateEnabled, + } + } else { + ko.Spec.SoftwareUpdateOptions = nil + } + if resp.DomainConfig.AIMLOptions != nil && resp.DomainConfig.AIMLOptions.Options != nil { + if resp.DomainConfig.AIMLOptions.Options.NaturalLanguageQueryGenerationOptions != nil { + ko.Spec.AIMLOptions = &v1alpha1.AIMLOptionsInput{ + NATuralLanguageQueryGenerationOptions: &v1alpha1.NATuralLanguageQueryGenerationOptionsInput{ + DesiredState: aws.String(string(resp.DomainConfig.AIMLOptions.Options.NaturalLanguageQueryGenerationOptions.DesiredState)), + }, + } + } + } else { + ko.Spec.AIMLOptions = nil + } + if resp.DomainConfig.OffPeakWindowOptions != nil && resp.DomainConfig.OffPeakWindowOptions.Options != nil { + var offPeakWindow *v1alpha1.OffPeakWindow + if resp.DomainConfig.OffPeakWindowOptions.Options.OffPeakWindow != nil { + offPeakWindow = &v1alpha1.OffPeakWindow{ + WindowStartTime: &v1alpha1.WindowStartTime{ + Hours: aws.Int64(resp.DomainConfig.OffPeakWindowOptions.Options.OffPeakWindow.WindowStartTime.Hours), + Minutes: aws.Int64(resp.DomainConfig.OffPeakWindowOptions.Options.OffPeakWindow.WindowStartTime.Minutes), + }, + } + } + ko.Spec.OffPeakWindowOptions = &v1alpha1.OffPeakWindowOptions{ + Enabled: resp.DomainConfig.OffPeakWindowOptions.Options.Enabled, + OffPeakWindow: offPeakWindow, + } + } else { + ko.Spec.OffPeakWindowOptions = nil + } rm.setStatusDefaults(ko) @@ -401,6 +481,9 @@ func (rm *resourceManager) newCustomUpdateRequestPayload( if desired.ko.Spec.AutoTuneOptions.DesiredState != nil { f3.DesiredState = svcsdktypes.AutoTuneDesiredState(*desired.ko.Spec.AutoTuneOptions.DesiredState) } + if desired.ko.Spec.AutoTuneOptions.UseOffPeakWindow != nil { + f3.UseOffPeakWindow = desired.ko.Spec.AutoTuneOptions.UseOffPeakWindow + } if desired.ko.Spec.AutoTuneOptions.MaintenanceSchedules != nil { f3f1 := []svcsdktypes.AutoTuneMaintenanceSchedule{} for _, f3f1iter := range desired.ko.Spec.AutoTuneOptions.MaintenanceSchedules { @@ -471,6 +554,9 @@ func (rm *resourceManager) newCustomUpdateRequestPayload( if desired.ko.Spec.ClusterConfig.ZoneAwarenessEnabled != nil { f4.ZoneAwarenessEnabled = desired.ko.Spec.ClusterConfig.ZoneAwarenessEnabled } + if desired.ko.Spec.ClusterConfig.MultiAZWithStandbyEnabled != nil { + f4.MultiAZWithStandbyEnabled = desired.ko.Spec.ClusterConfig.MultiAZWithStandbyEnabled + } res.ClusterConfig = f4 } @@ -586,5 +672,59 @@ func (rm *resourceManager) newCustomUpdateRequestPayload( res.VPCOptions = f14 } + if desired.ko.Spec.IPAddressType != nil && delta.DifferentAt("Spec.IPAddressType") { + res.IPAddressType = svcsdktypes.IPAddressType(*desired.ko.Spec.IPAddressType) + } + + if desired.ko.Spec.SoftwareUpdateOptions != nil && delta.DifferentAt("Spec.SoftwareUpdateOptions") { + f15 := &svcsdktypes.SoftwareUpdateOptions{} + if desired.ko.Spec.SoftwareUpdateOptions.AutoSoftwareUpdateEnabled != nil { + f15.AutoSoftwareUpdateEnabled = desired.ko.Spec.SoftwareUpdateOptions.AutoSoftwareUpdateEnabled + } + res.SoftwareUpdateOptions = f15 + } + + if desired.ko.Spec.AIMLOptions != nil && delta.DifferentAt("Spec.AIMLOptions") { + f16 := &svcsdktypes.AIMLOptionsInput{} + if desired.ko.Spec.AIMLOptions.NATuralLanguageQueryGenerationOptions != nil { + f16f0 := &svcsdktypes.NaturalLanguageQueryGenerationOptionsInput{} + if desired.ko.Spec.AIMLOptions.NATuralLanguageQueryGenerationOptions.DesiredState != nil { + f16f0.DesiredState = svcsdktypes.NaturalLanguageQueryGenerationDesiredState(*desired.ko.Spec.AIMLOptions.NATuralLanguageQueryGenerationOptions.DesiredState) + } + f16.NaturalLanguageQueryGenerationOptions = f16f0 + } + res.AIMLOptions = f16 + } + + if desired.ko.Spec.OffPeakWindowOptions != nil && delta.DifferentAt("Spec.OffPeakWindowOptions") { + f17 := &svcsdktypes.OffPeakWindowOptions{} + if desired.ko.Spec.OffPeakWindowOptions.Enabled != nil { + f17.Enabled = desired.ko.Spec.OffPeakWindowOptions.Enabled + } + if desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow != nil { + f17f1 := &svcsdktypes.OffPeakWindow{} + if desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow.WindowStartTime != nil { + f17f1f1 := &svcsdktypes.WindowStartTime{} + if desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow.WindowStartTime.Hours != nil { + f17f1f1.Hours = *desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow.WindowStartTime.Hours + } + if desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow.WindowStartTime.Minutes != nil { + f17f1f1.Minutes = *desired.ko.Spec.OffPeakWindowOptions.OffPeakWindow.WindowStartTime.Minutes + } + f17f1.WindowStartTime = f17f1f1 + } + f17.OffPeakWindow = f17f1 + } + res.OffPeakWindowOptions = f17 + } + return res, nil } + +func int64OrNil(num *int32) *int64 { + if num == nil { + return nil + } + + return aws.Int64(int64(*num)) +} diff --git a/pkg/resource/domain/sdk.go b/pkg/resource/domain/sdk.go index d28962f..2607d86 100644 --- a/pkg/resource/domain/sdk.go +++ b/pkg/resource/domain/sdk.go @@ -535,6 +535,15 @@ func (rm *resourceManager) sdkFind( } rm.setStatusDefaults(ko) + if resp.DomainStatus.AutoTuneOptions != nil { + if ready, err := isAutoTuneOptionReady(resp.DomainStatus.AutoTuneOptions); err != nil { + return latest, ackrequeue.Needed(err) + } else if !ready { + return latest, ackrequeue.Needed(fmt.Errorf("waiting for AutotuneOptions to sync. Current state: ", resp.DomainStatus.AutoTuneOptions.State)) + } + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(resp.DomainStatus.AutoTuneOptions.State)) + } + if domainProcessing(&resource{ko}) { // Setting resource synced condition to false will trigger a requeue of // the resource. No need to return a requeue error here. @@ -1042,6 +1051,14 @@ func (rm *resourceManager) sdkCreate( } rm.setStatusDefaults(ko) + if resp.DomainStatus.AutoTuneOptions != nil { + if strings.HasPrefix(string(resp.DomainStatus.AutoTuneOptions.State), "ENABLE") { + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(svcsdktypes.AutoTuneStateEnabled)) + } else { + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(svcsdktypes.AutoTuneStateDisabled)) + } + } + if domainProcessing(&resource{ko}) { // Setting resource synced condition to false will trigger a requeue of // the resource. No need to return a requeue error here. diff --git a/templates/hooks/domain/sdk_create_post_set_output.go.tpl b/templates/hooks/domain/sdk_create_post_set_output.go.tpl index 95f6342..07a2e26 100644 --- a/templates/hooks/domain/sdk_create_post_set_output.go.tpl +++ b/templates/hooks/domain/sdk_create_post_set_output.go.tpl @@ -1,3 +1,11 @@ + if resp.DomainStatus.AutoTuneOptions != nil { + if strings.HasPrefix(string(resp.DomainStatus.AutoTuneOptions.State), "ENABLE") { + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(svcsdktypes.AutoTuneStateEnabled)) + } else { + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(svcsdktypes.AutoTuneStateDisabled)) + } + } + if domainProcessing(&resource{ko}) { // Setting resource synced condition to false will trigger a requeue of // the resource. No need to return a requeue error here. diff --git a/templates/hooks/domain/sdk_read_one_post_set_output.go.tpl b/templates/hooks/domain/sdk_read_one_post_set_output.go.tpl index 9739e50..2f494fd 100644 --- a/templates/hooks/domain/sdk_read_one_post_set_output.go.tpl +++ b/templates/hooks/domain/sdk_read_one_post_set_output.go.tpl @@ -1,3 +1,12 @@ + if resp.DomainStatus.AutoTuneOptions != nil { + if ready, err := isAutoTuneOptionReady(resp.DomainStatus.AutoTuneOptions); err != nil { + return latest, ackrequeue.Needed(err) + } else if !ready { + return latest, ackrequeue.Needed(fmt.Errorf("waiting for AutotuneOptions to sync. Current state: ", resp.DomainStatus.AutoTuneOptions.State)) + } + ko.Spec.AutoTuneOptions.DesiredState = aws.String(string(resp.DomainStatus.AutoTuneOptions.State)) + } + if domainProcessing(&resource{ko}) { // Setting resource synced condition to false will trigger a requeue of // the resource. No need to return a requeue error here. diff --git a/test/e2e/tests/test_domain.py b/test/e2e/tests/test_domain.py index f8972fd..51fb9e1 100644 --- a/test/e2e/tests/test_domain.py +++ b/test/e2e/tests/test_domain.py @@ -200,7 +200,7 @@ def es_2d3m_multi_az_vpc_2_subnet7_9_domain(os_client, resources: BootstrapResou ) k8s.create_custom_resource(ref, resource_data) k8s.wait_resource_consumed_by_controller(ref) - condition.assert_not_synced(ref) + assert k8s.wait_on_condition(ref, "ACK.ResourceSynced", "True", wait_periods=5) domain.wait_until(ref.name, domain.processing_matches(False)) @@ -280,6 +280,60 @@ def test_create_delete_es_2d3m_multi_az_no_vpc_7_9(self, es_2d3m_multi_az_no_vpc assert cr is not None assert 'status' in cr domain.assert_endpoint(cr) + print("before:", domain.get(resource.name)) + + # modify some cluster parameters to test updates + updates = { + "spec": { + "softwareUpdateOptions": { + "autoSoftwareUpdateEnabled": True + }, + "offPeakWindowOptions": { + "enabled": True, + "offPeakWindow": { + "windowStartTime": { + "hours": 23, + "minutes": 30 + } + } + } + }, + } + # updates = { + # "spec": { + # "AutoTuneOptions": { + # "UseOffPeakWindow": False + # }, + # "ClusterConfig": { + # "MultiAZWithStandbyEnabled": False + # }, + # "OffPeakWindowOptions": { + # "Enabled": True, + # "OffPeakWindow": { + # "WindowStartTime": { + # "Hours": 23, + # "Minutes": 30 + # } + # } + # }, + # "SoftwareUpdateOptions": { + # "AutoSoftwareUpdateEnabled": True + # } + # } + # } + k8s.patch_custom_resource(ref, updates) + time.sleep(CHECK_STATUS_WAIT_SECONDS) + print("after check wait:", domain.get(resource.name)) + assert k8s.wait_on_condition(ref, "ACK.ResourceSynced", "True", wait_periods=30) + latest = domain.get(resource.name) + print("latest:", latest) + + # assert latest['DomainStatus']['AutoTuneOptions']['UseOffPeakWindow'] is False + # assert latest['DomainStatus']['ClusterConfig']['MultiAZWithStandbyEnabled'] is False + assert latest['DomainStatus']['OffPeakWindowOptions']["Enabled"] is True + assert latest['DomainStatus']['OffPeakWindowOptions']["OffPeakWindow"]["WindowStartTime"]["Hours"] == 23 + assert latest['DomainStatus']['OffPeakWindowOptions']["OffPeakWindow"]["WindowStartTime"]["Minutes"] == 30 + assert latest['DomainStatus']['SoftwareUpdateOptions']["AutoSoftwareUpdateEnabled"] is True def test_create_delete_es_2d3m_multi_az_vpc_2_subnet7_9(self, es_2d3m_multi_az_vpc_2_subnet7_9_domain): ref, resource = es_2d3m_multi_az_vpc_2_subnet7_9_domain