diff --git a/controllers/om/automation_config_test.go b/controllers/om/automation_config_test.go
index 69300956e..fdc0d8e18 100644
--- a/controllers/om/automation_config_test.go
+++ b/controllers/om/automation_config_test.go
@@ -1059,7 +1059,7 @@ func TestApplyInto(t *testing.T) {
 }

 func changeTypes(deployment Deployment) error {
-    rs := deployment.getReplicaSets()
+    rs := deployment.GetReplicaSets()
     deployment.setReplicaSets(rs)
     return nil
 }
diff --git a/controllers/om/deployment.go b/controllers/om/deployment.go
index 254816648..ad51580c1 100644
--- a/controllers/om/deployment.go
+++ b/controllers/om/deployment.go
@@ -280,13 +280,13 @@ func (d Deployment) AddMonitoringAndBackup(log *zap.SugaredLogger, tls bool, caF
     d.addBackup(log)
 }

-// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use getReplicaSets instead
+// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use GetReplicaSets instead
 func (d Deployment) ReplicaSets() []ReplicaSet {
     return d["replicaSets"].([]ReplicaSet)
 }

 func (d Deployment) GetReplicaSetByName(name string) ReplicaSet {
-    for _, rs := range d.getReplicaSets() {
+    for _, rs := range d.GetReplicaSets() {
         if rs.Name() == name {
             return rs
         }
@@ -395,7 +395,7 @@ func (d Deployment) RemoveReplicaSetByName(name string, log *zap.SugaredLogger)
         return xerrors.New("ReplicaSet does not exist")
     }

-    currentRs := d.getReplicaSets()
+    currentRs := d.GetReplicaSets()
     toKeep := make([]ReplicaSet, len(currentRs)-1)
     i := 0
     for _, el := range currentRs {
@@ -685,7 +685,7 @@ func (d Deployment) ProcessesCopy() []Process {
 }

 // ReplicaSetsCopy returns the COPY of replicasets in the deployment.
 func (d Deployment) ReplicaSetsCopy() []ReplicaSet {
-    return d.deepCopy().getReplicaSets()
+    return d.deepCopy().GetReplicaSets()
 }

 // ShardedClustersCopy returns the COPY of sharded clusters in the deployment.
@@ -958,7 +958,7 @@ func (d Deployment) getProcessByName(name string) *Process {
 }

 func (d Deployment) getReplicaSetByName(name string) *ReplicaSet {
-    for _, r := range d.getReplicaSets() {
+    for _, r := range d.GetReplicaSets() {
         if r.Name() == name {
             return &r
         }
@@ -977,7 +977,7 @@ func (d Deployment) getShardedClusterByName(name string) *ShardedCluster {
     return nil
 }

-func (d Deployment) getReplicaSets() []ReplicaSet {
+func (d Deployment) GetReplicaSets() []ReplicaSet {
     switch v := d["replicaSets"].(type) {
     case []ReplicaSet:
         return v
@@ -997,7 +997,7 @@ func (d Deployment) setReplicaSets(replicaSets []ReplicaSet) {
 }

 func (d Deployment) addReplicaSet(rs ReplicaSet) {
-    d.setReplicaSets(append(d.getReplicaSets(), rs))
+    d.setReplicaSets(append(d.GetReplicaSets(), rs))
 }

 func (d Deployment) getShardedClusters() []ShardedCluster {
@@ -1052,7 +1052,7 @@ func (d Deployment) findReplicaSetsRemovedFromShardedCluster(clusterName string)
     clusterReplicaSets := shardedCluster.getAllReplicaSets()
     var ans []string
-    for _, v := range d.getReplicaSets() {
+    for _, v := range d.GetReplicaSets() {
         if !stringutil.Contains(clusterReplicaSets, v.Name()) && isShardOfShardedCluster(clusterName, v.Name()) {
             ans = append(ans, v.Name())
         }
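Note: the rename from getReplicaSets to the exported GetReplicaSets keeps the panic-safe accessor as the only public entry point. The deprecated ReplicaSets() does a bare type assertion on d["replicaSets"], which panics when the underlying slice is not a []ReplicaSet (most likely when the deployment has been round-tripped through JSON and the value is a generic []interface{} - that failure mode is inferred from the deprecation comment, not spelled out in the diff). A minimal, hypothetical sketch of the difference; the types and the []interface{} branch below are illustrative, not the operator's real code:

package main

// replicaSet and deployment stand in for the real om.ReplicaSet / om.Deployment types.
type replicaSet map[string]interface{}
type deployment map[string]interface{}

// Deprecated style: a bare type assertion that panics on the generic representation.
func unsafeReplicaSets(d deployment) []replicaSet {
    return d["replicaSets"].([]replicaSet)
}

// Safe style: a type switch that tolerates both representations.
func safeReplicaSets(d deployment) []replicaSet {
    switch v := d["replicaSets"].(type) {
    case []replicaSet:
        return v
    case []interface{}:
        out := make([]replicaSet, 0, len(v))
        for _, item := range v {
            if m, ok := item.(map[string]interface{}); ok {
                out = append(out, replicaSet(m))
            }
        }
        return out
    default:
        return nil
    }
}

func main() {
    d := deployment{"replicaSets": []interface{}{map[string]interface{}{"_id": "rs0"}}}
    _ = safeReplicaSets(d)   // returns one replica set
    // _ = unsafeReplicaSets(d) // would panic: interface conversion
}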
diff --git a/controllers/om/deployment_test.go b/controllers/om/deployment_test.go
index a222cadd6..8f0808dae 100644
--- a/controllers/om/deployment_test.go
+++ b/controllers/om/deployment_test.go
@@ -56,9 +56,9 @@ func TestMergeReplicaSet(t *testing.T) {
     expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs"))

     assert.Len(t, d.getProcesses(), 3)
-    assert.Len(t, d.getReplicaSets(), 1)
-    assert.Len(t, d.getReplicaSets()[0].Members(), 3)
-    assert.Equal(t, d.getReplicaSets()[0], expectedRs.Rs)
+    assert.Len(t, d.GetReplicaSets(), 1)
+    assert.Len(t, d.GetReplicaSets()[0].Members(), 3)
+    assert.Equal(t, d.GetReplicaSets()[0], expectedRs.Rs)

     // Now the deployment "gets updated" from external - new node is added and one is removed - this should be fixed
     // by merge
@@ -66,15 +66,15 @@ func TestMergeReplicaSet(t *testing.T) {

     d.getProcesses()[0]["processType"] = ProcessTypeMongos                              // this will be overriden
     d.getProcesses()[1].EnsureNetConfig()["MaxIncomingConnections"] = 20                // this will be left as-is
-    d.getReplicaSets()[0]["protocolVersion"] = 10                                       // this field will be overriden by Operator
-    d.getReplicaSets()[0].setMembers(d.getReplicaSets()[0].Members()[0:2])              // "removing" the last node in replicaset
-    d.getReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{})   // "adding" some new node
-    d.getReplicaSets()[0].Members()[0]["arbiterOnly"] = true                            // changing data for first node
+    d.GetReplicaSets()[0]["protocolVersion"] = 10                                       // this field will be overriden by Operator
+    d.GetReplicaSets()[0].setMembers(d.GetReplicaSets()[0].Members()[0:2])              // "removing" the last node in replicaset
+    d.GetReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{})   // "adding" some new node
+    d.GetReplicaSets()[0].Members()[0]["arbiterOnly"] = true                            // changing data for first node

     mergeReplicaSet(d, "fooRs", createReplicaSetProcesses("fooRs"))

     assert.Len(t, d.getProcesses(), 3)
-    assert.Len(t, d.getReplicaSets(), 1)
+    assert.Len(t, d.GetReplicaSets(), 1)

     expectedRs = buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs"))
     expectedRs.Rs.Members()[0]["arbiterOnly"] = true
@@ -89,14 +89,14 @@ func TestMergeReplica_ScaleDown(t *testing.T) {
     mergeReplicaSet(d, "someRs", createReplicaSetProcesses("someRs"))

     assert.Len(t, d.getProcesses(), 3)
-    assert.Len(t, d.getReplicaSets()[0].Members(), 3)
+    assert.Len(t, d.GetReplicaSets()[0].Members(), 3)

     // "scale down"
     scaledDownRsProcesses := createReplicaSetProcesses("someRs")[0:2]
     mergeReplicaSet(d, "someRs", scaledDownRsProcesses)

     assert.Len(t, d.getProcesses(), 2)
-    assert.Len(t, d.getReplicaSets()[0].Members(), 2)
+    assert.Len(t, d.GetReplicaSets()[0].Members(), 2)

     // checking that the last member was removed
     rsProcesses := buildRsByProcesses("someRs", createReplicaSetProcesses("someRs")).Processes
@@ -123,7 +123,7 @@ func TestMergeReplicaSet_MergeFirstProcess(t *testing.T) {
     mergeReplicaSet(d, "fooRs", createReplicaSetProcessesCount(5, "fooRs"))

     assert.Len(t, d.getProcesses(), 8)
-    assert.Len(t, d.getReplicaSets(), 2)
+    assert.Len(t, d.GetReplicaSets(), 2)

     expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcessesCount(5, "fooRs"))

@@ -177,8 +177,8 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) {
     checkNumberOfVotingMembers(t, rs, 7, 8)

     // Now OM user "has changed" votes for some of the members - this must stay the same after merge
-    omDeployment.getReplicaSets()[0].Members()[2].setVotes(0).setPriority(0)
-    omDeployment.getReplicaSets()[0].Members()[4].setVotes(0).setPriority(0)
+    omDeployment.GetReplicaSets()[0].Members()[2].setVotes(0).setPriority(0)
+    omDeployment.GetReplicaSets()[0].Members()[4].setVotes(0).setPriority(0)

     omDeployment.MergeReplicaSet(rs, nil, nil, zap.S())
     checkNumberOfVotingMembers(t, rs, 5, 8)
@@ -199,10 +199,10 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) {
     omDeployment.MergeReplicaSet(rsToMerge, nil, nil, zap.S())
     checkNumberOfVotingMembers(t, rs, 7, 11)

-    assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[2].Votes())
-    assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[4].Votes())
-    assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[2].Priority())
-    assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[4].Priority())
+    assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[2].Votes())
+    assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[4].Votes())
+    assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[2].Priority())
+    assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[4].Priority())
 }

 func TestGetAllProcessNames_MergedReplicaSetsAndShardedClusters(t *testing.T) {
@@ -360,7 +360,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) {
     _, err := d.MergeShardedCluster(mergeOpts)
     assert.NoError(t, err)
     assert.Len(t, d.getShardedClusterByName("sc001").shards(), 3)
-    assert.Len(t, d.getReplicaSets(), 4)
+    assert.Len(t, d.GetReplicaSets(), 4)
     assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001"))

     // Now we are "scaling down" the sharded cluster - so junk replica sets will appear - this is still ok
@@ -377,7 +377,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) {
     _, err = d.MergeShardedCluster(mergeOpts)
     assert.NoError(t, err)
     assert.Len(t, d.getShardedClusterByName("sc001").shards(), 2)
-    assert.Len(t, d.getReplicaSets(), 4)
+    assert.Len(t, d.GetReplicaSets(), 4)
     assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001"))
 }

@@ -586,7 +586,7 @@ func checkShardedClusterCheckExtraReplicaSets(t *testing.T, d Deployment, expect
     // checking that no previous replica sets are left. For this we take the name of first shard and remove the last digit
     firstShardName := expectedReplicaSets[0].Rs.Name()
     i := 0
-    for _, r := range d.getReplicaSets() {
+    for _, r := range d.GetReplicaSets() {
         if strings.HasPrefix(r.Name(), firstShardName[0:len(firstShardName)-1]) {
             i++
         }
diff --git a/controllers/om/depshardedcluster_test.go b/controllers/om/depshardedcluster_test.go
index e93a90412..e9f025dd0 100644
--- a/controllers/om/depshardedcluster_test.go
+++ b/controllers/om/depshardedcluster_test.go
@@ -31,9 +31,9 @@ func TestMergeShardedCluster_New(t *testing.T) {
     assert.NoError(t, err)

     require.Len(t, d.getProcesses(), 15)
-    require.Len(t, d.getReplicaSets(), 4)
+    require.Len(t, d.GetReplicaSets(), 4)
     for i := 0; i < 4; i++ {
-        require.Len(t, d.getReplicaSets()[i].Members(), 3)
+        require.Len(t, d.GetReplicaSets()[i].Members(), 3)
     }
     checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
     checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
@@ -130,9 +130,9 @@ func TestMergeShardedCluster_ReplicaSetsModified(t *testing.T) {
     expectedShards[0].Rs["writeConcernMajorityJournalDefault"] = true

     require.Len(t, d.getProcesses(), 15)
-    require.Len(t, d.getReplicaSets(), 4)
+    require.Len(t, d.GetReplicaSets(), 4)
     for i := 0; i < 4; i++ {
-        require.Len(t, d.getReplicaSets()[i].Members(), 3)
+        require.Len(t, d.GetReplicaSets()[i].Members(), 3)
     }
     checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
     checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
@@ -166,7 +166,7 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) {

     mergeReplicaSet(d, "fakeShard", createReplicaSetProcesses("fakeShard"))

-    require.Len(t, d.getReplicaSets(), 5)
+    require.Len(t, d.GetReplicaSets(), 5)

     // Final check - we create the expected configuration, add there correct OM changes and check for equality with merge
     // result
@@ -188,9 +188,9 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) {
     // Note, that fake replicaset and it's processes haven't disappeared as we passed 'false' to 'MergeShardedCluster'
     // which results in "draining" for redundant shards but not physical removal of replica sets
     require.Len(t, d.getProcesses(), 18)
-    require.Len(t, d.getReplicaSets(), 5)
+    require.Len(t, d.GetReplicaSets(), 5)
     for i := 0; i < 4; i++ {
-        require.Len(t, d.getReplicaSets()[i].Members(), 3)
+        require.Len(t, d.GetReplicaSets()[i].Members(), 3)
     }
     checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster"))
     checkReplicaSet(t, d, createConfigSrvRs("configSrv", true))
diff --git a/controllers/om/mockedomclient.go b/controllers/om/mockedomclient.go
index d5b90eef9..60978fc4e 100644
--- a/controllers/om/mockedomclient.go
+++ b/controllers/om/mockedomclient.go
@@ -141,6 +141,22 @@ func (oc *MockedOmConnection) ConfigureProject(project *Project) {
     oc.context.OrgID = project.OrgID
 }

+func (oc *MockedOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
+    oc.addToHistory(reflect.ValueOf(oc.GetReplicaSetMemberIds))
+    dep, err := oc.ReadDeployment()
+    if err != nil {
+        return nil, err
+    }
+
+    finalProcessIds := make(map[string]map[string]int)
+
+    for _, replicaSet := range dep.GetReplicaSets() {
+        finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
+    }
+
+    return finalProcessIds, nil
+}
+
 var _ Connection = &MockedOmConnection{}

 // NewEmptyMockedOmConnection is the standard function for creating mocked connections that is usually used for testing
@@ -734,7 +750,7 @@ func (oc *MockedOmConnection) CheckResourcesAndBackupDeleted(t *testing.T, resou
     // This can be improved for some more complicated scenarios when we have different resources in parallel - so far
     // just checking if deployment
     assert.Empty(t, oc.deployment.getProcesses())
-    assert.Empty(t, oc.deployment.getReplicaSets())
+    assert.Empty(t, oc.deployment.GetReplicaSets())
     assert.Empty(t, oc.deployment.getShardedClusters())
     assert.Empty(t, oc.deployment.getMonitoringVersions())
     assert.Empty(t, oc.deployment.getBackupVersions())
diff --git a/controllers/om/omclient.go b/controllers/om/omclient.go
index 7ec1336a1..60e19016f 100644
--- a/controllers/om/omclient.go
+++ b/controllers/om/omclient.go
@@ -64,6 +64,10 @@ type Connection interface {
     GetPreferredHostnames(agentApiKey string) ([]PreferredHostname, error)
     AddPreferredHostname(agentApiKey string, value string, isRegexp bool) error

+    // GetReplicaSetMemberIds returns a map with the replicaset name as the key.
+    // The value is another map where the key is the replicaset member name and the value is its member id.
+    GetReplicaSetMemberIds() (map[string]map[string]int, error)
+
     backup.GroupConfigReader
     backup.GroupConfigUpdater

@@ -273,6 +277,21 @@ func (oc *HTTPOmConnection) GetAgentAuthMode() (string, error) {
     return ac.Auth.AutoAuthMechanism, nil
 }

+func (oc *HTTPOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
+    dep, err := oc.ReadDeployment()
+    if err != nil {
+        return nil, err
+    }
+
+    finalProcessIds := make(map[string]map[string]int)
+
+    for _, replicaSet := range dep.GetReplicaSets() {
+        finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
+    }
+
+    return finalProcessIds, nil
+}
+
 var _ Connection = &HTTPOmConnection{}

 // NewOpsManagerConnection stores OpsManger api endpoint and authentication credentials.
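Note: the contract of Connection.GetReplicaSetMemberIds follows directly from the interface comment above - outer key is the replica set name, inner key the member (process) name, value the member id. A small illustrative sketch of the returned shape; the replica set and member names below are invented, not taken from the codebase:

package main

import "fmt"

func main() {
    // Shape of the value returned by GetReplicaSetMemberIds for a single,
    // hypothetical 3-member replica set (names and ids are made up).
    memberIds := map[string]map[string]int{
        "my-replica-set": {
            "my-replica-set-0": 0,
            "my-replica-set-1": 1,
            "my-replica-set-2": 2,
        },
    }
    fmt.Println(memberIds["my-replica-set"]["my-replica-set-2"]) // 2
}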
diff --git a/controllers/om/replicaset.go b/controllers/om/replicaset.go
index 7c6ea8ed5..181899a4f 100644
--- a/controllers/om/replicaset.go
+++ b/controllers/om/replicaset.go
@@ -146,6 +146,14 @@ func (r ReplicaSet) String() string {
     return fmt.Sprintf("\"%s\" (members: %v)", r.Name(), r.Members())
 }

+func (r ReplicaSet) MemberIds() map[string]int {
+    memberIds := make(map[string]int)
+    for _, rsMember := range r.Members() {
+        memberIds[rsMember.Name()] = rsMember.Id()
+    }
+    return memberIds
+}
+
 // ***************************************** Private methods ***********************************************************

 func initDefaultRs(set ReplicaSet, name string, protocolVersion string) {
diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go
index caebe414c..a7e959a12 100644
--- a/controllers/operator/mongodbmultireplicaset_controller.go
+++ b/controllers/operator/mongodbmultireplicaset_controller.go
@@ -198,8 +198,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
         return r.updateStatus(ctx, &mrs, status, log)
     }

+    // Save replicasets member ids in annotation
+    finalMemberIds, err := conn.GetReplicaSetMemberIds()
+    if err != nil {
+        return r.updateStatus(ctx, &mrs, workflow.Failed(err), log)
+    }
+
     mrs.Status.FeatureCompatibilityVersion = mrs.CalculateFeatureCompatibilityVersion()
-    if err := r.saveLastAchievedSpec(ctx, mrs); err != nil {
+    if err := r.saveLastAchievedSpec(ctx, mrs, finalMemberIds); err != nil {
         return r.updateStatus(ctx, &mrs, workflow.Failed(xerrors.Errorf("Failed to set annotation: %w", err)), log)
     }

@@ -624,7 +630,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti
 }

 // saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved.
-func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster) error {
+func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error {
     clusterSpecs, err := mrs.GetClusterSpecItems()
     if err != nil {
         return err
     }
@@ -654,6 +660,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte
         annotationsToAdd[mdbmultiv1.LastClusterNumMapping] = string(clusterNumBytes)
     }

+    rsMemberIdsBytes, err := json.Marshal(rsMemberIds)
+    if err != nil {
+        return err
+    }
+    if string(rsMemberIdsBytes) != "null" {
+        annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes)
+    }
+
     return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client)
 }

@@ -696,6 +710,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
     }

     processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment)
+    // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation
+    if len(processIds) == 0 {
+        processIds = getReplicaSetProcessIdsFromAnnotation(mrs)
+    }
     log.Debugf("Existing process Ids: %+v", processIds)

     certificateFileName := ""
@@ -791,6 +809,16 @@ func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om
     return processIds
 }

+func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) map[string]int {
+    processIds := make(map[string]map[string]int)
+    if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok {
+        if err := json.Unmarshal([]byte(processIdsStr), &processIds); err != nil {
+            return map[string]int{}
+        }
+    }
+    return processIds[mrs.Name]
+}
+
 func getSRVService(mrs *mdbmultiv1.MongoDBMultiCluster) corev1.Service {
     additionalConfig := mrs.Spec.GetAdditionalMongodConfig()
     port := additionalConfig.GetPortOrDefault()
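Note: the annotation written by saveLastAchievedSpec is the fallback used by updateOmDeploymentRs when Ops Manager has no replica set configuration yet (e.g. a new project), so previously assigned member ids are not lost. A rough sketch of reading such an annotation value back, mirroring getReplicaSetProcessIdsFromAnnotation; the resource name, member names and ids are invented for illustration:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Hypothetical value stored under the util.LastAchievedRsMemberIds annotation
    // ("mongodb.com/v1.lastAchievedRsMemberIds") for a MongoDBMultiCluster named "multi-rs".
    raw := `{"multi-rs":{"multi-rs-0-0":0,"multi-rs-0-1":1,"multi-rs-1-0":2}}`

    processIds := make(map[string]map[string]int)
    if err := json.Unmarshal([]byte(raw), &processIds); err != nil {
        processIds = map[string]map[string]int{}
    }
    // The controller only uses the entry for its own resource name.
    fmt.Println(processIds["multi-rs"]) // map[multi-rs-0-0:0 multi-rs-0-1:1 multi-rs-1-0:2]
}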
%s", completionMessage(conn.BaseURL(), conn.GroupID())) @@ -1907,7 +1915,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c } configSrvProcesses, configSrvMemberOptions := r.createDesiredConfigSrvProcessesAndMemberOptions(configSrvMemberCertPath) - configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment) + configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment, r.deploymentState.LastAchievedRsMemberIds[sc.ConfigRsName()]) // Shards shards := make([]om.ReplicaSetWithProcesses, sc.Spec.ShardCount) @@ -1918,7 +1926,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c shardInternalClusterPaths = append(shardInternalClusterPaths, fmt.Sprintf("%s/%s", util.InternalClusterAuthMountPath, shardOptions.InternalClusterHash)) shardMemberCertPath := fmt.Sprintf("%s/%s", util.TLSCertMountPath, shardOptions.CertificateHash) desiredShardProcesses, desiredShardMemberOptions := r.createDesiredShardProcessesAndMemberOptions(shardIdx, shardMemberCertPath) - shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment) + shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment, r.deploymentState.LastAchievedRsMemberIds[r.sc.ShardRsName(shardIdx)]) } // updateOmAuthentication normally takes care of the certfile rotation code, but since sharded-cluster is special pertaining multiple clusterfiles, we code this part here for now. @@ -2225,10 +2233,15 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b // buildReplicaSetFromProcesses creates the 'ReplicaSetWithProcesses' with specified processes. 
 // for sharded cluster (config server, shards)
-func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) {
+func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment, savedProcessIds map[string]int) (om.ReplicaSetWithProcesses, error) {
     replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion())
     existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment)
+    // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in deployment state
+    if len(existingProcessIds) == 0 {
+        existingProcessIds = savedProcessIds
+    }
+
     var rsWithProcesses om.ReplicaSetWithProcesses
     if mdb.Spec.IsMultiCluster() {
         // we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards
diff --git a/controllers/operator/mongodbshardedcluster_controller_multi_test.go b/controllers/operator/mongodbshardedcluster_controller_multi_test.go
index fb3ca64e1..3e0fbc046 100644
--- a/controllers/operator/mongodbshardedcluster_controller_multi_test.go
+++ b/controllers/operator/mongodbshardedcluster_controller_multi_test.go
@@ -1088,7 +1088,10 @@ func TestMigrateToNewDeploymentState(t *testing.T) {
     err = kubeClient.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: sc.Namespace}, stateConfigMap)
     require.NoError(t, err)

-    expectedDeploymentState := generateExpectedDeploymentState(t, sc)
+    memberIds, err := omConnectionFactory.GetConnection().GetReplicaSetMemberIds()
+    require.NoError(t, err)
+
+    expectedDeploymentState := generateExpectedDeploymentState(t, sc, memberIds)
     require.Contains(t, stateConfigMap.Data, stateKey)
     require.JSONEq(t, expectedDeploymentState, stateConfigMap.Data[stateKey])

@@ -3587,18 +3590,19 @@ func getMultiClusterFQDN(stsName string, namespace string, clusterIdx int, podId
     return fmt.Sprintf("%s-svc.%s.svc.%s", getPodName(stsName, clusterIdx, podIdx), namespace, clusterDomain)
 }

-func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB) string {
+func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB, memberIds map[string]map[string]int) string {
     lastSpec, _ := sc.GetLastSpec()
     expectedState := ShardedClusterDeploymentState{
         CommonDeploymentState: CommonDeploymentState{
             ClusterMapping: map[string]int{},
         },
-        LastAchievedSpec: lastSpec,
-        Status:           &sc.Status,
+        LastAchievedRsMemberIds: memberIds,
+        LastAchievedSpec:        lastSpec,
+        Status:                  &sc.Status,
     }
-    lastSpecBytes, err := json.Marshal(expectedState)
+    expectedStateBytes, err := json.Marshal(expectedState)
     require.NoError(t, err)
-    return string(lastSpecBytes)
+    return string(expectedStateBytes)
 }

 func loadMongoDBResource(resourceYamlPath string) (*mdbv1.MongoDB, error) {
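Note: for sharded clusters the same member ids are persisted in the deployment state ConfigMap (via the new LastAchievedRsMemberIds field) rather than in an annotation, and buildReplicaSetFromProcesses falls back to them when OM has no existing replica set. A rough, hand-written sketch of the serialized state; only keys visible in the struct tags above are shown, the inlined CommonDeploymentState fields are omitted, and all names/ids are invented:

package statesketch

// Illustrative JSON for ShardedClusterDeploymentState after this change (not captured
// from a real cluster; lastAchievedSpec and status are elided to null for brevity).
const ExampleShardedState = `{
  "lastAchievedRsMemberIds": {
    "my-sc-config": {"my-sc-config-0": 0, "my-sc-config-1": 1, "my-sc-config-2": 2},
    "my-sc-0": {"my-sc-0-0": 0, "my-sc-0-1": 1, "my-sc-0-2": 2}
  },
  "lastAchievedSpec": null,
  "status": null
}`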
diff --git a/controllers/operator/mongodbshardedcluster_controller_test.go b/controllers/operator/mongodbshardedcluster_controller_test.go
index 782b4dd4a..e07ed7c81 100644
--- a/controllers/operator/mongodbshardedcluster_controller_test.go
+++ b/controllers/operator/mongodbshardedcluster_controller_test.go
@@ -1644,7 +1644,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
             construct.GetPodEnvOptions(),
         )
         shardSts := construct.DatabaseStatefulSet(*sh, shardOptions, zap.S())
-        shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
+        shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), make(map[string]int))
     }

     desiredMongosConfig := createMongosSpec(sh)
@@ -1665,7 +1665,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
         construct.GetPodEnvOptions(),
     )
     configSvrSts := construct.DatabaseStatefulSet(*sh, configServerOptions, zap.S())
-    configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
+    configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), make(map[string]int))

     d := om.NewDeployment()
     _, err := d.MergeShardedCluster(om.DeploymentShardedClusterMergeOptions{
diff --git a/pkg/util/constants.go b/pkg/util/constants.go
index b2082f6ee..0a16395d8 100644
--- a/pkg/util/constants.go
+++ b/pkg/util/constants.go
@@ -280,7 +280,10 @@ const (
     // TODO: remove this from here and move it to the certs package
     // This currently creates an import cycle
     InternalCertAnnotationKey = "internalCertHash"
-    LastAchievedSpec          = "mongodb.com/v1.lastSuccessfulConfiguration"
+
+    // Annotation keys used by the operator
+    LastAchievedSpec        = "mongodb.com/v1.lastSuccessfulConfiguration"
+    LastAchievedRsMemberIds = "mongodb.com/v1.lastAchievedRsMemberIds"

     // SecretVolumeName is the name of the volume resource.
     SecretVolumeName = "secret-certs"