Skip to content

Commit

Permalink
KO-371: Added a validation check to ensure that MRT fields are allowe…
Browse files Browse the repository at this point in the history
…d only in SC namespaces (#339)

* Added a validation check to ensure that MRT fields are allowed only in SC namespaces.
  • Loading branch information
jwalantmodi05 authored Jan 30, 2025
1 parent 58a3270 commit 630e2ca
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
27 changes: 27 additions & 0 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,10 @@ func validateNamespaceConfig(
return nErr
}

if mErr := validateMRTFields(nsConf); mErr != nil {
return mErr
}

if nsStorage, ok := nsConf["storage-engine"]; ok {
if nsStorage == nil {
return fmt.Errorf(
Expand Down Expand Up @@ -1207,6 +1211,29 @@ func validateNamespaceConfig(
return nil
}

func validateMRTFields(nsConf map[string]interface{}) error {
mrtField := isMRTFieldSet(nsConf)
scEnabled := IsNSSCEnabled(nsConf)

if !scEnabled && mrtField {
return fmt.Errorf("MRT fields are not allowed in non-SC namespace %v", nsConf)
}

return nil
}

func isMRTFieldSet(nsConf map[string]interface{}) bool {
mrtFields := []string{"mrt-duration", "disable-mrt-writes"}

for _, field := range mrtFields {
if _, exists := nsConf[field]; exists {
return true
}
}

return false
}

func validateNamespaceReplicationFactor(
nsConf map[string]interface{}, clSize int,
) error {
Expand Down
45 changes: 45 additions & 0 deletions test/cluster/strong_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,28 @@ var _ = Describe("SCMode", func() {

validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace)
})

It("Should allow MRT fields in SC namespace", func() {
aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 3)
racks := getDummyRackConf(1)

sc1Path := "/test/dev/" + sc1Name
aeroCluster.Spec.Storage.Volumes = append(
aeroCluster.Spec.Storage.Volumes, getStorageVolumeForAerospike(sc1Name, sc1Path))

conf := getSCNamespaceConfig(sc1Name, sc1Path)
conf["mrt-duration"] = 15

racks[0].InputAerospikeConfig = &asdbv1.AerospikeConfigSpec{
Value: map[string]interface{}{
"namespaces": []interface{}{conf},
},
}
aeroCluster.Spec.RackConfig.Racks = racks

err := deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())
})
})

Context("When doing invalid operation", func() {
Expand Down Expand Up @@ -322,6 +344,29 @@ var _ = Describe("SCMode", func() {
Expect(err).To(HaveOccurred())
})

It("Should not allow MRT fields in non-SC namespace", func() {
aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 3)
racks := getDummyRackConf(1)

sc1Path := "/test/dev/" + sc1Name
aeroCluster.Spec.Storage.Volumes = append(
aeroCluster.Spec.Storage.Volumes, getStorageVolumeForAerospike(sc1Name, sc1Path))

conf := getSCNamespaceConfig(sc1Name, sc1Path)
conf["strong-consistency"] = false
conf["mrt-duration"] = 10

racks[0].InputAerospikeConfig = &asdbv1.AerospikeConfigSpec{
Value: map[string]interface{}{
"namespaces": []interface{}{conf},
},
}
aeroCluster.Spec.RackConfig.Racks = racks

err := deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).To(HaveOccurred())
})

})
})

Expand Down

0 comments on commit 630e2ca

Please sign in to comment.