From ec6428d4310da13087ac0546885c7a31e29a1356 Mon Sep 17 00:00:00 2001 From: rkthtrifork <131661717+rkthtrifork@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:18:17 +0200 Subject: [PATCH] Rewrote ISM Policy reconciler (#846) ### Description The ISM Policy reconciler was constantly trying to update the ISM Policy and it was not handling reconciliation requeue in some cases. There were possibly other issues as well. Below I have described what caused the different issues I encountered - The ISM Policy request was different from the response, but they were both made with the same struct. This caused the reconciler to always see the existing ISM Policy and the ISM Policy from the CR as different and try to update it. I have created a separate struct model for each to separate the logic and in the code I now compare the existing policy with the policy from the CR by comparing both the Policy IDs and the policy spec - There were some very complex cases in the code that were very difficult to understand so I have attempted to make the code more concise and easy to read and understand - I have added reconciliation requeuing to all cases so the operator doesn't just stop reconciling the ISM Policy in some cases One thing I am wondering is that I am not sure why we would want to create a CR without specifying the cluster ID and then the operator automatically links it to that cluster ID so it breaks if the OpenSearch CR is deleted. Is this intended and why? I'm talking about the section with the comment "Check cluster ref has not changed" Tested cases: - A new ISM Policy is created through a CR and the operator creates it in the OpenSearch Cluster - The CR for an ISM Policy that is created by the operator is removed and the operator removes it in the OpenSearch Cluster - An ISM Policy that already exists in the OpenSearch Cluster is created through a CR and the operator ignores it and marks it as existing - The CR for an ISM Policy that was pre-existing and therefore was not created by the operator is removed and the operator does not remove the ISM Policy from the OpenSearch Cluster - An ISM Policy that already exists in the OpenSearch Cluster is created through a CR and the operator ignores it and marks it as existing. The ISM Policy is then manually removed from the OpenSearch Cluster and the operator now applies the ISM Policy from the CR The test for ISM Policies is currently failing miserably, but I decided to create the PR to get feedback before I dive into fixing it. ### Issues Resolved https://github.com/opensearch-project/opensearch-k8s-operator/issues/833 https://github.com/opensearch-project/opensearch-k8s-operator/issues/732 Possibly other issues ### Check List - [x] Commits are signed per the DCO using --signoff - [x] Unittest added for the new/changed functionality and all unit tests are successful - [x] Customer-visible features documented - [x] No linter warnings (`make lint`) If CRDs are changed: - [ ] CRD YAMLs updated (`make manifests`) and also copied into the helm chart - [ ] Changes to CRDs documented Please refer to the [PR guidelines](https://github.com/opensearch-project/opensearch-k8s-operator/blob/main/docs/developing.md#submitting-a-pr) before submitting this pull request. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). Signed-off-by: rkthtrifork --- .../opensearch-gateway/requests/IsmPolicy.go | 9 +- .../responses/ISMPolicyResponse.go | 5 - .../opensearch-gateway/responses/IsmPolicy.go | 10 + .../services/os_ism_service.go | 33 +- .../pkg/reconcilers/ismpolicy.go | 285 +++++++------ .../pkg/reconcilers/ismpolicy_test.go | 398 +++++++++--------- .../pkg/reconcilers/reconcilers.go | 16 +- 7 files changed, 383 insertions(+), 373 deletions(-) delete mode 100644 opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go create mode 100644 opensearch-operator/opensearch-gateway/responses/IsmPolicy.go diff --git a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go index 2f820a0a..023fc79e 100644 --- a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go +++ b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go @@ -1,14 +1,11 @@ package requests -type Policy struct { - PolicyID string `json:"_id,omitempty"` - PrimaryTerm *int `json:"_primary_term,omitempty"` - SequenceNumber *int `json:"_seq_no,omitempty"` - Policy ISMPolicy `json:"policy"` +type ISMPolicy struct { + Policy ISMPolicySpec `json:"policy"` } // ISMPolicySpec is the specification for the ISM policy for OS. -type ISMPolicy struct { +type ISMPolicySpec struct { // The default starting state for each index that uses this policy. DefaultState string `json:"default_state"` // A human-readable description of the policy. diff --git a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go b/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go deleted file mode 100644 index 753314cc..00000000 --- a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go +++ /dev/null @@ -1,5 +0,0 @@ -package responses - -import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" - -type GetISMPoliciesResponse requests.Policy diff --git a/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go new file mode 100644 index 00000000..29174466 --- /dev/null +++ b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go @@ -0,0 +1,10 @@ +package responses + +import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + +type GetISMPolicyResponse struct { + PolicyID string `json:"_id"` + PrimaryTerm int `json:"_primary_term"` + SequenceNumber int `json:"_seq_no"` + Policy requests.ISMPolicySpec +} diff --git a/opensearch-operator/opensearch-gateway/services/os_ism_service.go b/opensearch-operator/opensearch-gateway/services/os_ism_service.go index d82c0e90..040e2e26 100644 --- a/opensearch-operator/opensearch-gateway/services/os_ism_service.go +++ b/opensearch-operator/opensearch-gateway/services/os_ism_service.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/opensearch-project/opensearch-go/opensearchutil" @@ -16,7 +17,7 @@ import ( var ErrNotFound = errors.New("policy not found") // ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update -func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) { +func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) { if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) { return false, nil } @@ -27,23 +28,8 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques return true, nil } -// PolicyExists checks if the passed policy already exists or not -func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) { - resp, err := service.GetISMConfig(ctx, policyName) - if err != nil { - return false, err - } - defer resp.Body.Close() - if resp.StatusCode == 404 { - return false, nil - } else if resp.IsError() { - return false, fmt.Errorf("response from API is %s", resp.Status()) - } - return true, nil -} - // GetPolicy fetches the passed policy -func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) { +func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) { resp, err := service.GetISMConfig(ctx, policyName) if err != nil { return nil, err @@ -51,10 +37,11 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) defer resp.Body.Close() if resp.StatusCode == 404 { return nil, ErrNotFound - } else if resp.IsError() { + } + if resp.IsError() { return nil, fmt.Errorf("response from API is %s", resp.Status()) } - ismResponse := requests.Policy{} + ismResponse := responses.GetISMPolicyResponse{} if resp != nil && resp.Body != nil { err := json.NewDecoder(resp.Body).Decode(&ismResponse) if err != nil { @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) } // CreateISMPolicy creates the passed policy -func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error { +func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) resp, err := service.PutISMConfig(ctx, policyId, spec) if err != nil { @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re } // UpdateISMPolicy updates the given policy -func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error { +func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) - resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec) + resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec) if err != nil { return err } defer resp.Body.Close() if resp.IsError() { - return fmt.Errorf("failed to create ism policy: %s", resp.String()) + return fmt.Errorf("failed to update ism policy: %s", resp.String()) } return nil } diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy.go b/opensearch-operator/pkg/reconcilers/ismpolicy.go index f2954d45..8b757422 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy.go @@ -13,6 +13,8 @@ import ( "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" @@ -22,7 +24,10 @@ import ( ) const ( - ismPolicyExists = "ism policy already exists in Opensearch" + opensearchIsmPolicyExists = "ISM Policy already exists in Opensearch" + opensearchIsmPolicyNameMismatch = "OpensearchISMPolicyNameMismatch" + opensearchClusterRequeueAfter = 10 * time.Second + defaultRequeueAfter = 30 * time.Second ) type IsmPolicyReconciler struct { @@ -58,6 +63,7 @@ func NewIsmReconciler( func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) { var reason string var policyId string + defer func() { if !pointer.BoolDeref(r.updateStatus, true) { return @@ -71,24 +77,23 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) instance.Status.State = opsterv1.OpensearchISMPolicyError } // Requeue after is 10 seconds if waiting for OpenSearch cluster - if retResult.Requeue && retResult.RequeueAfter == 10*time.Second { + if retResult.Requeue && retResult.RequeueAfter == opensearchClusterRequeueAfter { instance.Status.State = opsterv1.OpensearchISMPolicyPending } - // Requeue is after 30 seconds for normal reconciliation after creation/update - if retErr == nil && retResult.RequeueAfter == 30*time.Second { + if retErr == nil && retResult.Requeue { instance.Status.State = opsterv1.OpensearchISMPolicyCreated instance.Status.PolicyId = policyId } - if reason == ismPolicyExists { + if reason == opensearchIsmPolicyExists { instance.Status.State = opsterv1.OpensearchISMPolicyIgnored } }) + if err != nil { r.logger.Error(err, "failed to update status") } }() - var err error r.cluster, retErr = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, Namespace: r.instance.Namespace, @@ -97,167 +102,184 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) reason = "error fetching opensearch cluster" r.logger.Error(retErr, "failed to fetch opensearch cluster") r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } if r.cluster == nil { r.logger.Info("opensearch cluster does not exist, requeueing") reason = "waiting for opensearch cluster to exist" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } + // Check cluster ref has not changed - if r.instance.Status.ManagedCluster != nil { - if *r.instance.Status.ManagedCluster != r.cluster.UID { - reason = "cannot change the cluster a role refers to" - retErr = fmt.Errorf("%s", reason) - r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) - return - } - } else { - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ManagedCluster = &r.cluster.UID - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } + managedCluster := r.instance.Status.ManagedCluster + if managedCluster != nil && *managedCluster != r.cluster.UID { + reason = "cannot change the cluster a resource refers to" + retErr = fmt.Errorf("%s", reason) + r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) + return ctrl.Result{ + Requeue: false, + }, retErr + } + + if pointer.BoolDeref(r.updateStatus, true) { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ManagedCluster = &r.cluster.UID + }) + if retErr != nil { + reason = fmt.Sprintf("failed to update status: %s", retErr) + r.recorder.Event(r.instance, "Warning", statusError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } } + // Check cluster is ready if r.cluster.Status.Phase != opsterv1.PhaseRunning { r.logger.Info("opensearch cluster is not running, requeueing") reason = "waiting for opensearch cluster status to be running" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } - r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) - if err != nil { - reason := "error creating opensearch client" + r.osClient, retErr = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) + if retErr != nil { + reason = "error creating opensearch client" r.recorder.Event(r.instance, "Warning", opensearchError, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 30 * time.Second, - } - retErr = err - return - } - - // If PolicyID not provided explicitly, use metadata.name by default - policyId = r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { - policyId = r.instance.Name - } - // Check ism policy state to make sure we don't touch preexisting ism policy - if r.instance.Status.ExistingISMPolicy == nil { - var exists bool - exists, retErr = services.PolicyExists(r.ctx, r.osClient, policyId) - if retErr != nil { - reason = "failed to get policy status from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ExistingISMPolicy = &exists - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } - } else { - // Emit an event for unit testing assertion - r.recorder.Event(r.instance, "Normal", "UnitTest", fmt.Sprintf("exists is %t", exists)) - return - } + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } - // If ism policy is existing do nothing - if *r.instance.Status.ExistingISMPolicy { - reason = ismPolicyExists - return + // If PolicyID is not provided explicitly, use metadata.name by default + policyId = r.instance.Name + if r.instance.Spec.PolicyID != "" { + policyId = r.instance.Spec.PolicyID } - ismpolicy, retErr := r.CreateISMPolicyRequest() + newPolicy, retErr := r.CreateISMPolicy() if retErr != nil { - reason = "failed to get create the ism policy request" r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } existingPolicy, retErr := services.GetPolicy(r.ctx, r.osClient, policyId) - if retErr != nil && retErr != services.ErrNotFound { - reason = "failed to get policy from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } + // If not exists, create if errors.Is(retErr, services.ErrNotFound) { - r.logger.V(1).Info(fmt.Sprintf("policy %s not found, creating.", r.instance.Spec.PolicyID)) - retErr = services.CreateISMPolicy(r.ctx, r.osClient, *ismpolicy, policyId) + request := requests.ISMPolicy{ + Policy: *newPolicy, + } + retErr = services.CreateISMPolicy(r.ctx, r.osClient, request, policyId) if retErr != nil { reason = "failed to create ism policy" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy created in opensearch") - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // Mark the ISM Policy as not pre-existing (created by the operator) + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(false) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + + r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy successfully created in OpenSearch Cluster") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - priterm := existingPolicy.PrimaryTerm - seqno := existingPolicy.SequenceNumber - // Reset - existingPolicy.PrimaryTerm = nil - existingPolicy.SequenceNumber = nil - shouldUpdate, retErr := services.ShouldUpdateISMPolicy(r.ctx, *ismpolicy, *existingPolicy) + + // If other error, report if retErr != nil { - reason = "failed to compare the policies" + reason = "failed to get the ism policy from Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - if !shouldUpdate { - r.logger.V(1).Info(fmt.Sprintf("policy %s is in sync", r.instance.Spec.PolicyID)) - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // If the ISM policy exists in OpenSearch cluster and was not created by the operator, update the status and return + if r.instance.Status.ExistingISMPolicy == nil || *r.instance.Status.ExistingISMPolicy { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(true) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + reason = "the ISM policy already exists in the OpenSearch cluster" + r.logger.Error(errors.New(opensearchIsmPolicyExists), reason) + r.recorder.Event(r.instance, "Warning", opensearchIsmPolicyExists, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - // the policyId is immutable, so check the old name (r.instance.Status.PolicyId) against the new - if r.instance.Status.PolicyId != "" && policyId != r.instance.Status.PolicyId { - reason = "can't change PolicyID" - r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + // Return if there are no changes + if r.instance.Spec.PolicyID == existingPolicy.PolicyID && cmp.Equal(*newPolicy, existingPolicy.Policy, cmpopts.EquateEmpty()) { + r.logger.V(1).Info(fmt.Sprintf("user %s is in sync", r.instance.Name)) + r.recorder.Event(r.instance, "Normal", opensearchAPIUnchanged, "policy is in sync") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil + } + request := requests.ISMPolicy{ + Policy: *newPolicy, } - retErr = services.UpdateISMPolicy(r.ctx, r.osClient, *ismpolicy, seqno, priterm, policyId) + retErr = services.UpdateISMPolicy(r.ctx, r.osClient, request, &existingPolicy.SequenceNumber, &existingPolicy.PrimaryTerm, existingPolicy.PolicyID) if retErr != nil { reason = "failed to update ism policy with Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy updated in opensearch") - - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } -func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) { - policy := requests.ISMPolicy{ +func (r *IsmPolicyReconciler) CreateISMPolicy() (*requests.ISMPolicySpec, error) { + policy := requests.ISMPolicySpec{ DefaultState: r.instance.Spec.DefaultState, Description: r.instance.Spec.Description, } @@ -378,35 +400,35 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) shrink.ForceUnsafe = action.Shrink.ForceUnsafe } if action.Shrink.MaxShardSize == nil && action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { - reason := "Either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.MaxShardSize != nil { if action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.MaxShardSize = action.Shrink.MaxShardSize } else { - reason := "MaxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "maxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.NumNewShards != nil { if action.Shrink.MaxShardSize == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.NumNewShards = action.Shrink.NumNewShards } else { - reason := "NumNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "numNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.PercentageOfSourceShards != nil { if action.Shrink.NumNewShards == nil && action.Shrink.MaxShardSize == nil { shrink.PercentageOfSourceShards = action.Shrink.PercentageOfSourceShards } else { - reason := "PercentageOfSourceShards can't exist with MaxShardSize or NumNewShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "percentageOfSourceShards can't exist with MaxShardSize or NumNewShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.TargetIndexNameTemplate != nil { @@ -515,10 +537,8 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) policy.States = append(policy.States, requests.State{Actions: actions, Name: state.Name, Transitions: transitions}) } } - ismPolicy := requests.Policy{ - Policy: policy, - } - return &ismPolicy, nil + + return &policy, nil } // Delete ISM policy from the OS cluster @@ -527,10 +547,12 @@ func (r *IsmPolicyReconciler) Delete() error { if r.instance.Status.ExistingISMPolicy == nil { return nil } + if *r.instance.Status.ExistingISMPolicy { r.logger.Info("policy was pre-existing; not deleting") return nil } + var err error r.cluster, err = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, @@ -544,15 +566,18 @@ func (r *IsmPolicyReconciler) Delete() error { // If the opensearch cluster doesn't exist, we don't need to delete anything return nil } + r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) if err != nil { return err } + // If PolicyID not provided explicitly, use metadata.name by default policyId := r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { + if policyId == "" { policyId = r.instance.Name } + err = services.DeleteISMPolicy(r.ctx, r.osClient, policyId) if err != nil { return err diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go index 22351613..a7b9d290 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go @@ -21,7 +21,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -var seqno *int = new(int) var _ = Describe("ism policy reconciler", func() { var ( transport *httpmock.MockTransport @@ -117,7 +116,7 @@ var _ = Describe("ism policy reconciler", func() { recorder = record.NewFakeRecorder(1) mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) }) - It("should wait for the cluster to be running", func() { + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) @@ -140,6 +139,7 @@ var _ = Describe("ism policy reconciler", func() { cluster.Status.Phase = opsterv1.PhaseRunning cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{} mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) + recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, @@ -162,44 +162,73 @@ var _ = Describe("ism policy reconciler", func() { ) }) - When("existing status is true", func() { + When("cluster reference mismatch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) + managedCluster := types.UID("different-uid") + instance.Status.ManagedCluster = &managedCluster }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) + It("should emit a unit test event and not requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s cannot change the cluster a resource refers to", opensearchRefMismatch))) }) }) - When("existing status is nil", func() { - var localExtraCalls = 4 + When("policy does not exist in opensearch", func() { BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "abc", - Description: "test", - } + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) - recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(4, failMessage), + httpmock.NewStringResponder(404, "Not Found").Once(), ) transport.RegisterResponder( - http.MethodHead, + http.MethodPut, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(2, failMessage), + httpmock.NewStringResponder(200, "OK").Once(), ) + }) + It("should create the policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy successfully created in OpenSearch Cluster", opensearchAPIUpdated))) + }) + }) + + When("failed to get policy from opensearch api", func() { + BeforeEach(func() { transport.RegisterResponder( http.MethodGet, fmt.Sprintf( @@ -208,118 +237,61 @@ var _ = Describe("ism policy reconciler", func() { cluster.Namespace, instance.Name, ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - }).Then( - httpmock.NewStringResponder(404, "does not exist"), - ).Then( - httpmock.NewNotFoundResponder(failMessage), - ), + httpmock.NewErrorResponder(fmt.Errorf("failed to get policy")).Once(), ) }) - - It("should do nothing and emit a unit test event", func() { + It("should emit a unit test event, requeue, and return an error", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - _, err = reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - // Confirm all responders have been called - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls + localExtraCalls)) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) }() var events []string for msg := range recorder.Events { events = append(events, msg) } - Expect(len(events)).To(Equal(2)) - Expect(events[0]).To(Equal("Normal UnitTest exists is true")) - Expect(events[1]).To(Equal("Normal UnitTest exists is false")) + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s failed to get the ism policy from Opensearch API", opensearchAPIError))) }) }) - When("existing status is true", func() { + Context("policy exists in opensearch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) - }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - }) - }) + instance.Spec.PolicyID = "test-policy-id" - When("existing status is false", func() { - BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(false) + transport.RegisterResponder( + http.MethodGet, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewJsonResponderOrPanic(200, responses.GetISMPolicyResponse{ + PolicyID: "test-policy-id", + Policy: requests.ISMPolicySpec{ + DefaultState: "test-state", + Description: "test-policy", + }, + }).Once(), + ) }) - When("policy exists in opensearch and is the same", func() { + When("existing status is nil", func() { BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "", - Description: "", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - }).Once(failMessage), - ) - }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = nil }) - }) - When("policy exists in opensearch and is not the same", func() { - BeforeEach(func() { - recorder = record.NewFakeRecorder(1) - policyRequest := requests.ISMPolicy{ - DefaultState: "policy", - Description: "test-policy", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - PolicyID: "test-policy", - }).Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s?if_seq_no=0&if_primary_term=0", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - }) - It("should update the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -328,39 +300,23 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) }) }) - When("policy doesn't exist in opensearch", func() { + + When("existing status is true", func() { BeforeEach(func() { - recorder = record.NewFakeRecorder(1) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(404, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = pointer.Bool(true) }) - It("should create the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -369,7 +325,72 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy created in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) + }) + }) + + Context("existing status is false", func() { + BeforeEach(func() { + instance.Status.ExistingISMPolicy = pointer.Bool(false) + }) + + When("policy is the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state" + instance.Spec.Description = "test-policy" + }) + It("should emit a unit test event and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy is in sync", opensearchAPIUnchanged))) + }) + }) + + When("policy is not the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state2" + instance.Spec.Description = "test-policy2" + + transport.RegisterResponder( + http.MethodPut, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewStringResponder(200, "OK").Once(), + ) + }) + It("should update ism policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", opensearchAPIUpdated))) + }) }) }) }) @@ -406,9 +427,14 @@ var _ = Describe("ism policy reconciler", func() { }) }) - When("policy does not exist", func() { + Context("cluster is ready", func() { + // extraContextCalls := 1 BeforeEach(func() { + cluster.Status.Phase = opsterv1.PhaseRunning + cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{} mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) + recorder = record.NewFakeRecorder(1) + transport.RegisterResponder( http.MethodGet, fmt.Sprintf( @@ -418,6 +444,7 @@ var _ = Describe("ism policy reconciler", func() { ), httpmock.NewStringResponder(200, "OK").Times(2, failMessage), ) + transport.RegisterResponder( http.MethodHead, fmt.Sprintf( @@ -427,75 +454,42 @@ var _ = Describe("ism policy reconciler", func() { ), httpmock.NewStringResponder(200, "OK").Once(failMessage), ) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(404, "does not exist").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodDelete, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) }) - It("should do nothing and exit", func() { - Expect(reconciler.Delete()).To(Succeed()) - }) - }) - When("policy does exist", func() { - BeforeEach(func() { - mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", - cluster.Spec.General.ServiceName, - cluster.Namespace, - ), - httpmock.NewStringResponder(200, "OK").Times(2, failMessage), - ) - transport.RegisterResponder( - http.MethodHead, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", - cluster.Spec.General.ServiceName, - cluster.Namespace, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodDelete, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + + When("policy does not exist", func() { + BeforeEach(func() { + transport.RegisterResponder( + http.MethodDelete, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Name, + ), + httpmock.NewStringResponder(404, "does not exist").Once(failMessage), + ) + }) + It("should do nothing and exit", func() { + Expect(reconciler.Delete()).NotTo(Succeed()) + }) }) - It("should delete the policy", func() { - Expect(reconciler.Delete()).To(Succeed()) + + When("policy does exist", func() { + BeforeEach(func() { + transport.RegisterResponder( + http.MethodDelete, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Name, + ), + httpmock.NewStringResponder(200, "OK").Once(failMessage), + ) + }) + It("should delete the policy", func() { + Expect(reconciler.Delete()).To(Succeed()) + }) }) }) }) diff --git a/opensearch-operator/pkg/reconcilers/reconcilers.go b/opensearch-operator/pkg/reconcilers/reconcilers.go index 6531bc6f..c0e644ee 100644 --- a/opensearch-operator/pkg/reconcilers/reconcilers.go +++ b/opensearch-operator/pkg/reconcilers/reconcilers.go @@ -14,13 +14,15 @@ import ( ) const ( - opensearchPending = "OpensearchPending" - opensearchError = "OpensearchError" - opensearchAPIError = "OpensearchAPIError" - opensearchRefMismatch = "OpensearchRefMismatch" - opensearchAPIUpdated = "OpensearchAPIUpdated" - passwordError = "PasswordError" - statusError = "StatusUpdateError" + opensearchPending = "OpensearchPending" + opensearchError = "OpensearchError" + opensearchAPIError = "OpensearchAPIError" + opensearchRefMismatch = "OpensearchRefMismatch" + opensearchAPIUpdated = "OpensearchAPIUpdated" + opensearchAPIUnchanged = "OpensearchAPIUnchanged" + opensearchCustomResourceError = "OpensearchCustomResourceError" + passwordError = "PasswordError" + statusError = "StatusUpdateError" ) type ComponentReconciler func() (reconcile.Result, error)