Skip to content

Commit 5b21248

Browse files
committed
[ENH]: move collection hard deletes to garbage collector
1 parent c570721 commit 5b21248

File tree

20 files changed

+632
-516
lines changed

20 files changed

+632
-516
lines changed

go/cmd/coordinator/cmd.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,6 @@ func init() {
4444
Cmd.Flags().IntVar(&conf.DBConfig.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections")
4545
Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection")
4646

47-
// Soft deletes
48-
Cmd.Flags().BoolVar(&conf.SoftDeleteEnabled, "soft-delete-enabled", true, "Enable soft deletes")
49-
Cmd.Flags().DurationVar(&conf.SoftDeleteCleanupInterval, "soft-delete-cleanup-interval", 30*time.Second, "Soft delete cleanup interval")
50-
Cmd.Flags().DurationVar(&conf.SoftDeleteMaxAge, "soft-delete-max-age", 72*time.Hour, "Soft delete max age")
51-
Cmd.Flags().UintVar(&conf.SoftDeleteCleanupBatchSize, "soft-delete-cleanup-batch-size", 100, "Soft delete cleanup batch size")
52-
// With above values, the soft delete cleaner can remove around 1000 collections in 5 minutes.
53-
5447
// Memberlist
5548
Cmd.Flags().StringVar(&conf.KubernetesNamespace, "kubernetes-namespace", "chroma", "Kubernetes namespace")
5649
Cmd.Flags().DurationVar(&conf.ReconcileInterval, "reconcile-interval", 100*time.Millisecond, "Reconcile interval")

go/pkg/common/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030
ErrCollectionTooManyFork = errors.New("collection entry has too many forks")
3131
ErrCollectionDeletedWithLocksHeld = errors.New("collection got deleted concurrently even though select for update locks were held. Not possible unless corruption somehow")
3232
ErrMissingLineageFileName = errors.New("missing lineage file name in root collection entry")
33+
ErrCollectionWasNotSoftDeleted = errors.New("collection was not soft deleted")
3334

3435
// Collection metadata errors
3536
ErrUnknownCollectionMetadataType = errors.New("collection metadata value type not supported")

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,18 @@ import (
1515
"go.uber.org/zap"
1616
)
1717

18-
// DeleteMode represents whether to perform a soft or hard delete
19-
type DeleteMode int
20-
21-
const (
22-
// SoftDelete marks records as deleted but keeps them in the database
23-
SoftDelete DeleteMode = iota
24-
// HardDelete permanently removes records from the database
25-
HardDelete
26-
)
27-
2818
// Coordinator is the top level component.
2919
// Currently, it only has the system catalog related APIs and will be extended to
3020
// support other functionalities such as membership managed and propagation.
3121
type Coordinator struct {
3222
ctx context.Context
3323
catalog Catalog
34-
deleteMode DeleteMode
3524
objectStore *s3metastore.S3MetaStore
3625
}
3726

38-
func NewCoordinator(ctx context.Context, deleteMode DeleteMode, objectStore *s3metastore.S3MetaStore, versionFileEnabled bool) (*Coordinator, error) {
27+
func NewCoordinator(ctx context.Context, objectStore *s3metastore.S3MetaStore, versionFileEnabled bool) (*Coordinator, error) {
3928
s := &Coordinator{
4029
ctx: ctx,
41-
deleteMode: deleteMode,
4230
objectStore: objectStore,
4331
}
4432

@@ -142,14 +130,11 @@ func (s *Coordinator) GetSoftDeletedCollections(ctx context.Context, collectionI
142130
return s.catalog.GetSoftDeletedCollections(ctx, collectionID, tenantID, databaseName, limit)
143131
}
144132

145-
func (s *Coordinator) DeleteCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
146-
if s.deleteMode == SoftDelete {
147-
return s.catalog.DeleteCollection(ctx, deleteCollection, true)
148-
}
149-
return s.catalog.DeleteCollection(ctx, deleteCollection, false)
133+
func (s *Coordinator) SoftDeleteCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
134+
return s.catalog.DeleteCollection(ctx, deleteCollection, true)
150135
}
151136

152-
func (s *Coordinator) CleanupSoftDeletedCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
137+
func (s *Coordinator) FinishCollectionDeletion(ctx context.Context, deleteCollection *model.DeleteCollection) error {
153138
return s.catalog.DeleteCollection(ctx, deleteCollection, false)
154139
}
155140

@@ -267,7 +252,6 @@ func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, re
267252
return s.catalog.BatchGetCollectionVersionFilePaths(ctx, req.CollectionIds)
268253
}
269254

270-
// SetDeleteMode sets the delete mode for testing
271-
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
272-
c.deleteMode = mode
255+
func (s *Coordinator) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *coordinatorpb.BatchGetCollectionSoftDeleteStatusRequest) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
256+
return s.catalog.BatchGetCollectionSoftDeleteStatus(ctx, req.CollectionIds)
273257
}

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ func (tc *Catalog) hardDeleteCollection(ctx context.Context, deleteCollection *m
555555
return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
556556
collectionID := deleteCollection.ID
557557

558-
collectionEntry, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(types.FromUniqueID(collectionID), &deleteCollection.DatabaseName, nil)
558+
collectionEntry, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(types.FromUniqueID(collectionID), &deleteCollection.DatabaseName, nil) // todo: should filter for soft deleted
559559
if err != nil {
560560
return err
561561
}
@@ -564,6 +564,51 @@ func (tc *Catalog) hardDeleteCollection(ctx context.Context, deleteCollection *m
564564
return common.ErrCollectionDeleteNonExistingCollection
565565
}
566566

567+
if !collectionEntry.IsDeleted {
568+
return common.ErrCollectionWasNotSoftDeleted
569+
}
570+
571+
if collectionEntry.RootCollectionId != nil {
572+
rootCollection, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(collectionEntry.RootCollectionId, nil, nil)
573+
if err != nil {
574+
return err
575+
}
576+
if rootCollection == nil {
577+
return errors.New("root collection not found")
578+
}
579+
580+
if rootCollection.LineageFileName == nil {
581+
return errors.New("lineage file name is nil on root collection")
582+
}
583+
584+
lineageFile, err := tc.getLineageFile(txCtx, rootCollection.LineageFileName)
585+
if err != nil {
586+
return err
587+
}
588+
// filter out this collection
589+
updatedDependencies := make([]*coordinatorpb.CollectionVersionDependency, 0)
590+
for _, dependency := range lineageFile.Dependencies {
591+
if dependency.TargetCollectionId != deleteCollection.ID.String() {
592+
updatedDependencies = append(updatedDependencies, dependency)
593+
}
594+
}
595+
lineageFile.Dependencies = updatedDependencies
596+
597+
newLineageFileId, err := uuid.NewV7()
598+
if err != nil {
599+
return err
600+
}
601+
602+
log.Info("new lineage file id", zap.String("newLineageFileId", newLineageFileId.String()))
603+
604+
newLineageFileFullName, err := tc.s3Store.PutLineageFile(collectionEntry.Tenant, collectionEntry.DatabaseID, rootCollection.ID, fmt.Sprintf("%s.binpb", newLineageFileId.String()), lineageFile)
605+
if err != nil {
606+
return err
607+
}
608+
609+
tc.metaDomain.CollectionDb(txCtx).UpdateCollectionLineageFilePath(rootCollection.ID, rootCollection.LineageFileName, newLineageFileFullName)
610+
}
611+
567612
// Delete collection and collection metadata.
568613
collectionDeletedCount, err := tc.metaDomain.CollectionDb(txCtx).DeleteCollectionByID(collectionID.String())
569614
if err != nil {
@@ -1960,7 +2005,7 @@ func (tc *Catalog) MarkVersionForDeletion(ctx context.Context, req *coordinatorp
19602005

19612006
func (tc *Catalog) updateProtoRemoveVersionEntries(versionFilePb *coordinatorpb.CollectionVersionFile, versions []int64) error {
19622007
// Check if version history exists
1963-
if versionFilePb.GetVersionHistory() == nil || len(versionFilePb.GetVersionHistory().Versions) == 0 {
2008+
if versionFilePb.GetVersionHistory() == nil {
19642009
log.Error("version history not found")
19652010
return errors.New("version history not found")
19662011
}
@@ -1996,14 +2041,14 @@ func (tc *Catalog) getNumberOfActiveVersions(versionFilePb *coordinatorpb.Collec
19962041
return len(activeVersions)
19972042
}
19982043

1999-
func (tc *Catalog) getOldestVersionTs(versionFilePb *coordinatorpb.CollectionVersionFile) time.Time {
2044+
func (tc *Catalog) getOldestVersionTs(versionFilePb *coordinatorpb.CollectionVersionFile) *time.Time {
20002045
if versionFilePb.GetVersionHistory() == nil || len(versionFilePb.GetVersionHistory().Versions) == 0 {
2001-
// Returning a zero timestamp that represents an unset value.
2002-
return time.Time{}
2046+
return nil
20032047
}
20042048
oldestVersionTs := versionFilePb.GetVersionHistory().Versions[0].CreatedAtSecs
20052049

2006-
return time.Unix(oldestVersionTs, 0)
2050+
ts := time.Unix(oldestVersionTs, 0)
2051+
return &ts
20072052
}
20082053

20092054
func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenantID string, collectionID string, versions []int64) error {
@@ -2017,8 +2062,8 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20172062

20182063
// Read the existing version file
20192064
collectionIDPtr := &collectionID
2020-
isDeleted := false
2021-
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionWithoutMetadata(collectionIDPtr, nil, &isDeleted)
2065+
// isDeleted := true // todo
2066+
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionWithoutMetadata(collectionIDPtr, nil, nil) // todo
20222067
if err != nil {
20232068
return err
20242069
}
@@ -2038,14 +2083,19 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20382083
}
20392084

20402085
numActiveVersions := tc.getNumberOfActiveVersions(versionFilePb)
2041-
if numActiveVersions < 1 {
2086+
if numActiveVersions < 1 && !collectionEntry.IsDeleted {
20422087
// No remaining valid versions after GC.
20432088
return errors.New("no valid versions after gc")
20442089
}
20452090

20462091
// Get the creation time of the oldest version.
20472092
oldestVersionTs := tc.getOldestVersionTs(versionFilePb)
2048-
if oldestVersionTs.IsZero() {
2093+
if oldestVersionTs == nil {
2094+
if !collectionEntry.IsDeleted {
2095+
// todo
2096+
return errors.New("oldest version timestamp is nil after GC")
2097+
}
2098+
} else if oldestVersionTs.IsZero() {
20492099
// This should never happen.
20502100
log.Error("oldest version timestamp is zero after GC.", zap.String("collection_id", collectionID))
20512101
// No versions to delete.
@@ -2065,7 +2115,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20652115
}
20662116

20672117
// Update the version file name in Postgres table as a CAS operation
2068-
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, &oldestVersionTs, &numActiveVersions)
2118+
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, oldestVersionTs, &numActiveVersions)
20692119
if err != nil {
20702120
// Delete the newly created version file from S3 since it is not needed
20712121
tc.s3Store.DeleteVersionFile(tenantID, collectionEntry.DatabaseID, collectionID, newVersionFileName)
@@ -2113,6 +2163,20 @@ func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, colle
21132163
return &result, nil
21142164
}
21152165

2166+
func (tc *Catalog) BatchGetCollectionSoftDeleteStatus(ctx context.Context, collectionIds []string) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
2167+
result := coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse{
2168+
CollectionIdToIsSoftDeleted: make(map[string]bool),
2169+
}
2170+
2171+
status, err := tc.metaDomain.CollectionDb(ctx).BatchGetCollectionSoftDeleteStatus(collectionIds)
2172+
if err != nil {
2173+
return nil, err
2174+
}
2175+
result.CollectionIdToIsSoftDeleted = status
2176+
2177+
return &result, nil
2178+
}
2179+
21162180
func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
21172181
collectionIDPtr := &collectionID
21182182
isDeleted := false

go/pkg/sysdb/grpc/cleanup.go

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)