Skip to content

Commit add6683

Browse files
committed
[ENH]: soft delete databases, hard delete from garbage collector
1 parent ed65c55 commit add6683

File tree

12 files changed

+210
-42
lines changed

12 files changed

+210
-42
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package coordinator
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/chroma-core/chroma/go/pkg/common"
78
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
@@ -255,3 +256,15 @@ func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, re
255256
func (s *Coordinator) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *coordinatorpb.BatchGetCollectionSoftDeleteStatusRequest) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
256257
return s.catalog.BatchGetCollectionSoftDeleteStatus(ctx, req.CollectionIds)
257258
}
259+
260+
func (s *Coordinator) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
261+
numDeleted, err := s.catalog.FinishDatabaseDeletion(ctx, time.Unix(req.CutoffTime.Seconds, int64(req.CutoffTime.Nanos)))
262+
if err != nil {
263+
return nil, err
264+
}
265+
266+
res := &coordinatorpb.FinishDatabaseDeletionResponse{
267+
NumDeleted: numDeleted,
268+
}
269+
return res, nil
270+
}

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,32 @@ func (tc *Catalog) DeleteDatabase(ctx context.Context, deleteDatabase *model.Del
184184
if len(databases) == 0 {
185185
return common.ErrDatabaseNotFound
186186
}
187-
err = tc.metaDomain.DatabaseDb(txCtx).Delete(databases[0].ID)
187+
err = tc.metaDomain.DatabaseDb(txCtx).SoftDelete(databases[0].ID)
188188
if err != nil {
189189
return err
190190
}
191+
192+
collections, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, nil, deleteDatabase.Tenant, deleteDatabase.Name, nil, nil)
193+
if err != nil {
194+
return err
195+
}
196+
197+
for _, collection := range collections {
198+
collectionID, err := types.Parse(collection.Collection.ID)
199+
if err != nil {
200+
return err
201+
}
202+
203+
err = tc.softDeleteCollection(txCtx, &model.DeleteCollection{
204+
ID: collectionID,
205+
TenantID: deleteDatabase.Tenant,
206+
DatabaseName: deleteDatabase.Name,
207+
})
208+
if err != nil {
209+
return err
210+
}
211+
}
212+
191213
return nil
192214
})
193215
}
@@ -664,7 +686,7 @@ func (tc *Catalog) softDeleteCollection(ctx context.Context, deleteCollection *m
664686

665687
// Generate new name with timestamp and random number
666688
oldName := *collections[0].Collection.Name
667-
newName := fmt.Sprintf("_deleted_%s_%s", oldName, *types.FromUniqueID(deleteCollection.ID))
689+
newName := fmt.Sprintf("_deleted_%s_%s", oldName, deleteCollection.ID.String())
668690

669691
dbCollection := &dbmodel.Collection{
670692
ID: deleteCollection.ID.String(),
@@ -2196,3 +2218,7 @@ func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantI
21962218

21972219
return collectionEntry.VersionFileName, nil
21982220
}
2221+
2222+
func (tc *Catalog) FinishDatabaseDeletion(ctx context.Context, cutoffTime time.Time) (uint64, error) {
2223+
return tc.metaDomain.DatabaseDb(ctx).FinishDatabaseDeletion(cutoffTime)
2224+
}

go/pkg/sysdb/grpc/tenant_database_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,11 @@ func (s *Server) GetLastCompactionTimeForTenant(ctx context.Context, req *coordi
158158
}
159159
return res, nil
160160
}
161+
162+
func (s *Server) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
163+
res, err := s.coordinator.FinishDatabaseDeletion(ctx, req)
164+
if err != nil {
165+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
166+
}
167+
return res, nil
168+
}

go/pkg/sysdb/grpc/tenant_database_service_test.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore"
1515
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
1616
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
17+
"github.com/chroma-core/chroma/go/pkg/types"
1718
"github.com/google/uuid"
1819
"github.com/pingcap/log"
1920
"github.com/stretchr/testify/suite"
@@ -116,7 +117,7 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
116117
tenantName := "TestDeleteDatabase"
117118
databaseName := "TestDeleteDatabase"
118119
// Generate random uuid for db id
119-
databaseeId := uuid.New().String()
120+
databaseId := uuid.New().String()
120121

