Skip to content

Commit 716f29a

Browse files
committed
[ENH]: sysdb changes to support moving collection hard deletes to garbage collector
1 parent 8f20043 commit 716f29a

File tree

16 files changed

+227
-439
lines changed

16 files changed

+227
-439
lines changed

go/cmd/coordinator/cmd.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,6 @@ func init() {
4444
Cmd.Flags().IntVar(&conf.DBConfig.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections")
4545
Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection")
4646

47-
// Soft deletes
48-
Cmd.Flags().BoolVar(&conf.SoftDeleteEnabled, "soft-delete-enabled", true, "Enable soft deletes")
49-
Cmd.Flags().DurationVar(&conf.SoftDeleteCleanupInterval, "soft-delete-cleanup-interval", 30*time.Second, "Soft delete cleanup interval")
50-
Cmd.Flags().DurationVar(&conf.SoftDeleteMaxAge, "soft-delete-max-age", 72*time.Hour, "Soft delete max age")
51-
Cmd.Flags().UintVar(&conf.SoftDeleteCleanupBatchSize, "soft-delete-cleanup-batch-size", 100, "Soft delete cleanup batch size")
52-
// With above values, the soft delete cleaner can remove around 1000 collections in 5 minutes.
53-
5447
// Memberlist
5548
Cmd.Flags().StringVar(&conf.KubernetesNamespace, "kubernetes-namespace", "chroma", "Kubernetes namespace")
5649
Cmd.Flags().DurationVar(&conf.ReconcileInterval, "reconcile-interval", 100*time.Millisecond, "Reconcile interval")

go/pkg/common/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030
ErrCollectionTooManyFork = errors.New("collection entry has too many forks")
3131
ErrCollectionDeletedWithLocksHeld = errors.New("collection got deleted concurrently even though select for update locks were held. Not possible unless corruption somehow")
3232
ErrMissingLineageFileName = errors.New("missing lineage file name in root collection entry")
33+
ErrCollectionWasNotSoftDeleted = errors.New("collection was not soft deleted")
3334

3435
// Collection metadata errors
3536
ErrUnknownCollectionMetadataType = errors.New("collection metadata value type not supported")

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,18 @@ import (
1515
"go.uber.org/zap"
1616
)
1717

18-
// DeleteMode represents whether to perform a soft or hard delete
19-
type DeleteMode int
20-
21-
const (
22-
// SoftDelete marks records as deleted but keeps them in the database
23-
SoftDelete DeleteMode = iota
24-
// HardDelete permanently removes records from the database
25-
HardDelete
26-
)
27-
2818
// Coordinator is the top level component.
2919
// Currently, it only has the system catalog related APIs and will be extended to
3020
// support other functionalities such as membership management and propagation.
3121
type Coordinator struct {
3222
ctx context.Context
3323
catalog Catalog
34-
deleteMode DeleteMode
3524
objectStore *s3metastore.S3MetaStore
3625
}
3726

38-
func NewCoordinator(ctx context.Context, deleteMode DeleteMode, objectStore *s3metastore.S3MetaStore, versionFileEnabled bool) (*Coordinator, error) {
27+
func NewCoordinator(ctx context.Context, objectStore *s3metastore.S3MetaStore, versionFileEnabled bool) (*Coordinator, error) {
3928
s := &Coordinator{
4029
ctx: ctx,
41-
deleteMode: deleteMode,
4230
objectStore: objectStore,
4331
}
4432

@@ -142,14 +130,11 @@ func (s *Coordinator) GetSoftDeletedCollections(ctx context.Context, collectionI
142130
return s.catalog.GetSoftDeletedCollections(ctx, collectionID, tenantID, databaseName, limit)
143131
}
144132

145-
func (s *Coordinator) DeleteCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
146-
if s.deleteMode == SoftDelete {
147-
return s.catalog.DeleteCollection(ctx, deleteCollection, true)
148-
}
149-
return s.catalog.DeleteCollection(ctx, deleteCollection, false)
133+
func (s *Coordinator) SoftDeleteCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
134+
return s.catalog.DeleteCollection(ctx, deleteCollection, true)
150135
}
151136

152-
func (s *Coordinator) CleanupSoftDeletedCollection(ctx context.Context, deleteCollection *model.DeleteCollection) error {
137+
func (s *Coordinator) FinishCollectionDeletion(ctx context.Context, deleteCollection *model.DeleteCollection) error {
153138
return s.catalog.DeleteCollection(ctx, deleteCollection, false)
154139
}
155140

@@ -267,7 +252,6 @@ func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, re
267252
return s.catalog.BatchGetCollectionVersionFilePaths(ctx, req.CollectionIds)
268253
}
269254

270-
// SetDeleteMode sets the delete mode for testing
271-
func (c *Coordinator) SetDeleteMode(mode DeleteMode) {
272-
c.deleteMode = mode
255+
func (s *Coordinator) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *coordinatorpb.BatchGetCollectionSoftDeleteStatusRequest) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
256+
return s.catalog.BatchGetCollectionSoftDeleteStatus(ctx, req.CollectionIds)
273257
}

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 15 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (suite *APIsTestSuite) SetupTest() {
8080
collection.Name = "collection_" + suite.T().Name() + strconv.Itoa(index)
8181
}
8282
ctx := context.Background()
83-
c, err := NewCoordinator(ctx, SoftDelete, suite.s3MetaStore, true)
83+
c, err := NewCoordinator(ctx, suite.s3MetaStore, true)
8484
if err != nil {
8585
suite.T().Fatalf("error creating coordinator: %v", err)
8686
}
@@ -111,7 +111,7 @@ func (suite *APIsTestSuite) TearDownTest() {
111111
func testCollection(t *rapid.T) {
112112
dbcore.ConfigDatabaseForTesting()
113113
ctx := context.Background()
114-
c, err := NewCoordinator(ctx, HardDelete, nil, false)
114+
c, err := NewCoordinator(ctx, nil, false)
115115
if err != nil {
116116
t.Fatalf("error creating coordinator: %v", err)
117117
}
@@ -164,7 +164,7 @@ func testCollection(t *rapid.T) {
164164
func testSegment(t *rapid.T) {
165165
dbcore.ConfigDatabaseForTesting()
166166
ctx := context.Background()
167-
c, err := NewCoordinator(ctx, HardDelete, nil, false)
167+
c, err := NewCoordinator(ctx, nil, false)
168168
if err != nil {
169169
t.Fatalf("error creating coordinator: %v", err)
170170
}
@@ -486,7 +486,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
486486
DatabaseName: suite.databaseName,
487487
TenantID: suite.tenantName,
488488
}
489-
err = suite.coordinator.DeleteCollection(ctx, deleteCollection)
489+
err = suite.coordinator.SoftDeleteCollection(ctx, deleteCollection)
490490
suite.NoError(err)
491491

492492
results, err = suite.coordinator.GetCollections(ctx, types.NilUniqueID(), nil, suite.tenantName, suite.databaseName, nil, nil)
@@ -508,7 +508,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
508508
suite.Empty(byIDResult)
509509

510510
// Duplicate delete returns an error
511-
err = suite.coordinator.DeleteCollection(ctx, deleteCollection)
511+
err = suite.coordinator.SoftDeleteCollection(ctx, deleteCollection)
512512
suite.Error(err)
513513

514514
// Re-create the deleted collection
@@ -560,7 +560,7 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
560560
DatabaseName: suite.databaseName,
561561
TenantID: suite.tenantName,
562562
}
563-
err = suite.coordinator.DeleteCollection(ctx, deleteCollection)
563+
err = suite.coordinator.SoftDeleteCollection(ctx, deleteCollection)
564564
suite.NoError(err)
565565

566566
// Verify collection and segment were deleted
@@ -572,68 +572,11 @@ func (suite *APIsTestSuite) TestCreateGetDeleteCollections() {
572572
segments, err = suite.coordinator.GetSegments(ctx, segment.ID, nil, nil, createCollection.ID)
573573
suite.NoError(err)
574574
suite.NotEmpty(segments)
575-
suite.coordinator.deleteMode = HardDelete
576-
err = suite.coordinator.DeleteCollection(ctx, deleteCollection)
575+
err = suite.coordinator.FinishCollectionDeletion(ctx, deleteCollection)
577576
suite.NoError(err)
578577
segments, err = suite.coordinator.GetSegments(ctx, segment.ID, nil, nil, createCollection.ID)
579578
suite.NoError(err)
580579
suite.Empty(segments)
581-
582-
// Check for forward and backward compatibility with soft and hard delete.
583-
// 1. Create a collection with soft delete enabled.
584-
// 2. Delete the collection (i.e. it will be marked as is_deleted)
585-
// 3. Disable soft delete.
586-
// 4. Query for the deleted collection. It should not be found.
587-
// 5. Enable soft delete.
588-
// 6. Query for the deleted collection. It should be found.
589-
590-
suite.coordinator.deleteMode = SoftDelete
591-
collectionId := types.NewUniqueID()
592-
suite.coordinator.CreateCollection(ctx, &model.CreateCollection{
593-
ID: collectionId,
594-
Name: "test_coll_fwd_bkwd_compat",
595-
TenantID: suite.tenantName,
596-
DatabaseName: suite.databaseName,
597-
})
598-
suite.coordinator.DeleteCollection(ctx, &model.DeleteCollection{
599-
ID: collectionId,
600-
DatabaseName: suite.databaseName,
601-
TenantID: suite.tenantName,
602-
})
603-
collection, err := suite.coordinator.GetCollections(ctx, collectionId, nil, suite.tenantName, suite.databaseName, nil, nil)
604-
suite.NoError(err)
605-
// Check that the collection is deleted
606-
suite.Empty(collection)
607-
// Toggle the mode.
608-
suite.coordinator.deleteMode = HardDelete
609-
collection, err = suite.coordinator.GetCollections(ctx, collectionId, nil, suite.tenantName, suite.databaseName, nil, nil)
610-
suite.NoError(err)
611-
// Check that the collection is still deleted
612-
suite.Empty(collection)
613-
// Create a collection and delete while being in HardDelete mode.
614-
anotherCollectionId := types.NewUniqueID()
615-
suite.coordinator.CreateCollection(ctx, &model.CreateCollection{
616-
ID: anotherCollectionId,
617-
Name: "test_coll_fwd_bkwd_compat",
618-
TenantID: suite.tenantName,
619-
DatabaseName: suite.databaseName,
620-
})
621-
suite.coordinator.DeleteCollection(ctx, &model.DeleteCollection{
622-
ID: anotherCollectionId,
623-
DatabaseName: suite.databaseName,
624-
TenantID: suite.tenantName,
625-
})
626-
627-
// Toggle the mode.
628-
suite.coordinator.deleteMode = SoftDelete
629-
collection, err = suite.coordinator.GetCollections(ctx, collectionId, nil, suite.tenantName, suite.databaseName, nil, nil)
630-
suite.NoError(err)
631-
// Check that the collection is still deleted
632-
suite.Empty(collection)
633-
collection, err = suite.coordinator.GetCollections(ctx, anotherCollectionId, nil, suite.tenantName, suite.databaseName, nil, nil)
634-
suite.NoError(err)
635-
// Check that another collection is still deleted
636-
suite.Empty(collection)
637580
}
638581

639582
func (suite *APIsTestSuite) TestCollectionSize() {
@@ -1310,7 +1253,6 @@ func (suite *APIsTestSuite) TestSoftAndHardDeleteCollection() {
13101253
ctx := context.Background()
13111254

13121255
// Test Hard Delete scenario
1313-
suite.coordinator.deleteMode = HardDelete
13141256
// Create test collection
13151257
testCollection2 := &model.CreateCollection{
13161258
ID: types.NewUniqueID(),
@@ -1324,7 +1266,13 @@ func (suite *APIsTestSuite) TestSoftAndHardDeleteCollection() {
13241266
suite.NoError(err)
13251267

13261268
// Hard delete the collection: soft delete it first, then finish the deletion
1327-
err = suite.coordinator.DeleteCollection(ctx, &model.DeleteCollection{
1269+
err = suite.coordinator.SoftDeleteCollection(ctx, &model.DeleteCollection{
1270+
ID: testCollection2.ID,
1271+
TenantID: testCollection2.TenantID,
1272+
DatabaseName: testCollection2.DatabaseName,
1273+
})
1274+
suite.NoError(err)
1275+
err = suite.coordinator.FinishCollectionDeletion(ctx, &model.DeleteCollection{
13281276
ID: testCollection2.ID,
13291277
TenantID: testCollection2.TenantID,
13301278
DatabaseName: testCollection2.DatabaseName,
@@ -1343,7 +1291,6 @@ func (suite *APIsTestSuite) TestSoftAndHardDeleteCollection() {
13431291
suite.Empty(softDeletedResults)
13441292

13451293
// Test Soft Delete scenario
1346-
suite.coordinator.deleteMode = SoftDelete
13471294
// Create a test collection
13481295
testCollection := &model.CreateCollection{
13491296
ID: types.NewUniqueID(),
@@ -1357,7 +1304,7 @@ func (suite *APIsTestSuite) TestSoftAndHardDeleteCollection() {
13571304
suite.NoError(err)
13581305

13591306
// Soft delete the collection
1360-
err = suite.coordinator.DeleteCollection(ctx, &model.DeleteCollection{
1307+
err = suite.coordinator.SoftDeleteCollection(ctx, &model.DeleteCollection{
13611308
ID: testCollection.ID,
13621309
TenantID: testCollection.TenantID,
13631310
DatabaseName: testCollection.DatabaseName,

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,50 @@ func (tc *Catalog) hardDeleteCollection(ctx context.Context, deleteCollection *m
564564
return common.ErrCollectionDeleteNonExistingCollection
565565
}
566566

567+
if !collectionEntry.IsDeleted {
568+
return common.ErrCollectionWasNotSoftDeleted
569+
}
570+
571+
if collectionEntry.RootCollectionId != nil {
572+
// This was a forked collection, so we need to update the lineage file
573+
rootCollection, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(collectionEntry.RootCollectionId, nil, nil)
574+
if err != nil {
575+
return err
576+
}
577+
if rootCollection == nil {
578+
return errors.New("root collection not found")
579+
}
580+
581+
if rootCollection.LineageFileName == nil {
582+
return errors.New("lineage file name is nil on root collection")
583+
}
584+
585+
lineageFile, err := tc.getLineageFile(txCtx, rootCollection.LineageFileName)
586+
if err != nil {
587+
return err
588+
}
589+
// Remove collection being deleted from the dependencies
590+
updatedDependencies := make([]*coordinatorpb.CollectionVersionDependency, 0)
591+
for _, dependency := range lineageFile.Dependencies {
592+
if dependency.TargetCollectionId != deleteCollection.ID.String() {
593+
updatedDependencies = append(updatedDependencies, dependency)
594+
}
595+
}
596+
lineageFile.Dependencies = updatedDependencies
597+
598+
newLineageFileId, err := uuid.NewV7()
599+
if err != nil {
600+
return err
601+
}
602+
603+
newLineageFileFullName, err := tc.s3Store.PutLineageFile(collectionEntry.Tenant, collectionEntry.DatabaseID, rootCollection.ID, fmt.Sprintf("%s.binpb", newLineageFileId.String()), lineageFile)
604+
if err != nil {
605+
return err
606+
}
607+
608+
tc.metaDomain.CollectionDb(txCtx).UpdateCollectionLineageFilePath(rootCollection.ID, rootCollection.LineageFileName, newLineageFileFullName)
609+
}
610+
567611
// Delete collection and collection metadata.
568612
collectionDeletedCount, err := tc.metaDomain.CollectionDb(txCtx).DeleteCollectionByID(collectionID.String())
569613
if err != nil {
@@ -1053,7 +1097,12 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
10531097
TargetCollectionId: forkCollection.TargetCollectionID.String(),
10541098
})
10551099

1056-
newLineageFileBaseName := fmt.Sprintf("%s/%d/%s.binpb", sourceCollectionIDStr, sourceCollection.Version, forkCollection.TargetCollectionID)
1100+
newLineageFileId, err := uuid.NewV7()
1101+
if err != nil {
1102+
return err
1103+
}
1104+
1105+
newLineageFileBaseName := fmt.Sprintf("%s.binpb", newLineageFileId.String())
10571106
newLineageFileFullName, err = tc.s3Store.PutLineageFile(lineageFileTenantId, databaseID, rootCollectionIDStr, newLineageFileBaseName, lineageFile)
10581107
if err != nil {
10591108
return err
@@ -1964,7 +2013,7 @@ func (tc *Catalog) MarkVersionForDeletion(ctx context.Context, req *coordinatorp
19642013

19652014
func (tc *Catalog) updateProtoRemoveVersionEntries(versionFilePb *coordinatorpb.CollectionVersionFile, versions []int64) error {
19662015
// Check if version history exists
1967-
if versionFilePb.GetVersionHistory() == nil || len(versionFilePb.GetVersionHistory().Versions) == 0 {
2016+
if versionFilePb.GetVersionHistory() == nil {
19682017
log.Error("version history not found")
19692018
return errors.New("version history not found")
19702019
}
@@ -2000,14 +2049,14 @@ func (tc *Catalog) getNumberOfActiveVersions(versionFilePb *coordinatorpb.Collec
20002049
return len(activeVersions)
20012050
}
20022051

2003-
func (tc *Catalog) getOldestVersionTs(versionFilePb *coordinatorpb.CollectionVersionFile) time.Time {
2052+
func (tc *Catalog) getOldestVersionTs(versionFilePb *coordinatorpb.CollectionVersionFile) *time.Time {
20042053
if versionFilePb.GetVersionHistory() == nil || len(versionFilePb.GetVersionHistory().Versions) == 0 {
2005-
// Returning a zero timestamp that represents an unset value.
2006-
return time.Time{}
2054+
return nil
20072055
}
20082056
oldestVersionTs := versionFilePb.GetVersionHistory().Versions[0].CreatedAtSecs
20092057

2010-
return time.Unix(oldestVersionTs, 0)
2058+
ts := time.Unix(oldestVersionTs, 0)
2059+
return &ts
20112060
}
20122061

20132062
func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenantID string, collectionID string, versions []int64) error {
@@ -2021,8 +2070,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20212070

20222071
// Read the existing version file
20232072
collectionIDPtr := &collectionID
2024-
isDeleted := false
2025-
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionWithoutMetadata(collectionIDPtr, nil, &isDeleted)
2073+
collectionEntry, err := tc.metaDomain.CollectionDb(ctx).GetCollectionWithoutMetadata(collectionIDPtr, nil, nil)
20262074
if err != nil {
20272075
return err
20282076
}
@@ -2042,14 +2090,18 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20422090
}
20432091

20442092
numActiveVersions := tc.getNumberOfActiveVersions(versionFilePb)
2045-
if numActiveVersions < 1 {
2093+
if numActiveVersions < 1 && !collectionEntry.IsDeleted {
20462094
// No remaining valid versions after GC.
20472095
return errors.New("no valid versions after gc")
20482096
}
20492097

20502098
// Get the creation time of the oldest version.
20512099
oldestVersionTs := tc.getOldestVersionTs(versionFilePb)
2052-
if oldestVersionTs.IsZero() {
2100+
if oldestVersionTs == nil {
2101+
if !collectionEntry.IsDeleted {
2102+
return errors.New("oldest version timestamp is nil after GC, this should only happen if all versions are deleted")
2103+
}
2104+
} else if oldestVersionTs.IsZero() {
20532105
// This should never happen.
20542106
log.Error("oldest version timestamp is zero after GC.", zap.String("collection_id", collectionID))
20552107
// No versions to delete.
@@ -2069,7 +2121,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20692121
}
20702122

20712123
// Update the version file name in Postgres table as a CAS operation
2072-
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, &oldestVersionTs, &numActiveVersions)
2124+
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, oldestVersionTs, &numActiveVersions)
20732125
if err != nil {
20742126
// Delete the newly created version file from S3 since it is not needed
20752127
tc.s3Store.DeleteVersionFile(tenantID, collectionEntry.DatabaseID, collectionID, newVersionFileName)
@@ -2117,6 +2169,20 @@ func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, colle
21172169
return &result, nil
21182170
}
21192171

2172+
func (tc *Catalog) BatchGetCollectionSoftDeleteStatus(ctx context.Context, collectionIds []string) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
2173+
result := coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse{
2174+
CollectionIdToIsSoftDeleted: make(map[string]bool),
2175+
}
2176+
2177+
status, err := tc.metaDomain.CollectionDb(ctx).BatchGetCollectionSoftDeleteStatus(collectionIds)
2178+
if err != nil {
2179+
return nil, err
2180+
}
2181+
result.CollectionIdToIsSoftDeleted = status
2182+
2183+
return &result, nil
2184+
}
2185+
21202186
func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantID string, collectionID string) (string, error) {
21212187
collectionIDPtr := &collectionID
21222188
isDeleted := false

0 commit comments

Comments
 (0)