CBG-4210: Attachment metadata migration background job (#7125)
* CBG-4210: new background task to migrate attachment metadata from sync data to global sync data

* update to test

* lint error

* more lint

* minor test tweak

* updates to not process docs with no attachments to migrate

* updates to add new test

* updated based off review

* comment updates

* address review

* remove commented out code

* updates from review
gregns1 authored Oct 7, 2024
1 parent af62933 commit 6547f7d
Showing 14 changed files with 751 additions and 47 deletions.
44 changes: 38 additions & 6 deletions base/constants_syncdocs.go
@@ -377,7 +377,8 @@ func CollectionSyncFunctionKeyWithGroupID(groupID string, scopeName, collectionN

 // SyncInfo documents are stored in collections to identify the metadataID associated with sync metadata in that collection
 type SyncInfo struct {
-	MetadataID string `json:"metadataID"`
+	MetadataID      string `json:"metadataID,omitempty"`
+	MetaDataVersion string `json:"metadata_version,omitempty"`
 }

 // initSyncInfo attempts to initialize syncInfo for a datastore
@@ -412,17 +413,48 @@ func InitSyncInfo(ds DataStore, metadataID string) (requiresResync bool, err err
 	return syncInfo.MetadataID != metadataID, nil
 }
 
-// SetSyncInfo sets syncInfo in a DataStore to the specified metadataID
-func SetSyncInfo(ds DataStore, metadataID string) error {
+// SetSyncInfoMetadataID sets syncInfo in a DataStore to the specified metadataID, preserving metadata version if present
+func SetSyncInfoMetadataID(ds DataStore, metadataID string) error {
 
 	// If the metadataID isn't defined, don't persist SyncInfo. Defensive handling for legacy use cases.
 	if metadataID == "" {
 		return nil
 	}
-	syncInfo := &SyncInfo{
-		MetadataID: metadataID,
+	_, err := ds.Update(SGSyncInfo, 0, func(current []byte) (updated []byte, expiry *uint32, delete bool, err error) {
+		var syncInfo SyncInfo
+		if current != nil {
+			parseErr := JSONUnmarshal(current, &syncInfo)
+			if parseErr != nil {
+				return nil, nil, false, parseErr
+			}
+		}
+		// if we have a metadataID to set, set it preserving the metadata version if present
+		syncInfo.MetadataID = metadataID
+		bytes, err := JSONMarshal(&syncInfo)
+		return bytes, nil, false, err
+	})
+	return err
+}
+
+// SetSyncInfoMetaVersion sets sync info in DataStore to specified metadata version, preserving metadataID if present
+func SetSyncInfoMetaVersion(ds DataStore, metaVersion string) error {
+	if metaVersion == "" {
+		return nil
 	}
-	return ds.Set(SGSyncInfo, 0, nil, syncInfo)
+	_, err := ds.Update(SGSyncInfo, 0, func(current []byte) (updated []byte, expiry *uint32, delete bool, err error) {
+		var syncInfo SyncInfo
+		if current != nil {
+			parseErr := JSONUnmarshal(current, &syncInfo)
+			if parseErr != nil {
+				return nil, nil, false, parseErr
+			}
+		}
+		// if we have a meta version to set, set it preserving the metadata ID if present
+		syncInfo.MetaDataVersion = metaVersion
+		bytes, err := JSONMarshal(&syncInfo)
+		return bytes, nil, false, err
+	})
+	return err
 }
 
 // SerializeIfLonger returns name as a sha1 string if the length of the name is greater or equal to the length specificed. Otherwise, returns the original string.
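The two setters above are deliberately symmetric: each performs a read-modify-write through ds.Update so it only overwrites its own field and preserves whatever the other setter wrote. A minimal usage sketch, assuming it lives in the same package as the setters; the DataStore value, database name, and version string are placeholders, not values taken from this commit:

	func initSyncInfoExample(ds DataStore) error {
		// First write: only metadataID is set; metadata_version is omitted from the JSON thanks to omitempty.
		if err := SetSyncInfoMetadataID(ds, "db0"); err != nil {
			return err
		}
		// Second write: the Update callback re-reads the existing syncInfo doc, so metadataID survives
		// and the version is added alongside it, e.g. {"metadataID":"db0","metadata_version":"4.0"}
		return SetSyncInfoMetaVersion(ds, "4.0")
	}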
2 changes: 1 addition & 1 deletion db/background_mgr.go
@@ -87,7 +87,7 @@ type BackgroundManagerStatus struct {
 }
 
 // BackgroundManagerProcessI is an interface satisfied by any of the background processes
-// Examples of this: ReSync, Compaction
+// Examples of this: ReSync, Compaction, Attachment Migration
 type BackgroundManagerProcessI interface {
 	Init(ctx context.Context, options map[string]interface{}, clusterStatus []byte) error
 	Run(ctx context.Context, options map[string]interface{}, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error
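The new attachment migration job plugs into this interface alongside resync and attachment compaction. As a rough illustration of what satisfying it involves, here is a hedged skeleton (not the actual manager added by this commit, and stubbing only the two methods visible in the hunk above):

	type exampleMigrationProcess struct{}

	func (p *exampleMigrationProcess) Init(ctx context.Context, options map[string]interface{}, clusterStatus []byte) error {
		// Restore any previously persisted state from clusterStatus so a fresh node can resume the job.
		return nil
	}

	func (p *exampleMigrationProcess) Run(ctx context.Context, options map[string]interface{}, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error {
		// Stream documents, rewrite attachment metadata, periodically persist progress via
		// persistClusterStatusCallback, and stop early once the terminator fires.
		return nil
	}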
20 changes: 2 additions & 18 deletions db/background_mgr_attachment_compaction.go
@@ -102,23 +102,6 @@ func (a *AttachmentCompactionManager) Init(ctx context.Context, options map[stri
 	return newRunInit()
 }
 
-func (a *AttachmentCompactionManager) PurgeDCPMetadata(ctx context.Context, datastore base.DataStore, database *Database, metadataKeyPrefix string) error {
-
-	bucket, err := base.AsGocbV2Bucket(database.Bucket)
-	if err != nil {
-		return err
-	}
-	numVbuckets, err := bucket.GetMaxVbno()
-	if err != nil {
-		return err
-	}
-
-	metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, metadataKeyPrefix)
-	base.InfofCtx(ctx, base.KeyDCP, "purging persisted dcp metadata for attachment compaction run %s", a.CompactID)
-	metadata.Purge(ctx, base.DefaultNumWorkers)
-	return nil
-}
-
 func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[string]interface{}, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error {
 	database := options["database"].(*Database)
 
@@ -204,7 +187,8 @@ func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ct
 	if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) {
 		base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase)
 		// to rollback any phase for attachment compaction we need to purge all persisted dcp metadata
-		err = a.PurgeDCPMetadata(ctx, dataStore, database, keyPrefix)
+		base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID)
+		err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID)
 		if err != nil {
 			base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err)
 			return false, err
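PurgeDCPCheckpoints is a shared helper introduced elsewhere in this commit; its definition is not shown in this hunk. Going by the deleted per-manager PurgeDCPMetadata above, a sketch of the kind of work it presumably does could look like the following; the function name and the DatabaseContext fields used here (Bucket, MetadataStore) are assumptions, not confirmed by this diff:

	func purgeDCPCheckpointsSketch(ctx context.Context, dbCtx *DatabaseContext, metadataKeyPrefix, taskID string) error {
		// The checkpoint purge needs the vBucket count, which is only available from a gocb bucket.
		bucket, err := base.AsGocbV2Bucket(dbCtx.Bucket)
		if err != nil {
			return err
		}
		numVbuckets, err := bucket.GetMaxVbno()
		if err != nil {
			return err
		}
		// Drop every persisted DCP checkpoint under the given key prefix so the next run starts from scratch.
		metadata := base.NewDCPMetadataCS(ctx, dbCtx.MetadataStore, numVbuckets, base.DefaultNumWorkers, metadataKeyPrefix)
		base.InfofCtx(ctx, base.KeyDCP, "Purging persisted DCP metadata for background task run %s", taskID)
		metadata.Purge(ctx, base.DefaultNumWorkers)
		return nil
	}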