121122
_, err := suite.catalog.CreateTenant(context.Background(), &model.CreateTenant{
122123
Name: tenantName,
@@ -127,29 +128,61 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
127128
_, err = suite.catalog.CreateDatabase(context.Background(), &model.CreateDatabase{
128129
Tenant: tenantName,
129130
Name: databaseName,
130-
ID: databaseeId,
131+
ID: databaseId,
131132
Ts: time.Now().Unix(),
132133
}, time.Now().Unix())
133134
suite.NoError(err)
134135

136+
collectionID := types.NewUniqueID()
135137
_, _, err = suite.catalog.CreateCollection(context.Background(), &model.CreateCollection{
138+
ID: collectionID,
136139
TenantID: tenantName,
137140
DatabaseName: databaseName,
138141
Name: "TestCollection",
139142
}, time.Now().Unix())
140143
suite.NoError(err)
141144

145+
timeBeforeSoftDelete := time.Now()
146+
142147
err = suite.catalog.DeleteDatabase(context.Background(), &model.DeleteDatabase{
143148
Tenant: tenantName,
144149
Name: databaseName,
145150
})
146151
suite.NoError(err)
147152

148-
// Check that associated collection was deleted
149-
var count int64
153+
// Check that associated collection was soft deleted
150154
var collections []*dbmodel.Collection
151-
suite.NoError(suite.db.Find(&collections).Count(&count).Error)
152-
suite.Equal(int64(0), count)
155+
suite.NoError(suite.db.Find(&collections).Error)
156+
suite.Equal(1, len(collections))
157+
suite.Equal(true, collections[0].IsDeleted)
158+
159+
// Database should not be eligible for hard deletion yet because it still has a (soft deleted) collection
160+
numDeleted, err := suite.catalog.FinishDatabaseDeletion(context.Background(), time.Now())
161+
suite.NoError(err)
162+
suite.Equal(uint64(0), numDeleted)
163+
164+
// Hard delete associated collection
165+
suite.NoError(err)
166+
suite.NoError(suite.catalog.DeleteCollection(context.Background(), &model.DeleteCollection{
167+
TenantID: tenantName,
168+
DatabaseName: databaseName,
169+
ID: collectionID,
170+
}, false))
171+
172+
// Database should now be eligible for hard deletion, but first verify that database is not deleted if cutoff time is prior to soft delete
173+
numDeleted, err = suite.catalog.FinishDatabaseDeletion(context.Background(), timeBeforeSoftDelete)
174+
suite.NoError(err)
175+
suite.Equal(uint64(0), numDeleted)
176+
177+
// Hard delete database
178+
numDeleted, err = suite.catalog.FinishDatabaseDeletion(context.Background(), time.Now())
179+
suite.NoError(err)
180+
suite.Equal(uint64(1), numDeleted)
181+
182+
// Verify that database is hard deleted
183+
var databases []*dbmodel.Database
184+
suite.NoError(suite.db.Debug().Where("id = ?", databaseId).Find(&databases).Error)
185+
suite.Equal(0, len(databases))
153186
}
154187

155188
func TestTenantDatabaseServiceTestSuite(t *testing.T) {

go/pkg/sysdb/metastore/db/dao/database.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dao
22

33
import (
44
"errors"
5+
"time"
56

67
"github.com/chroma-core/chroma/go/pkg/common"
78
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
@@ -95,13 +96,13 @@ func (s *databaseDb) Insert(database *dbmodel.Database) error {
9596
return err
9697
}
9798

98-
func (s *databaseDb) Delete(databaseID string) error {
99+
func (s *databaseDb) SoftDelete(databaseID string) error {
99100
return s.db.Transaction(func(tx *gorm.DB) error {
100-
if err := tx.Where("id = ?", databaseID).Delete(&dbmodel.Database{}).Error; err != nil {
101-
return err
102-
}
103-
104-
if err := tx.Where("database_id = ?", databaseID).Delete(&dbmodel.Collection{}).Error; err != nil {
101+
if err := tx.Table("databases").
102+
Where("id = ?", databaseID).
103+
Update("is_deleted", true).
104+
Update("updated_at", time.Now()).
105+
Error; err != nil {
105106
return err
106107
}
107108

@@ -121,3 +122,35 @@ func (s *databaseDb) GetDatabasesByTenantID(tenantID string) ([]*dbmodel.Databas
121122
}
122123
return databases, nil
123124
}
125+
126+
func (s *databaseDb) FinishDatabaseDeletion(cutoffTime time.Time) (uint64, error) {
127+
numDeleted := uint64(0)
128+
129+
for {
130+
// Only hard delete databases that were soft deleted prior to the cutoff time and have no collections
131+
databasesSubQuery := s.db.
132+
Table("databases d").
133+
Select("d.id").
134+
Joins("LEFT JOIN collections c ON c.database_id = d.id").
135+
Where("d.is_deleted = ?", true).
136+
Where("d.updated_at < ?", cutoffTime).
137+
Group("d.id").
138+
Having("COUNT(c.id) = 0").
139+
Limit(1000)
140+
141+
res := s.db.Table("databases").
142+
Where("id IN (?)", databasesSubQuery).
143+
Delete(&dbmodel.Database{})
144+
if res.Error != nil {
145+
return numDeleted, res.Error
146+
}
147+
148+
numDeleted += uint64(res.RowsAffected)
149+
150+
if res.RowsAffected == 0 {
151+
break
152+
}
153+
}
154+
155+
return numDeleted, nil
156+
}

go/pkg/sysdb/metastore/db/dbmodel/database.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ type IDatabaseDb interface {
2727
ListDatabases(limit *int32, offset *int32, tenantID string) ([]*Database, error)
2828
Insert(in *Database) error
2929
DeleteAll() error
30-
Delete(databaseID string) error
30+
SoftDelete(databaseID string) error
31+
FinishDatabaseDeletion(cutoffTime time.Time) (uint64, error)
3132
}

idl/chromadb/proto/coordinator.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ message DeleteDatabaseRequest {
4646

4747
message DeleteDatabaseResponse {}
4848

49+
message FinishDatabaseDeletionRequest {
50+
google.protobuf.Timestamp cutoff_time = 1;
51+
}
52+
53+
message FinishDatabaseDeletionResponse {
54+
uint64 num_deleted = 1;
55+
}
56+
4957
message CreateTenantRequest {
5058
string name = 2; // Names are globally unique
5159
}
@@ -496,6 +504,7 @@ service SysDB {
496504
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
497505
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse) {}
498506
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse) {}
507+
rpc FinishDatabaseDeletion(FinishDatabaseDeletionRequest) returns (FinishDatabaseDeletionResponse) {}
499508
rpc CreateTenant(CreateTenantRequest) returns (CreateTenantResponse) {}
500509
rpc GetTenant(GetTenantRequest) returns (GetTenantResponse) {}
501510
rpc CreateSegment(CreateSegmentRequest) returns (CreateSegmentResponse) {}

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use chroma_system::{
3434
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
3535
use chroma_types::{
3636
BatchGetCollectionSoftDeleteStatusError, CollectionUuid, DeleteCollectionError,
37+
DeleteDatabaseError,
3738
};
3839
use chrono::{DateTime, Utc};
3940
use std::collections::{HashMap, HashSet};
@@ -158,6 +159,8 @@ pub enum GarbageCollectorError {
158159
UnparsableUuid(#[from] uuid::Error),
159160
#[error("Collection deletion failed: {0}")]
160161
CollectionDeletionFailed(#[from] DeleteCollectionError),
162+
#[error("Database deletion failed: {0}")]
163+
DeleteDatabaseFailed(#[from] DeleteDatabaseError),
161164
}
162165

163166
impl ChromaError for GarbageCollectorError {
@@ -716,6 +719,19 @@ impl GarbageCollectorOrchestrator {
716719

717720
self.num_pending_tasks -= 1;
718721
if self.num_pending_tasks == 0 {
722+
let tenant = self
723+
.tenant
724+
.clone()
725+
.ok_or(GarbageCollectorError::InvariantViolation(
726+
"Expected tenant to be set".to_string(),
727+
))?;
728+
let database_name =
729+
self.database_name
730+
.clone()
731+
.ok_or(GarbageCollectorError::InvariantViolation(
732+
"Expected database to be set".to_string(),
733+
))?;
734+
719735
for collection_id in self.soft_deleted_collections_to_gc.iter() {
720736
let graph =
721737
self.graph
@@ -758,16 +774,8 @@ impl GarbageCollectorOrchestrator {
758774
if are_all_children_in_fork_tree_also_soft_deleted {
759775
self.sysdb_client
760776
.finish_collection_deletion(
761-
self.tenant.clone().ok_or(
762-
GarbageCollectorError::InvariantViolation(
763-
"Expected tenant to be set".to_string(),
764-
),
765-
)?,
766-
self.database_name.clone().ok_or(
767-
GarbageCollectorError::InvariantViolation(
768-
"Expected database to be set".to_string(),
769-
),
770-
)?,
777+
tenant.clone(),
778+
database_name.clone(),
771779
*collection_id,
772780
)
773781
.await?;

rust/garbage_collector/tests/proptest_helpers/garbage_collector_reference.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,6 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
232232
segments: segment_group,
233233
});
234234

235-
let _delete_collection_transition = alive_collection_id_strategy
236-
.clone()
237-
.prop_map(Transition::DeleteCollection);
238-
239235
let fork_collection_transition =
240236
alive_collection_id_strategy
241237
.clone()
@@ -277,11 +273,11 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
277273

278274
if alive_collection_ids.is_empty() {
279275
if state.root_collection_id.is_some() {
280-
// If all collections are deleted, we cannot create a new collection, so there is nothing further to do
276+
// If all collections are deleted, we cannot create a new collection
281277
return Just(Transition::NoOp).boxed();
282278
}
283279

284-
return prop_oneof![create_collection_transition,].boxed();
280+
return create_collection_transition.boxed();
285281
}
286282

287283
// While the garbage collector can technically run on any collection in a fork tree, we always run it on the root collection as the test fixture will call `ListCollectionsToGc()` which only returns the root collection.
@@ -293,10 +289,10 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
293289
});
294290

295291
return prop_oneof![
296-
2 => fork_collection_transition,
297-
3 => increment_collection_version_transition,
298-
2 => garbage_collect_transition,
299-
1 => delete_collection_transition,
292+
5 => increment_collection_version_transition,
293+
4 => fork_collection_transition,
294+
4 => garbage_collect_transition,
295+
3 => delete_collection_transition,
300296
]
301297
.boxed();
302298
}

rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ define_thread_local_stats!(STATS);
3232
pub struct GarbageCollectorUnderTest {
3333
runtime: Arc<tokio::runtime::Runtime>,
3434
tenant: String,
35-
database: String,
35+
database_name: String,
3636
sysdb: SysDb,
3737
storage: Storage,
3838
root_manager: RootManager,
@@ -125,7 +125,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
125125
Self {
126126
runtime: ref_state.runtime.clone(),
127127
tenant: tenant_name,
128-
database: database_name,
128+
database_name,
129129
sysdb,
130130
storage: storage.clone(),
131131
root_manager,
@@ -192,7 +192,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
192192
.sysdb
193193
.create_collection(
194194
state.tenant.clone(),
195-
state.database.clone(),
195+
state.database_name.clone(),
196196
collection_id,
197197
format!("Collection {}", collection_id),
198198
segments,
@@ -211,7 +211,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
211211
.runtime
212212
.block_on(state.sysdb.delete_collection(
213213
state.tenant.clone(),
214-
state.database.clone(),
214+
state.database_name.clone(),
215215
collection_id,
216216
vec![],
217217
))

0 commit comments

Comments
 (0)