Skip to content

Commit

Permalink
Rewrote ISM Policy reconciler (#846)
Browse files Browse the repository at this point in the history
### Description
The ISM Policy reconciler was constantly trying to update the ISM Policy
and it was not handling reconciliation requeue in some cases. There were
possibly other issues as well. Below I have described what caused the
different issues I encountered

- The ISM Policy request was different from the response, but they were
both made with the same struct. This caused the reconciler to always see
the existing ISM Policy and the ISM Policy from the CR as different and
try to update it. I have created a separate struct model for each to
separate the logic and in the code I now compare the existing policy
with the policy from the CR by comparing both the Policy IDs and the
policy spec
- There were some very complex cases in the code that were very
difficult to understand so I have attempted to make the code more
concise and easy to read and understand
- I have added reconciliation requeuing to all cases so the operator
doesn't just stop reconciling the ISM Policy in some cases

One thing I am wondering is that I am not sure why we would want to
create a CR without specifying the cluster ID and then the operator
automatically links it to that cluster ID so it breaks if the OpenSearch
CR is deleted. Is this intended and why? I'm talking about the section
with the comment "Check cluster ref has not changed"

Tested cases:
- A new ISM Policy is created through a CR and the operator creates it
in the OpenSearch Cluster
- The CR for an ISM Policy that is created by the operator is removed
and the operator removes it in the OpenSearch Cluster
- An ISM Policy that already exists in the OpenSearch Cluster is created
through a CR and the operator ignores it and marks it as existing
- The CR for an ISM Policy that was pre-existing and therefore was not
created by the operator is removed and the operator does not remove the
ISM Policy from the OpenSearch Cluster
- An ISM Policy that already exists in the OpenSearch Cluster is created
through a CR and the operator ignores it and marks it as existing. The
ISM Policy is then manually removed from the OpenSearch Cluster and the
operator now applies the ISM Policy from the CR

The test for ISM Policies is currently failing miserably, but I decided
to create the PR to get feedback before I dive into fixing it.

### Issues Resolved
#833
#732
Possibly other issues

### Check List
- [x] Commits are signed per the DCO using --signoff 
- [x] Unittest added for the new/changed functionality and all unit
tests are successful
- [x] Customer-visible features documented
- [x] No linter warnings (`make lint`)

If CRDs are changed:
- [ ] CRD YAMLs updated (`make manifests`) and also copied into the helm
chart
- [ ] Changes to CRDs documented

Please refer to the [PR
guidelines](https://github.com/opensearch-project/opensearch-k8s-operator/blob/main/docs/developing.md#submitting-a-pr)
before submitting this pull request.

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: rkthtrifork <[email protected]>
  • Loading branch information
rkthtrifork authored Aug 22, 2024
1 parent 610222a commit ec6428d
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 373 deletions.
9 changes: 3 additions & 6 deletions opensearch-operator/opensearch-gateway/requests/IsmPolicy.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package requests

type Policy struct {
PolicyID string `json:"_id,omitempty"`
PrimaryTerm *int `json:"_primary_term,omitempty"`
SequenceNumber *int `json:"_seq_no,omitempty"`
Policy ISMPolicy `json:"policy"`
type ISMPolicy struct {
Policy ISMPolicySpec `json:"policy"`
}

// ISMPolicySpec is the specification for the ISM policy for OS.
type ISMPolicy struct {
type ISMPolicySpec struct {
// The default starting state for each index that uses this policy.
DefaultState string `json:"default_state"`
// A human-readable description of the policy.
Expand Down

This file was deleted.

10 changes: 10 additions & 0 deletions opensearch-operator/opensearch-gateway/responses/IsmPolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package responses

import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"

type GetISMPolicyResponse struct {
PolicyID string `json:"_id"`
PrimaryTerm int `json:"_primary_term"`
SequenceNumber int `json:"_seq_no"`
Policy requests.ISMPolicySpec
}
33 changes: 10 additions & 23 deletions opensearch-operator/opensearch-gateway/services/os_ism_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/opensearch-project/opensearch-go/opensearchutil"
Expand All @@ -16,7 +17,7 @@ import (
var ErrNotFound = errors.New("policy not found")

// ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) {
func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) {
if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) {
return false, nil
}
Expand All @@ -27,34 +28,20 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques
return true, nil
}

// PolicyExists checks if the passed policy already exists or not
func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return false, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return false, nil
} else if resp.IsError() {
return false, fmt.Errorf("response from API is %s", resp.Status())
}
return true, nil
}

// GetPolicy fetches the passed policy
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) {
func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) {
resp, err := service.GetISMConfig(ctx, policyName)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, ErrNotFound
} else if resp.IsError() {
}
if resp.IsError() {
return nil, fmt.Errorf("response from API is %s", resp.Status())
}
ismResponse := requests.Policy{}
ismResponse := responses.GetISMPolicyResponse{}
if resp != nil && resp.Body != nil {
err := json.NewDecoder(resp.Body).Decode(&ismResponse)
if err != nil {
Expand All @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string)
}

// CreateISMPolicy creates the passed policy
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error {
func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.PutISMConfig(ctx, policyId, spec)
if err != nil {
Expand All @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re
}

// UpdateISMPolicy updates the given policy
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error {
func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error {
spec := opensearchutil.NewJSONReader(ismpolicy)
resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec)
resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to create ism policy: %s", resp.String())
return fmt.Errorf("failed to update ism policy: %s", resp.String())
}
return nil
}
Expand Down
Loading

0 comments on commit ec6428d

Please sign in to comment.