CLOUDP-324440 Save rs member ids #206

Draft: wants to merge 2 commits into master

2 changes: 1 addition & 1 deletion controllers/om/automation_config_test.go
@@ -1059,7 +1059,7 @@ func TestApplyInto(t *testing.T) {
}

func changeTypes(deployment Deployment) error {
rs := deployment.getReplicaSets()
rs := deployment.GetReplicaSets()
deployment.setReplicaSets(rs)
return nil
}
16 changes: 8 additions & 8 deletions controllers/om/deployment.go
@@ -280,13 +280,13 @@ func (d Deployment) AddMonitoringAndBackup(log *zap.SugaredLogger, tls bool, caF
d.addBackup(log)
}

// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use getReplicaSets instead
// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use GetReplicaSets instead
func (d Deployment) ReplicaSets() []ReplicaSet {
return d["replicaSets"].([]ReplicaSet)
}

func (d Deployment) GetReplicaSetByName(name string) ReplicaSet {
for _, rs := range d.getReplicaSets() {
for _, rs := range d.GetReplicaSets() {
if rs.Name() == name {
return rs
}
@@ -395,7 +395,7 @@ func (d Deployment) RemoveReplicaSetByName(name string, log *zap.SugaredLogger)
return xerrors.New("ReplicaSet does not exist")
}

currentRs := d.getReplicaSets()
currentRs := d.GetReplicaSets()
toKeep := make([]ReplicaSet, len(currentRs)-1)
i := 0
for _, el := range currentRs {
@@ -685,7 +685,7 @@ func (d Deployment) ProcessesCopy() []Process {

// ReplicaSetsCopy returns the COPY of replicasets in the deployment.
func (d Deployment) ReplicaSetsCopy() []ReplicaSet {
return d.deepCopy().getReplicaSets()
return d.deepCopy().GetReplicaSets()
}

// ShardedClustersCopy returns the COPY of sharded clusters in the deployment.
@@ -958,7 +958,7 @@ func (d Deployment) getProcessByName(name string) *Process {
}

func (d Deployment) getReplicaSetByName(name string) *ReplicaSet {
for _, r := range d.getReplicaSets() {
for _, r := range d.GetReplicaSets() {
if r.Name() == name {
return &r
}
@@ -977,7 +977,7 @@ func (d Deployment) getShardedClusterByName(name string) *ShardedCluster {
return nil
}

func (d Deployment) getReplicaSets() []ReplicaSet {
func (d Deployment) GetReplicaSets() []ReplicaSet {
switch v := d["replicaSets"].(type) {
case []ReplicaSet:
return v
@@ -997,7 +997,7 @@ func (d Deployment) setReplicaSets(replicaSets []ReplicaSet) {
}

func (d Deployment) addReplicaSet(rs ReplicaSet) {
d.setReplicaSets(append(d.getReplicaSets(), rs))
d.setReplicaSets(append(d.GetReplicaSets(), rs))
}

func (d Deployment) getShardedClusters() []ShardedCluster {
@@ -1052,7 +1052,7 @@ func (d Deployment) findReplicaSetsRemovedFromShardedCluster(clusterName string)
clusterReplicaSets := shardedCluster.getAllReplicaSets()
var ans []string

for _, v := range d.getReplicaSets() {
for _, v := range d.GetReplicaSets() {
if !stringutil.Contains(clusterReplicaSets, v.Name()) && isShardOfShardedCluster(clusterName, v.Name()) {
ans = append(ans, v.Name())
}
40 changes: 20 additions & 20 deletions controllers/om/deployment_test.go
@@ -56,25 +56,25 @@ func TestMergeReplicaSet(t *testing.T) {
expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs"))

assert.Len(t, d.getProcesses(), 3)
assert.Len(t, d.getReplicaSets(), 1)
assert.Len(t, d.getReplicaSets()[0].Members(), 3)
assert.Equal(t, d.getReplicaSets()[0], expectedRs.Rs)
assert.Len(t, d.GetReplicaSets(), 1)
assert.Len(t, d.GetReplicaSets()[0].Members(), 3)
assert.Equal(t, d.GetReplicaSets()[0], expectedRs.Rs)

// Now the deployment "gets updated" from external - new node is added and one is removed - this should be fixed
// by merge
newProcess := NewMongodProcess("foo", "bar", "fake-mongoDBImage", false, &mdbv1.AdditionalMongodConfig{}, &mdbv1.NewStandaloneBuilder().Build().Spec, "", nil, "")

d.getProcesses()[0]["processType"] = ProcessTypeMongos // this will be overriden
d.getProcesses()[1].EnsureNetConfig()["MaxIncomingConnections"] = 20 // this will be left as-is
d.getReplicaSets()[0]["protocolVersion"] = 10 // this field will be overriden by Operator
d.getReplicaSets()[0].setMembers(d.getReplicaSets()[0].Members()[0:2]) // "removing" the last node in replicaset
d.getReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{}) // "adding" some new node
d.getReplicaSets()[0].Members()[0]["arbiterOnly"] = true // changing data for first node
d.GetReplicaSets()[0]["protocolVersion"] = 10 // this field will be overriden by Operator
d.GetReplicaSets()[0].setMembers(d.GetReplicaSets()[0].Members()[0:2]) // "removing" the last node in replicaset
d.GetReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{}) // "adding" some new node
d.GetReplicaSets()[0].Members()[0]["arbiterOnly"] = true // changing data for first node

mergeReplicaSet(d, "fooRs", createReplicaSetProcesses("fooRs"))

assert.Len(t, d.getProcesses(), 3)
assert.Len(t, d.getReplicaSets(), 1)
assert.Len(t, d.GetReplicaSets(), 1)

expectedRs = buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs"))
expectedRs.Rs.Members()[0]["arbiterOnly"] = true
@@ -89,14 +89,14 @@ func TestMergeReplica_ScaleDown(t *testing.T) {

mergeReplicaSet(d, "someRs", createReplicaSetProcesses("someRs"))
assert.Len(t, d.getProcesses(), 3)
assert.Len(t, d.getReplicaSets()[0].Members(), 3)
assert.Len(t, d.GetReplicaSets()[0].Members(), 3)

// "scale down"
scaledDownRsProcesses := createReplicaSetProcesses("someRs")[0:2]
mergeReplicaSet(d, "someRs", scaledDownRsProcesses)

assert.Len(t, d.getProcesses(), 2)
assert.Len(t, d.getReplicaSets()[0].Members(), 2)
assert.Len(t, d.GetReplicaSets()[0].Members(), 2)

// checking that the last member was removed
rsProcesses := buildRsByProcesses("someRs", createReplicaSetProcesses("someRs")).Processes
@@ -123,7 +123,7 @@ func TestMergeReplicaSet_MergeFirstProcess(t *testing.T) {
mergeReplicaSet(d, "fooRs", createReplicaSetProcessesCount(5, "fooRs"))

assert.Len(t, d.getProcesses(), 8)
assert.Len(t, d.getReplicaSets(), 2)
assert.Len(t, d.GetReplicaSets(), 2)

expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcessesCount(5, "fooRs"))

@@ -177,8 +177,8 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) {
checkNumberOfVotingMembers(t, rs, 7, 8)

// Now OM user "has changed" votes for some of the members - this must stay the same after merge
omDeployment.getReplicaSets()[0].Members()[2].setVotes(0).setPriority(0)
omDeployment.getReplicaSets()[0].Members()[4].setVotes(0).setPriority(0)
omDeployment.GetReplicaSets()[0].Members()[2].setVotes(0).setPriority(0)
omDeployment.GetReplicaSets()[0].Members()[4].setVotes(0).setPriority(0)

omDeployment.MergeReplicaSet(rs, nil, nil, zap.S())
checkNumberOfVotingMembers(t, rs, 5, 8)
@@ -199,10 +199,10 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) {

omDeployment.MergeReplicaSet(rsToMerge, nil, nil, zap.S())
checkNumberOfVotingMembers(t, rs, 7, 11)
assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[2].Votes())
assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[4].Votes())
assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[2].Priority())
assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[4].Priority())
assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[2].Votes())
assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[4].Votes())
assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[2].Priority())
assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[4].Priority())
}

func TestGetAllProcessNames_MergedReplicaSetsAndShardedClusters(t *testing.T) {
@@ -360,7 +360,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) {
_, err := d.MergeShardedCluster(mergeOpts)
assert.NoError(t, err)
assert.Len(t, d.getShardedClusterByName("sc001").shards(), 3)
assert.Len(t, d.getReplicaSets(), 4)
assert.Len(t, d.GetReplicaSets(), 4)
assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001"))

// Now we are "scaling down" the sharded cluster - so junk replica sets will appear - this is still ok
@@ -377,7 +377,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) {
_, err = d.MergeShardedCluster(mergeOpts)
assert.NoError(t, err)
assert.Len(t, d.getShardedClusterByName("sc001").shards(), 2)
assert.Len(t, d.getReplicaSets(), 4)
assert.Len(t, d.GetReplicaSets(), 4)

assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001"))
}
@@ -586,7 +586,7 @@ func checkShardedClusterCheckExtraReplicaSets(t *testing.T, d Deployment, expect
// checking that no previous replica sets are left. For this we take the name of first shard and remove the last digit
firstShardName := expectedReplicaSets[0].Rs.Name()
i := 0
for _, r := range d.getReplicaSets() {
for _, r := range d.GetReplicaSets() {
if strings.HasPrefix(r.Name(), firstShardName[0:len(firstShardName)-1]) {
i++
}
14 changes: 7 additions & 7 deletions controllers/om/depshardedcluster_test.go
@@ -31,9 +31,9 @@ func TestMergeShardedCluster_New(t *testing.T) {
assert.NoError(t, err)

require.Len(t, d.getProcesses(), 15)
require.Len(t, d.getReplicaSets(), 4)
require.Len(t, d.GetReplicaSets(), 4)
for i := 0; i < 4; i++ {
require.Len(t, d.getReplicaSets()[i].Members(), 3)
require.Len(t, d.GetReplicaSets()[i].Members(), 3)
}
checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
@@ -130,9 +130,9 @@ func TestMergeShardedCluster_ReplicaSetsModified(t *testing.T) {
expectedShards[0].Rs["writeConcernMajorityJournalDefault"] = true

require.Len(t, d.getProcesses(), 15)
require.Len(t, d.getReplicaSets(), 4)
require.Len(t, d.GetReplicaSets(), 4)
for i := 0; i < 4; i++ {
require.Len(t, d.getReplicaSets()[i].Members(), 3)
require.Len(t, d.GetReplicaSets()[i].Members(), 3)
}
checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
@@ -166,7 +166,7 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) {

mergeReplicaSet(d, "fakeShard", createReplicaSetProcesses("fakeShard"))

require.Len(t, d.getReplicaSets(), 5)
require.Len(t, d.GetReplicaSets(), 5)

// Final check - we create the expected configuration, add there correct OM changes and check for equality with merge
// result
@@ -188,9 +188,9 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) {
// Note, that fake replicaset and it's processes haven't disappeared as we passed 'false' to 'MergeShardedCluster'
// which results in "draining" for redundant shards but not physical removal of replica sets
require.Len(t, d.getProcesses(), 18)
require.Len(t, d.getReplicaSets(), 5)
require.Len(t, d.GetReplicaSets(), 5)
for i := 0; i < 4; i++ {
require.Len(t, d.getReplicaSets()[i].Members(), 3)
require.Len(t, d.GetReplicaSets()[i].Members(), 3)
}
checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
18 changes: 17 additions & 1 deletion controllers/om/mockedomclient.go
@@ -141,6 +141,22 @@ func (oc *MockedOmConnection) ConfigureProject(project *Project) {
oc.context.OrgID = project.OrgID
}

func (oc *MockedOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
oc.addToHistory(reflect.ValueOf(oc.GetReplicaSetMemberIds))
dep, err := oc.ReadDeployment()
if err != nil {
return nil, err
}

finalProcessIds := make(map[string]map[string]int)

for _, replicaSet := range dep.GetReplicaSets() {
finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
}

return finalProcessIds, nil
}

var _ Connection = &MockedOmConnection{}

// NewEmptyMockedOmConnection is the standard function for creating mocked connections that is usually used for testing
@@ -734,7 +750,7 @@ func (oc *MockedOmConnection) CheckResourcesAndBackupDeleted(t *testing.T, resou
// This can be improved for some more complicated scenarios when we have different resources in parallel - so far
// just checking if deployment
assert.Empty(t, oc.deployment.getProcesses())
assert.Empty(t, oc.deployment.getReplicaSets())
assert.Empty(t, oc.deployment.GetReplicaSets())
assert.Empty(t, oc.deployment.getShardedClusters())
assert.Empty(t, oc.deployment.getMonitoringVersions())
assert.Empty(t, oc.deployment.getBackupVersions())
19 changes: 19 additions & 0 deletions controllers/om/omclient.go
@@ -64,6 +64,10 @@ type Connection interface {
GetPreferredHostnames(agentApiKey string) ([]PreferredHostname, error)
AddPreferredHostname(agentApiKey string, value string, isRegexp bool) error

// GetReplicaSetMemberIds returns a map with the replicaset name as the key.
// The value is another map where the key is the replicaset member name and the value is its member id.
GetReplicaSetMemberIds() (map[string]map[string]int, error)

backup.GroupConfigReader
backup.GroupConfigUpdater

@@ -273,6 +277,21 @@ func (oc *HTTPOmConnection) GetAgentAuthMode() (string, error) {
return ac.Auth.AutoAuthMechanism, nil
}

func (oc *HTTPOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
dep, err := oc.ReadDeployment()
if err != nil {
return nil, err
}

finalProcessIds := make(map[string]map[string]int)

for _, replicaSet := range dep.GetReplicaSets() {
finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
}

return finalProcessIds, nil
}

var _ Connection = &HTTPOmConnection{}

// NewOpsManagerConnection stores OpsManger api endpoint and authentication credentials.
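The new Connection method returns a nested map: replica set name -> member (process) name -> member id. Below is a minimal, self-contained sketch of that shape; the replica set and process names are hypothetical, and the snippet only illustrates the structure that the controller later serializes into an annotation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical result of Connection.GetReplicaSetMemberIds():
	// replica set name -> member (process) name -> member id.
	memberIds := map[string]map[string]int{
		"multi-rs": {
			"multi-rs-0-0": 0,
			"multi-rs-0-1": 1,
			"multi-rs-1-0": 2,
		},
	}

	// JSON form, e.g. {"multi-rs":{"multi-rs-0-0":0,"multi-rs-0-1":1,"multi-rs-1-0":2}}
	raw, err := json.Marshal(memberIds)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}
```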
8 changes: 8 additions & 0 deletions controllers/om/replicaset.go
@@ -146,6 +146,14 @@ func (r ReplicaSet) String() string {
return fmt.Sprintf("\"%s\" (members: %v)", r.Name(), r.Members())
}

func (r ReplicaSet) MemberIds() map[string]int {
memberIds := make(map[string]int)
for _, rsMember := range r.Members() {
memberIds[rsMember.Name()] = rsMember.Id()
}
return memberIds
}

// ***************************************** Private methods ***********************************************************

func initDefaultRs(set ReplicaSet, name string, protocolVersion string) {
32 changes: 30 additions & 2 deletions controllers/operator/mongodbmultireplicaset_controller.go
@@ -198,8 +198,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
return r.updateStatus(ctx, &mrs, status, log)
}

// Save replicasets member ids in annotation
finalMemberIds, err := conn.GetReplicaSetMemberIds()
if err != nil {
return r.updateStatus(ctx, &mrs, workflow.Failed(err), log)
}

mrs.Status.FeatureCompatibilityVersion = mrs.CalculateFeatureCompatibilityVersion()
if err := r.saveLastAchievedSpec(ctx, mrs); err != nil {
if err := r.saveLastAchievedSpec(ctx, mrs, finalMemberIds); err != nil {
return r.updateStatus(ctx, &mrs, workflow.Failed(xerrors.Errorf("Failed to set annotation: %w", err)), log)
}

@@ -624,7 +630,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti
}

// saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved.
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster) error {
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error {
clusterSpecs, err := mrs.GetClusterSpecItems()
if err != nil {
return err
@@ -654,6 +660,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte
annotationsToAdd[mdbmultiv1.LastClusterNumMapping] = string(clusterNumBytes)
}

rsMemberIdsBytes, err := json.Marshal(rsMemberIds)
if err != nil {
return err
}
if string(rsMemberIdsBytes) != "null" {
annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes)
}

return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client)
}

@@ -696,6 +710,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
}

processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment)
// If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation
if len(processIds) == 0 {
processIds = getReplicaSetProcessIdsFromAnnotation(mrs)
}
log.Debugf("Existing process Ids: %+v", processIds)

certificateFileName := ""
@@ -791,6 +809,16 @@ func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om
return processIds
}

func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) map[string]int {
processIds := make(map[string]map[string]int)
if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok {
if err := json.Unmarshal([]byte(processIdsStr), &processIds); err != nil {
return map[string]int{}
}
}
return processIds[mrs.Name]
}

func getSRVService(mrs *mdbmultiv1.MongoDBMultiCluster) corev1.Service {
additionalConfig := mrs.Spec.GetAdditionalMongodConfig()
port := additionalConfig.GetPortOrDefault()
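For reference, a hedged sketch of how the annotation fallback fits together: saveLastAchievedSpec stores the member ids returned by GetReplicaSetMemberIds as JSON under the util.LastAchievedRsMemberIds annotation, and getReplicaSetProcessIdsFromAnnotation reads it back when Ops Manager has no replica set configuration yet (e.g. a new project). The annotation value and resource name below are hypothetical; the helper mirrors the PR's behaviour of returning an empty map on unmarshalling errors.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical annotation payload, as written by saveLastAchievedSpec:
// replica set name -> member (process) name -> member id.
const annotationValue = `{"multi-rs":{"multi-rs-0-0":0,"multi-rs-0-1":1,"multi-rs-1-0":2}}`

// decodeMemberIds approximates getReplicaSetProcessIdsFromAnnotation: it decodes the
// nested map and returns the entry for one resource, or an empty map if decoding fails.
func decodeMemberIds(annotation, resourceName string) map[string]int {
	all := make(map[string]map[string]int)
	if err := json.Unmarshal([]byte(annotation), &all); err != nil {
		return map[string]int{}
	}
	return all[resourceName]
}

func main() {
	processIds := decodeMemberIds(annotationValue, "multi-rs")
	fmt.Println(processIds) // map[multi-rs-0-0:0 multi-rs-0-1:1 multi-rs-1-0:2]
}
```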