Skip to content

Commit cf6858c

Browse files
committed
[ENH]: move collection hard deletes to garbage collector
1 parent 8253c18 commit cf6858c

8 files changed

+649
-150
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,10 @@ pub enum ConstructVersionGraphError {
130130
InvalidUuid(#[from] uuid::Error),
131131
#[error("Invalid timestamp: {0}")]
132132
InvalidTimestamp(i64),
133-
#[error("Expected node not found while constructing graph")]
134-
ExpectedNodeNotFound,
133+
#[error("Expected node not found while constructing graph (collection {0}@v{1:?})")]
134+
ExpectedNodeNotFound(CollectionUuid, Option<i64>),
135+
#[error("Invariant violation: {0}")]
136+
InvariantViolation(String),
135137
}
136138

137139
impl<E> From<TaskError<E>> for ConstructVersionGraphError
@@ -159,7 +161,8 @@ impl ChromaError for ConstructVersionGraphError {
159161
ConstructVersionGraphError::FetchVersionFilePaths(err) => err.code(),
160162
ConstructVersionGraphError::InvalidUuid(_) => ErrorCodes::Internal,
161163
ConstructVersionGraphError::InvalidTimestamp(_) => ErrorCodes::InvalidArgument,
162-
ConstructVersionGraphError::ExpectedNodeNotFound => ErrorCodes::Internal,
164+
ConstructVersionGraphError::ExpectedNodeNotFound(_, _) => ErrorCodes::Internal,
165+
ConstructVersionGraphError::InvariantViolation(_) => ErrorCodes::Internal,
163166
}
164167
}
165168
}
@@ -224,9 +227,11 @@ impl ConstructVersionGraphOrchestrator {
224227
ctx: &ComponentContext<ConstructVersionGraphOrchestrator>,
225228
) -> Result<(), ConstructVersionGraphError> {
226229
if self.num_pending_tasks == 0 {
230+
// This map will be used as a basis for building the graph
227231
let mut versions_by_collection_id: HashMap<CollectionUuid, Vec<(i64, VersionStatus)>> =
228232
HashMap::new();
229233

234+
// Add all known versions from version files to map
230235
for (collection_id, version_file) in self.version_files.iter() {
231236
if let Some(versions) = &version_file.version_history {
232237
for version in versions.versions.iter() {
@@ -251,6 +256,7 @@ impl ConstructVersionGraphOrchestrator {
251256
}
252257
}
253258

259+
// If any version appears as a version dependency (from the lineage file) but does not already exist in the map from the version files, the version must have been deleted.
254260
for dependency in self.version_dependencies.iter() {
255261
let source_collection_id = dependency.source_collection_id;
256262
let source_collection_version = dependency.source_collection_version;
@@ -272,6 +278,11 @@ impl ConstructVersionGraphOrchestrator {
272278
versions.sort_unstable_by_key(|v| v.0);
273279
}
274280

281+
tracing::trace!(
282+
"Versions by collection ID: {:#?}",
283+
versions_by_collection_id
284+
);
285+
275286
let mut graph = DiGraph::new();
276287
for (collection_id, versions) in versions_by_collection_id.iter() {
277288
let mut prev_node = None;
@@ -282,12 +293,14 @@ impl ConstructVersionGraphOrchestrator {
282293
status: *status,
283294
});
284295
if let Some(prev) = prev_node {
296+
// Add edge between each successive pair of collection versions
285297
graph.add_edge(prev, node, ());
286298
}
287299
prev_node = Some(node);
288300
}
289301
}
290302

303+
// Add edges for forked collections
291304
for dependency in self.version_dependencies.iter() {
292305
let source_node = graph
293306
.node_indices()
@@ -296,15 +309,25 @@ impl ConstructVersionGraphOrchestrator {
296309
node.collection_id == dependency.source_collection_id
297310
&& node.version == dependency.source_collection_version
298311
})
299-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
312+
.ok_or_else(|| {
313+
ConstructVersionGraphError::ExpectedNodeNotFound(
314+
dependency.source_collection_id,
315+
Some(dependency.source_collection_version),
316+
)
317+
})?;
300318

301319
let target_node = graph
302320
.node_indices()
303321
.find(|n| {
304322
let node = graph.node_weight(*n).expect("node index should exist");
305323
node.collection_id == dependency.target_collection_id
306324
})
307-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
325+
.ok_or_else(|| {
326+
ConstructVersionGraphError::ExpectedNodeNotFound(
327+
dependency.target_collection_id,
328+
None,
329+
)
330+
})?;
308331

309332
graph.add_edge(source_node, target_node, ());
310333
}
@@ -317,6 +340,15 @@ impl ConstructVersionGraphOrchestrator {
317340

318341
tracing::trace!("Version files: {:#?}", self.version_files);
319342

343+
let components = petgraph::algo::connected_components(&graph);
344+
if components != 1 {
345+
// This is a defensive check; it should never happen
346+
return Err(ConstructVersionGraphError::InvariantViolation(format!(
347+
"Graph is not fully connected, found {} components",
348+
components
349+
)));
350+
}
351+
320352
self.terminate_with_result(
321353
Ok(ConstructVersionGraphResponse {
322354
graph,

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 150 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ use chroma_system::{
3232
PanicError, System, TaskError, TaskResult,
3333
};
3434
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
35-
use chroma_types::CollectionUuid;
35+
use chroma_types::{
36+
BatchGetCollectionSoftDeleteStatusError, CollectionUuid, DeleteCollectionError,
37+
};
3638
use chrono::{DateTime, Utc};
3739
use std::collections::{HashMap, HashSet};
3840
use std::str::FromStr;
@@ -61,6 +63,9 @@ pub struct GarbageCollectorOrchestrator {
6163
num_pending_tasks: usize,
6264
min_versions_to_keep: u32,
6365
graph: Option<VersionGraph>,
66+
soft_deleted_collections_to_gc: HashSet<CollectionUuid>,
67+
tenant: Option<String>,
68+
database_name: Option<String>,
6469

6570
num_files_deleted: u32,
6671
num_versions_deleted: u32,
@@ -107,6 +112,9 @@ impl GarbageCollectorOrchestrator {
107112
num_pending_tasks: 0,
108113
min_versions_to_keep,
109114
graph: None,
115+
soft_deleted_collections_to_gc: HashSet::new(),
116+
tenant: None,
117+
database_name: None,
110118

111119
num_files_deleted: 0,
112120
num_versions_deleted: 0,
@@ -127,6 +135,8 @@ pub enum GarbageCollectorError {
127135
#[error("The task was aborted because resources were exhausted")]
128136
Aborted,
129137

138+
#[error("Failed to get collection soft delete status: {0}")]
139+
BatchGetCollectionSoftDeleteStatus(#[from] BatchGetCollectionSoftDeleteStatusError),
130140
#[error("Failed to construct version graph: {0}")]
131141
ConstructVersionGraph(#[from] ConstructVersionGraphError),
132142
#[error("Failed to compute versions to delete: {0}")]
@@ -146,6 +156,8 @@ pub enum GarbageCollectorError {
146156
InvariantViolation(String),
147157
#[error("Could not parse UUID: {0}")]
148158
UnparsableUuid(#[from] uuid::Error),
159+
#[error("Collection deletion failed: {0}")]
160+
CollectionDeletionFailed(#[from] DeleteCollectionError),
149161
}
150162

151163
impl ChromaError for GarbageCollectorError {
@@ -216,13 +228,33 @@ impl GarbageCollectorOrchestrator {
216228
self.lineage_file_path.clone(),
217229
);
218230
let output = orchestrator.run(self.system.clone()).await?;
231+
232+
let collection_ids = output.version_files.keys().cloned().collect::<Vec<_>>();
233+
let soft_delete_statuses = self
234+
.sysdb_client
235+
.batch_get_collection_soft_delete_status(collection_ids)
236+
.await?;
237+
self.soft_deleted_collections_to_gc = soft_delete_statuses
238+
.iter()
239+
.filter_map(
240+
|(collection_id, status)| {
241+
if *status {
242+
Some(*collection_id)
243+
} else {
244+
None
245+
}
246+
},
247+
)
248+
.collect();
249+
219250
self.version_files = output.version_files;
220251
self.graph = Some(output.graph.clone());
221252

222253
let task = wrap(
223254
Box::new(ComputeVersionsToDeleteOperator {}),
224255
ComputeVersionsToDeleteInput {
225256
graph: output.graph,
257+
soft_deleted_collections: self.soft_deleted_collections_to_gc.clone(),
226258
cutoff_time: self.absolute_cutoff_time,
227259
min_versions_to_keep: self.min_versions_to_keep,
228260
},
@@ -262,6 +294,13 @@ impl GarbageCollectorOrchestrator {
262294
})
263295
.collect();
264296

297+
self.pending_mark_versions_at_sysdb_tasks = output
298+
.versions
299+
.keys()
300+
.filter(|collection_id| !self.soft_deleted_collections_to_gc.contains(collection_id))
301+
.cloned()
302+
.collect();
303+
265304
for (collection_id, versions) in &output.versions {
266305
let version_file = self
267306
.version_files
@@ -398,7 +437,7 @@ impl GarbageCollectorOrchestrator {
398437
output.collection_id, output.version
399438
)))?;
400439

401-
let root = graph
440+
let root_index = graph
402441
.node_indices()
403442
.find(|&n| {
404443
graph
@@ -409,20 +448,26 @@ impl GarbageCollectorOrchestrator {
409448
.ok_or(GarbageCollectorError::InvariantViolation(
410449
"Expected to find root node".to_string(),
411450
))?;
412-
413-
let versions_from_root_to_this_node =
414-
petgraph::algo::astar(graph, root, |finish| finish == this_node, |_| 1, |_| 0)
415-
.ok_or(GarbageCollectorError::InvariantViolation(format!(
416-
"Expected to find path from root to node for {}@v{}",
417-
output.collection_id, output.version
418-
)))?
419-
.1
420-
.into_iter()
421-
.map(|i| {
422-
let node = graph.node_weight(i).expect("Node should exist");
423-
node.version
424-
})
425-
.collect::<Vec<_>>();
451+
let root = graph.node_weight(root_index).expect("Node should exist");
452+
453+
let versions_from_root_to_this_node = petgraph::algo::astar(
454+
graph,
455+
root_index,
456+
|finish| finish == this_node,
457+
|_| 1,
458+
|_| 0,
459+
)
460+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
461+
"Expected to find path from root ({}@v{}) to node for {}@v{}",
462+
root.collection_id, root.version, output.collection_id, output.version
463+
)))?
464+
.1
465+
.into_iter()
466+
.map(|i| {
467+
let node = graph.node_weight(i).expect("Node should exist");
468+
node.version
469+
})
470+
.collect::<Vec<_>>();
426471
let are_all_versions_v0 = versions_from_root_to_this_node
427472
.iter()
428473
.all(|&version| version == 0);
@@ -523,14 +568,15 @@ impl GarbageCollectorOrchestrator {
523568
"Expected there to be at least one version file".to_string(),
524569
))?;
525570
// Assumes that all collections in a fork tree are under the same tenant and database
526-
let tenant_id = version_file
527-
.collection_info_immutable
528-
.as_ref()
529-
.ok_or(GarbageCollectorError::InvariantViolation(
571+
let collection_info = version_file.collection_info_immutable.as_ref().ok_or(
572+
GarbageCollectorError::InvariantViolation(
530573
"Expected collection_info_immutable to be set".to_string(),
531-
))?
532-
.tenant_id
533-
.clone();
574+
),
575+
)?;
576+
let tenant_id = collection_info.tenant_id.clone();
577+
self.tenant = Some(tenant_id.clone());
578+
let database_name = collection_info.database_name.clone();
579+
self.database_name = Some(database_name.clone());
534580

535581
let task = wrap(
536582
Box::new(DeleteUnusedFilesOperator::new(
@@ -659,6 +705,85 @@ impl GarbageCollectorOrchestrator {
659705

660706
Ok(())
661707
}
708+
709+
/// Handles the result of one `DeleteVersionsAtSysDb` task.
///
/// Adds the number of versions in `output.versions_to_delete` to
/// `num_versions_deleted` and decrements `num_pending_tasks`. When that
/// counter reaches zero, every collection in `soft_deleted_collections_to_gc`
/// is checked: if all collections reachable in the version graph from the
/// collection's lowest-version node are also in that set,
/// `finish_collection_deletion` is called on the sysdb client for it. The
/// orchestrator is then terminated with a `GarbageCollectorResponse` carrying
/// the file and version deletion counts.
///
/// # Errors
///
/// Returns `GarbageCollectorError::InvariantViolation` when the graph, the
/// tenant, or the database name has not been set, or when the graph holds no
/// node for a soft-deleted collection. A failure from
/// `finish_collection_deletion` is propagated with `?`.
async fn handle_delete_versions_output(
710+
&mut self,
711+
output: DeleteVersionsAtSysDbOutput,
712+
ctx: &ComponentContext<Self>,
713+
) -> Result<(), GarbageCollectorError> {
714+
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
715+
// NOTE(review): `as u32` truncates silently if the length ever exceeds u32::MAX.
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
716+
717+
self.num_pending_tasks -= 1;
718+
// Only the handler that observes the last pending task finishing runs the
// hard-delete pass and terminates the orchestrator.
if self.num_pending_tasks == 0 {
719+
for collection_id in self.soft_deleted_collections_to_gc.iter() {
720+
let graph =
721+
self.graph
722+
.as_ref()
723+
.ok_or(GarbageCollectorError::InvariantViolation(
724+
"Expected graph to be set".to_string(),
725+
))?;
726+
727+
// Find node with minimum version for the collection
728+
let first_collection_node = graph
729+
.node_indices()
730+
.filter(|&n| {
731+
let node = graph.node_weight(n).expect("Node should exist");
732+
node.collection_id == *collection_id
733+
})
734+
.min_by(|a, b| {
735+
let a_node = graph.node_weight(*a).expect("Node should exist");
736+
let b_node = graph.node_weight(*b).expect("Node should exist");
737+
a_node.version.cmp(&b_node.version)
738+
})
739+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
740+
"Expected to find node for collection {}",
741+
collection_id
742+
)))?;
743+
744+
// We cannot finalize collection deletion (perform a hard delete) if there are any forked collections downstream that are still alive. If we violated this invariant, there would be a missing edge in the lineage file (resulting in an unconnected graph).
745+
let mut dfs = petgraph::visit::Dfs::new(graph, first_collection_node);
746+
let mut seen_collection_ids: HashSet<CollectionUuid> = HashSet::new();
747+
748+
// The traversal starts at the collection's own first node, so the set
// always contains `collection_id` itself.
while let Some(nx) = dfs.next(&graph) {
749+
let node = graph.node_weight(nx).expect("Node should exist");
750+
seen_collection_ids.insert(node.collection_id);
751+
}
752+
753+
let are_all_children_in_fork_tree_also_soft_deleted =
754+
seen_collection_ids.iter().all(|collection_id| {
755+
self.soft_deleted_collections_to_gc.contains(collection_id)
756+
});
757+
758+
// Collections that fail this check are skipped without error.
if are_all_children_in_fork_tree_also_soft_deleted {
759+
self.sysdb_client
760+
.finish_collection_deletion(
761+
self.tenant.clone().ok_or(
762+
GarbageCollectorError::InvariantViolation(
763+
"Expected tenant to be set".to_string(),
764+
),
765+
)?,
766+
self.database_name.clone().ok_or(
767+
GarbageCollectorError::InvariantViolation(
768+
"Expected database to be set".to_string(),
769+
),
770+
)?,
771+
*collection_id,
772+
)
773+
.await?;
774+
}
775+
}
776+
777+
let response = GarbageCollectorResponse {
778+
num_files_deleted: self.num_files_deleted,
779+
num_versions_deleted: self.num_versions_deleted,
780+
};
781+
782+
self.terminate_with_result(Ok(response), ctx).await;
783+
}
784+
785+
Ok(())
786+
}
662787
}
663788

664789
#[async_trait]
@@ -774,18 +899,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774899
Some(output) => output,
775900
None => return,
776901
};
777-
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
778-
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
779902

780-
self.num_pending_tasks -= 1;
781-
if self.num_pending_tasks == 0 {
782-
let response = GarbageCollectorResponse {
783-
num_files_deleted: self.num_files_deleted,
784-
num_versions_deleted: self.num_versions_deleted,
785-
};
786-
787-
self.terminate_with_result(Ok(response), ctx).await;
788-
}
903+
let res = self.handle_delete_versions_output(output, ctx).await;
904+
self.ok_or_terminate(res, ctx).await;
789905
}
790906
}
791907

0 commit comments

Comments
 (0)