diff --git a/base/util.go b/base/util.go index a3dd50f27b..b48a802322 100644 --- a/base/util.go +++ b/base/util.go @@ -1018,6 +1018,49 @@ func HexCasToUint64(cas string) uint64 { return binary.LittleEndian.Uint64(casBytes[0:8]) } +// HexCasToUint64ForDelta will convert hex cas to uint64 accounting for any stripped zeros in delta calculation +func HexCasToUint64ForDelta(casByte []byte) (uint64, error) { + var decoded []byte + + // as we strip any zeros off the end of the hex value for deltas, the input delta could be odd length + if len(casByte)%2 != 0 { + casByte = append(casByte, '0') + } + + // create byte array for decoding into + decodedLen := hex.DecodedLen(len(casByte)) + // binary.LittleEndian.Uint64 expects length 8 byte array, if larger we should error, if smaller + // (because of stripped 0's then we should make it length 8). + if decodedLen > 8 { + return 0, fmt.Errorf("corrupt hex value, decoded length larger than expected") + } + if decodedLen < 8 { + // can be less than 8 given we have stripped the 0's for some values, in this case we need to ensure large eniough + decoded = make([]byte, 8) + } else { + decoded = make([]byte, decodedLen) + } + + if _, err := hex.Decode(decoded, casByte); err != nil { + return 0, err + } + res := binary.LittleEndian.Uint64(decoded) + return res, nil +} + +// Uint64ToLittleEndianHexAndStripZeros will convert a uint64 type to little endian hex, stripping any zeros off the end +// + stripping 0x from start +func Uint64ToLittleEndianHexAndStripZeros(cas uint64) string { + hexCas := Uint64CASToLittleEndianHex(cas) + + i := len(hexCas) - 1 + for i > 2 && hexCas[i] == '0' { + i-- + } + // strip 0x from start + return string(hexCas[2 : i+1]) +} + func HexToBase64(s string) ([]byte, error) { decoded := make([]byte, hex.DecodedLen(len(s))) if _, err := hex.Decode(decoded, []byte(s)); err != nil { diff --git a/base/util_test.go b/base/util_test.go index 7d8544682f..97d67bd315 100644 --- a/base/util_test.go +++ b/base/util_test.go @@ -1735,3 +1735,40 @@ func TestCASToLittleEndianHex(t *testing.T) { littleEndianHex := Uint64CASToLittleEndianHex(casValue) require.Equal(t, expHexValue, string(littleEndianHex)) } + +func TestUint64CASToLittleEndianHexAndStripZeros(t *testing.T) { + hexLE := "0x0000000000000000" + u64 := HexCasToUint64(hexLE) + hexLEStripped := Uint64ToLittleEndianHexAndStripZeros(u64) + u64Stripped, err := HexCasToUint64ForDelta([]byte(hexLEStripped)) + require.NoError(t, err) + assert.Equal(t, u64, u64Stripped) + + hexLE = "0xffffffffffffffff" + u64 = HexCasToUint64(hexLE) + hexLEStripped = Uint64ToLittleEndianHexAndStripZeros(u64) + u64Stripped, err = HexCasToUint64ForDelta([]byte(hexLEStripped)) + require.NoError(t, err) + assert.Equal(t, u64, u64Stripped) + + hexLE = "0xd123456e789a0bcf" + u64 = HexCasToUint64(hexLE) + hexLEStripped = Uint64ToLittleEndianHexAndStripZeros(u64) + u64Stripped, err = HexCasToUint64ForDelta([]byte(hexLEStripped)) + require.NoError(t, err) + assert.Equal(t, u64, u64Stripped) + + hexLE = "0xd123456e78000000" + u64 = HexCasToUint64(hexLE) + hexLEStripped = Uint64ToLittleEndianHexAndStripZeros(u64) + u64Stripped, err = HexCasToUint64ForDelta([]byte(hexLEStripped)) + require.NoError(t, err) + assert.Equal(t, u64, u64Stripped) + + hexLE = "0xa500000000000000" + u64 = HexCasToUint64(hexLE) + hexLEStripped = Uint64ToLittleEndianHexAndStripZeros(u64) + u64Stripped, err = HexCasToUint64ForDelta([]byte(hexLEStripped)) + require.NoError(t, err) + assert.Equal(t, u64, u64Stripped) +} diff --git a/db/document.go b/db/document.go index 85ec9ebc4f..577c1dd304 100644 --- a/db/document.go +++ b/db/document.go @@ -1117,7 +1117,8 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat } } if hlvXattrData != nil { - err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + // parse the raw bytes of the hlv and convert deltas back to full values in memory + err := base.JSONUnmarshal(hlvXattrData, &doc.HLV) if err != nil { return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err)) } @@ -1157,7 +1158,8 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat } } if hlvXattrData != nil { - err := base.JSONUnmarshal(hlvXattrData, &doc.SyncData.HLV) + // parse the raw bytes of the hlv and convert deltas back to full values in memory + err := base.JSONUnmarshal(hlvXattrData, &doc.HLV) if err != nil { return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err)) } @@ -1251,7 +1253,7 @@ func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, gl } } if doc.SyncData.HLV != nil { - vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV) + vvXattr, err = base.JSONMarshal(doc.SyncData.HLV) if err != nil { return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err)) } diff --git a/db/document_test.go b/db/document_test.go index 09dd07c97a..79dbced42a 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -239,24 +239,16 @@ const doc_meta_no_vv = `{ "time_saved": "2017-10-25T12:45:29.622450174-07:00" }` -const doc_meta_vv = `{ - "cvCas":"0x40e2010000000000", - "src":"cb06dc003846116d9b66d2ab23887a96", - "ver":"0x40e2010000000000", - "mv":{ - "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", - "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" - }, - "pv":{ - "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" - } - }` +const doc_meta_vv = `{"cvCas":"0x40e2010000000000","src":"cb06dc003846116d9b66d2ab23887a96","ver":"0x40e2010000000000", + "mv":["c0ff05d7ac059a16@s_LhRPsa7CpjEvP5zeXTXEBA","1c008cd6@s_NqiIe0LekFPLeX4JvTO6Iw"], + "pv":["f0ff44d6ac059a16@s_YZvBpEaztom9z5V/hDoeIw"] +}` func TestParseVersionVectorSyncData(t *testing.T) { mv := make(HLVVersions) pv := make(HLVVersions) - mv["s_LhRPsa7CpjEvP5zeXTXEBA"] = 1628620455147864000 //"c0ff05d7ac059a16" - mv["s_NqiIe0LekFPLeX4JvTO6Iw"] = 1628620455139868700 + mv["s_LhRPsa7CpjEvP5zeXTXEBA"] = 1628620455147864000 + mv["s_NqiIe0LekFPLeX4JvTO6Iw"] = 1628620458747363292 pv["s_YZvBpEaztom9z5V/hDoeIw"] = 1628620455135215600 ctx := base.TestCtx(t) @@ -295,6 +287,22 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) } +const doc_meta_vv_corrupt = `{"cvCas":"0x40e2010000000000","src":"cb06dc003846116d9b66d2ab23887a96","ver":"0x40e2010000000000", + "mv":["c0ff05d7ac059a16@s_LhRPsa7CpjEvP5zeXTXEBA","1c008cd61c008cd61c008cd6@s_NqiIe0LekFPLeX4JvTO6Iw"], + "pv":["f0ff44d6ac059a16@s_YZvBpEaztom9z5V/hDoeIw"] +}` + +func TestParseVersionVectorCorruptDelta(t *testing.T) { + + ctx := base.TestCtx(t) + + sync_meta := []byte(doc_meta_no_vv) + vv_meta := []byte(doc_meta_vv_corrupt) + _, err := unmarshalDocumentWithXattrs(ctx, "doc1", nil, sync_meta, vv_meta, nil, nil, nil, nil, 1, DocUnmarshalAll) + require.Error(t, err) + +} + // TestRevAndVersion tests marshalling and unmarshalling rev and current version func TestRevAndVersion(t *testing.T) { diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 5b59b2f8d6..e32bc862a8 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -13,6 +13,7 @@ import ( "encoding/base64" "encoding/hex" "fmt" + "sort" "strconv" "strings" @@ -30,6 +31,87 @@ type Version struct { Value uint64 `json:"version"` } +// VersionsDeltas will be sorted by version, first entry will be fill version then after that will be calculated deltas +type VersionsDeltas []Version + +func (vde VersionsDeltas) Len() int { return len(vde) } + +func (vde VersionsDeltas) Swap(i, j int) { + vde[i], vde[j] = vde[j], vde[i] +} + +func (vde VersionsDeltas) Less(i, j int) bool { + if vde[i].Value == vde[j].Value { + return false + } + return vde[i].Value < vde[j].Value +} + +// VersionDeltas calculate the deltas of input map +func VersionDeltas(versions map[string]uint64) VersionsDeltas { + if versions == nil { + return nil + } + + vdm := make(VersionsDeltas, 0, len(versions)) + for src, vrs := range versions { + vdm = append(vdm, CreateVersion(src, vrs)) + } + + // return early for single entry + if len(vdm) == 1 { + return vdm + } + + // sort the list + sort.Sort(vdm) + + // traverse in reverse order and calculate delta between versions, leaving the first element as is + for i := len(vdm) - 1; i >= 1; i-- { + vdm[i].Value = vdm[i].Value - vdm[i-1].Value + } + return vdm +} + +// VersionsToDeltas will calculate deltas from the input map (pv or mv). Then will return the deltas in persisted format +func VersionsToDeltas(m map[string]uint64) []string { + if len(m) == 0 { + return nil + } + + var vrsList []string + deltas := VersionDeltas(m) + for _, delta := range deltas { + listItem := delta.StringForVersionDelta() + vrsList = append(vrsList, listItem) + } + + return vrsList +} + +// PersistedDeltasToMap converts the list of deltas in pv or mv from the bucket back from deltas into full versions in map format +func PersistedDeltasToMap(vvList []string) (map[string]uint64, error) { + vv := make(map[string]uint64) + if len(vvList) == 0 { + return vv, nil + } + + var lastEntryVersion uint64 + for _, v := range vvList { + timestampString, sourceBase64, found := strings.Cut(v, "@") + if !found { + return nil, fmt.Errorf("Malformed version string %s, delimiter not found", v) + } + ver, err := base.HexCasToUint64ForDelta([]byte(timestampString)) + if err != nil { + return nil, err + } + lastEntryVersion = ver + lastEntryVersion + vv[sourceBase64] = lastEntryVersion + } + return vv, nil +} + // CreateVersion creates an encoded sourceID and version pair func CreateVersion(source string, version uint64) Version { return Version{ @@ -38,6 +120,7 @@ func CreateVersion(source string, version uint64) Version { } } +// ParseVersion will parse source version pair from string format func ParseVersion(versionString string) (version Version, err error) { timestampString, sourceBase64, found := strings.Cut(versionString, "@") if !found { @@ -61,6 +144,13 @@ func (v Version) String() string { return strconv.FormatUint(v.Value, 16) + "@" + v.SourceID } +// StringForVersionDelta will take a version struct and convert the value to delta format +// (encoding it to LE hex, stripping any 0's off the end and stripping leading 0x) +func (v Version) StringForVersionDelta() string { + encodedVal := base.Uint64ToLittleEndianHexAndStripZeros(v.Value) + return encodedVal + "@" + v.SourceID +} + // ExtractCurrentVersionFromHLV will take the current version form the HLV struct and return it in the Version struct func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version { src, vrs := hlv.GetCurrentVersion() @@ -68,8 +158,7 @@ func (hlv *HybridLogicalVector) ExtractCurrentVersionFromHLV() *Version { return &currVersion } -// PersistedHybridLogicalVector is the marshalled format of HybridLogicalVector. -// This representation needs to be kept in sync with XDCR. +// HybridLogicalVector is the in memory format for the hLv. type HybridLogicalVector struct { CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS in little endian hex format at the time of replication ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR) @@ -79,15 +168,6 @@ type HybridLogicalVector struct { PreviousVersions HLVVersions // map of previous versions for fast efficient lookup } -type BucketVector struct { - CurrentVersionCAS string `json:"cvCas,omitempty"` - ImportCAS string `json:"importCAS,omitempty"` - SourceID string `json:"src"` - Version string `json:"ver"` - MergeVersions map[string]string `json:"mv,omitempty"` - PreviousVersions map[string]string `json:"pv,omitempty"` -} - // NewHybridLogicalVector returns an initialised HybridLogicalVector. func NewHybridLogicalVector() HybridLogicalVector { return HybridLogicalVector{ @@ -447,91 +527,78 @@ func CreateEncodedSourceID(bucketUUID, clusterUUID string) (string, error) { } func (hlv HybridLogicalVector) MarshalJSON() ([]byte, error) { - var cvCasByteArray []byte - var importCASBytes []byte - var vrsCasByteArray []byte + type BucketVector struct { + CurrentVersionCAS string `json:"cvCas,omitempty"` + ImportCAS string `json:"importCAS,omitempty"` + SourceID string `json:"src"` + Version string `json:"ver"` + PV *[]string `json:"pv,omitempty"` + MV *[]string `json:"mv,omitempty"` + } + var cvCas string + var importCAS string + var vrsCas string + + var bucketHLV = BucketVector{} if hlv.CurrentVersionCAS != 0 { - cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS) + cvCas = base.CasToString(hlv.CurrentVersionCAS) + bucketHLV.CurrentVersionCAS = cvCas } if hlv.ImportCAS != 0 { - importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS) - } - if hlv.Version != 0 { - vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version) + importCAS = base.CasToString(hlv.ImportCAS) + bucketHLV.ImportCAS = importCAS } + vrsCas = base.CasToString(hlv.Version) + bucketHLV.Version = vrsCas + bucketHLV.SourceID = hlv.SourceID - pvPersistedFormat, err := convertMapToPersistedFormat(hlv.PreviousVersions) - if err != nil { - return nil, err + pvPersistedFormat := VersionsToDeltas(hlv.PreviousVersions) + if len(pvPersistedFormat) > 0 { + bucketHLV.PV = &pvPersistedFormat } - mvPersistedFormat, err := convertMapToPersistedFormat(hlv.MergeVersions) - if err != nil { - return nil, err - } - - bucketVector := BucketVector{ - CurrentVersionCAS: string(cvCasByteArray), - ImportCAS: string(importCASBytes), - Version: string(vrsCasByteArray), - SourceID: hlv.SourceID, - MergeVersions: mvPersistedFormat, - PreviousVersions: pvPersistedFormat, + mvPersistedFormat := VersionsToDeltas(hlv.MergeVersions) + if len(mvPersistedFormat) > 0 { + bucketHLV.MV = &mvPersistedFormat } - return base.JSONMarshal(&bucketVector) + return base.JSONMarshal(&bucketHLV) } func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error { - persistedJSON := BucketVector{} - err := base.JSONUnmarshal(inputjson, &persistedJSON) + type BucketVector struct { + CurrentVersionCAS string `json:"cvCas,omitempty"` + ImportCAS string `json:"importCAS,omitempty"` + SourceID string `json:"src"` + Version string `json:"ver"` + PV *[]string `json:"pv,omitempty"` + MV *[]string `json:"mv,omitempty"` + } + var bucketDeltas BucketVector + err := base.JSONUnmarshal(inputjson, &bucketDeltas) if err != nil { return err } - // convert the data to in memory format - hlv.convertPersistedHLVToInMemoryHLV(persistedJSON) - return nil -} - -func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON BucketVector) { - hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS) - if persistedJSON.ImportCAS != "" { - hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS) + if bucketDeltas.CurrentVersionCAS != "" { + hlv.CurrentVersionCAS = base.HexCasToUint64(bucketDeltas.CurrentVersionCAS) } - hlv.SourceID = persistedJSON.SourceID - // convert the hex cas to uint64 cas - hlv.Version = base.HexCasToUint64(persistedJSON.Version) - // convert the maps form persisted format to the in memory format - hlv.PreviousVersions = convertMapToInMemoryFormat(persistedJSON.PreviousVersions) - hlv.MergeVersions = convertMapToInMemoryFormat(persistedJSON.MergeVersions) -} - -// convertMapToPersistedFormat will convert in memory map of previous versions or merge versions into the persisted format map -func convertMapToPersistedFormat(memoryMap map[string]uint64) (map[string]string, error) { - if memoryMap == nil { - return nil, nil - } - returnedMap := make(map[string]string) - var persistedCAS string - for source, cas := range memoryMap { - casByteArray := base.Uint64CASToLittleEndianHex(cas) - persistedCAS = string(casByteArray) - // remove the leading '0x' from the CAS value - persistedCAS = persistedCAS[2:] - returnedMap[source] = persistedCAS + if bucketDeltas.ImportCAS != "" { + hlv.ImportCAS = base.HexCasToUint64(bucketDeltas.ImportCAS) } - return returnedMap, nil -} - -// convertMapToInMemoryFormat will convert the persisted format map to an in memory format of that map. -// Used for previous versions and merge versions maps on HLV -func convertMapToInMemoryFormat(persistedMap map[string]string) map[string]uint64 { - if persistedMap == nil { - return nil + hlv.SourceID = bucketDeltas.SourceID + hlv.Version = base.HexCasToUint64(bucketDeltas.Version) + if bucketDeltas.PV != nil { + prevVersion, err := PersistedDeltasToMap(*bucketDeltas.PV) + if err != nil { + return err + } + hlv.PreviousVersions = prevVersion } - returnedMap := make(map[string]uint64) - // convert each CAS entry from little endian hex to Uint64 - for key, value := range persistedMap { - returnedMap[key] = base.HexCasToUint64(value) + if bucketDeltas.MV != nil { + mergeVersion, err := PersistedDeltasToMap(*bucketDeltas.MV) + if err != nil { + return err + } + hlv.MergeVersions = mergeVersion } - return returnedMap + return nil } diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index dacefd9506..a0fd0070a5 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -10,10 +10,12 @@ package db import ( "encoding/base64" + "math/rand/v2" "reflect" "strconv" "strings" "testing" + "time" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" @@ -460,3 +462,121 @@ func TestParseCBLVersion(t *testing.T) { cblString := vrs.String() assert.Equal(t, vrsString, cblString) } + +// TestVersionDeltaCalculation: +// - Create some random versions and assign to a source/version map +// - Convert the map to deltas and assert that first item in list is greater than all other elements +// - Create a test HLV and convert it to persisted format in bytes +// - Convert this back to in memory format, assert each elem of in memory format previous versions map is the same as +// the corresponding element in the original pvMap +// - Do the same for a pv map that will have two entries with the same version value +// - Do the same as above but for nil maps +func TestVersionDeltaCalculation(t *testing.T) { + src1 := "src1" + src2 := "src2" + src3 := "src3" + src4 := "src4" + src5 := "src5" + + timeNow := time.Now().UnixNano() + // make some version deltas + v1 := uint64(timeNow - rand.Int64N(1000000000000)) + v2 := uint64(timeNow - rand.Int64N(1000000000000)) + v3 := uint64(timeNow - rand.Int64N(1000000000000)) + v4 := uint64(timeNow - rand.Int64N(1000000000000)) + v5 := uint64(timeNow - rand.Int64N(1000000000000)) + + // make map of source to version + pvMap := make(HLVVersions) + pvMap[src1] = v1 + pvMap[src2] = v2 + pvMap[src3] = v3 + pvMap[src4] = v4 + pvMap[src5] = v5 + + // convert to version delta map assert that first element is larger than all other elements + deltas := VersionDeltas(pvMap) + assert.Greater(t, deltas[0].Value, deltas[1].Value) + assert.Greater(t, deltas[0].Value, deltas[2].Value) + assert.Greater(t, deltas[0].Value, deltas[3].Value) + assert.Greater(t, deltas[0].Value, deltas[4].Value) + + // create a test hlv + inputHLVA := []string{"cluster3@2"} + hlv := createHLVForTest(t, inputHLVA) + hlv.PreviousVersions = pvMap + expSrc := hlv.SourceID + expVal := hlv.Version + expCas := hlv.CurrentVersionCAS + + // convert hlv to persisted format + vvXattr, err := base.JSONMarshal(&hlv) + require.NoError(t, err) + + // convert the bytes back to an in memory format of hlv + memHLV := NewHybridLogicalVector() + err = base.JSONUnmarshal(vvXattr, &memHLV) + require.NoError(t, err) + + assert.Equal(t, pvMap[src1], memHLV.PreviousVersions[src1]) + assert.Equal(t, pvMap[src2], memHLV.PreviousVersions[src2]) + assert.Equal(t, pvMap[src3], memHLV.PreviousVersions[src3]) + assert.Equal(t, pvMap[src4], memHLV.PreviousVersions[src4]) + assert.Equal(t, pvMap[src5], memHLV.PreviousVersions[src5]) + + // assert that the other elements are as expected + assert.Equal(t, expSrc, memHLV.SourceID) + assert.Equal(t, expVal, memHLV.Version) + assert.Equal(t, expCas, memHLV.CurrentVersionCAS) + assert.Len(t, memHLV.MergeVersions, 0) + + // test hlv with two pv version entries that are equal to each other + hlv = createHLVForTest(t, inputHLVA) + // make src3 have the same version value as src2 + pvMap[src3] = pvMap[src2] + hlv.PreviousVersions = pvMap + + // convert hlv to persisted format + vvXattr, err = base.JSONMarshal(&hlv) + require.NoError(t, err) + + // convert the bytes back to an in memory format of hlv + memHLV = NewHybridLogicalVector() + err = base.JSONUnmarshal(vvXattr, &memHLV) + require.NoError(t, err) + + assert.Equal(t, pvMap[src1], memHLV.PreviousVersions[src1]) + assert.Equal(t, pvMap[src2], memHLV.PreviousVersions[src2]) + assert.Equal(t, pvMap[src3], memHLV.PreviousVersions[src3]) + assert.Equal(t, pvMap[src4], memHLV.PreviousVersions[src4]) + assert.Equal(t, pvMap[src5], memHLV.PreviousVersions[src5]) + + // assert that the other elements are as expected + assert.Equal(t, expSrc, memHLV.SourceID) + assert.Equal(t, expVal, memHLV.Version) + assert.Equal(t, expCas, memHLV.CurrentVersionCAS) + assert.Len(t, memHLV.MergeVersions, 0) + + // test hlv with nil merge versions and nil previous versions to test panic safe + pvMap = nil + hlv2 := createHLVForTest(t, inputHLVA) + hlv2.PreviousVersions = pvMap + hlv2.MergeVersions = nil + deltas = VersionDeltas(pvMap) + assert.Nil(t, deltas) + + // construct byte array from hlv + vvXattr, err = base.JSONMarshal(&hlv2) + require.NoError(t, err) + // convert the bytes back to an in memory format of hlv + memHLV = HybridLogicalVector{} + err = base.JSONUnmarshal(vvXattr, &memHLV) + require.NoError(t, err) + + // assert in memory hlv is as expected + assert.Equal(t, expSrc, memHLV.SourceID) + assert.Equal(t, expVal, memHLV.Version) + assert.Equal(t, expCas, memHLV.CurrentVersionCAS) + assert.Len(t, memHLV.PreviousVersions, 0) + assert.Len(t, memHLV.MergeVersions, 0) +} diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index 477e82dab2..ae4c2631ed 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -63,6 +63,27 @@ func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string) (casOut uint64 return cas } +// UpdateWithHLV will update and existing doc in bucket mocking write from another hlv aware peer +func (h *HLVAgent) UpdateWithHLV(ctx context.Context, key string, inputCas uint64, hlv *HybridLogicalVector) (casOut uint64) { + err := hlv.AddVersion(CreateVersion(h.Source, expandMacroCASValueUint64)) + require.NoError(h.t, err) + hlv.CurrentVersionCAS = expandMacroCASValueUint64 + + vvXattr, err := hlv.MarshalJSON() + require.NoError(h.t, err) + mutateInOpts := &sgbucket.MutateInOptions{ + MacroExpansion: hlv.computeMacroExpansions(), + } + + docBody := base.MustJSONMarshal(h.t, defaultHelperBody) + xattrData := map[string][]byte{ + h.xattrName: vvXattr, + } + cas, err := h.datastore.WriteWithXattrs(ctx, key, 0, inputCas, docBody, xattrData, nil, mutateInOpts) + require.NoError(h.t, err) + return cas +} + // EncodeTestVersion converts a simplified string version of the form 1@abc to a hex-encoded version and base64 encoded // source, like 169a05acd705ffc0@YWJj. Allows use of simplified versions in tests for readability, ease of use. func EncodeTestVersion(versionString string) (encodedString string) { diff --git a/rest/api_test.go b/rest/api_test.go index f8f58b5f2e..bc554c10fe 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2801,6 +2801,62 @@ func TestCreateDBWithXattrsDisbaled(t *testing.T) { assert.Contains(t, resp.Body.String(), errResp) } +// TestPvDeltaReadAndWrite: +// - Write a doc from another hlv aware peer to the bucket +// - Force import of this doc, then update this doc via rest tester source +// - Assert that the document hlv is as expected +// - Update the doc from a new hlv aware peer and force the import of this new write +// - Assert that the new hlv is as expected, testing that the hlv went through transformation to the persisted delta +// version and back to the in memory version as expected +func TestPvDeltaReadAndWrite(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() + testSource := rt.GetDatabase().EncodedSourceID + + const docID = "doc1" + otherSource := "otherSource" + hlvHelper := db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_vv") + existingHLVKey := docID + cas := hlvHelper.InsertWithHLV(ctx, existingHLVKey) + casV1 := cas + encodedSourceV1 := db.EncodeSource(otherSource) + + // force import of this write + version1, _ := rt.GetDoc(docID) + + // update the above doc, this should push CV to PV and adds a new CV + version2 := rt.UpdateDocDirectly(docID, version1, db.Body{"new": "update!"}) + newDoc, _, err := collection.GetDocWithXattrs(ctx, existingHLVKey, db.DocUnmarshalAll) + require.NoError(t, err) + casV2 := newDoc.Cas + encodedSourceV2 := testSource + + // assert that we have a prev CV drop to pv and a new CV pair, assert pv values are as expected after delta conversions + assert.Equal(t, testSource, newDoc.HLV.SourceID) + assert.Equal(t, version2.CV.Value, newDoc.HLV.Version) + assert.Len(t, newDoc.HLV.PreviousVersions, 1) + assert.Equal(t, casV1, newDoc.HLV.PreviousVersions[encodedSourceV1]) + + otherSource = "diffSource" + hlvHelper = db.NewHLVAgent(t, rt.GetSingleDataStore(), otherSource, "_vv") + cas = hlvHelper.UpdateWithHLV(ctx, existingHLVKey, newDoc.Cas, newDoc.HLV) + encodedSourceV3 := db.EncodeSource(otherSource) + casV3 := cas + + // import and get raw doc + _, _ = rt.GetDoc(docID) + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) + require.NoError(t, err) + + // assert that we have two entries in previous versions, and they are correctly converted from deltas back to full value + assert.Equal(t, encodedSourceV3, bucketDoc.HLV.SourceID) + assert.Equal(t, casV3, bucketDoc.HLV.Version) + assert.Len(t, bucketDoc.HLV.PreviousVersions, 2) + assert.Equal(t, casV1, bucketDoc.HLV.PreviousVersions[encodedSourceV1]) + assert.Equal(t, casV2, bucketDoc.HLV.PreviousVersions[encodedSourceV2]) +} + // TestPutDocUpdateVersionVector: // - Put a doc and assert that the versions and the source for the hlv is correctly updated // - Update that doc and assert HLV has also been updated