Skip to content

Commit 8aff535

Browse files
committed
[ENH]: move collection hard deletes to garbage collector
1 parent 9aed3e4 commit 8aff535

File tree

7 files changed

+521
-159
lines changed

7 files changed

+521
-159
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,10 @@ pub enum ConstructVersionGraphError {
130130
InvalidUuid(#[from] uuid::Error),
131131
#[error("Invalid timestamp: {0}")]
132132
InvalidTimestamp(i64),
133-
#[error("Expected node not found while constructing graph")]
134-
ExpectedNodeNotFound,
133+
#[error("Expected node not found while constructing graph (collection {0}@v{1:?})")]
134+
ExpectedNodeNotFound(CollectionUuid, Option<i64>),
135+
#[error("Invariant violation: {0}")]
136+
InvariantViolation(String),
135137
}
136138

137139
impl<E> From<TaskError<E>> for ConstructVersionGraphError
@@ -159,7 +161,8 @@ impl ChromaError for ConstructVersionGraphError {
159161
ConstructVersionGraphError::FetchVersionFilePaths(err) => err.code(),
160162
ConstructVersionGraphError::InvalidUuid(_) => ErrorCodes::Internal,
161163
ConstructVersionGraphError::InvalidTimestamp(_) => ErrorCodes::InvalidArgument,
162-
ConstructVersionGraphError::ExpectedNodeNotFound => ErrorCodes::Internal,
164+
ConstructVersionGraphError::ExpectedNodeNotFound(_, _) => ErrorCodes::Internal,
165+
ConstructVersionGraphError::InvariantViolation(_) => ErrorCodes::Internal,
163166
}
164167
}
165168
}
@@ -263,6 +266,11 @@ impl ConstructVersionGraphOrchestrator {
263266
versions.sort_unstable_by_key(|v| v.0);
264267
}
265268

269+
tracing::trace!(
270+
"Versions by collection ID: {:#?}",
271+
versions_by_collection_id
272+
);
273+
266274
let mut graph = DiGraph::new();
267275
for (collection_id, versions) in versions_by_collection_id.iter() {
268276
let mut prev_node = None;
@@ -279,23 +287,57 @@ impl ConstructVersionGraphOrchestrator {
279287
}
280288
}
281289

282-
for dependency in self.version_dependencies.iter() {
283-
let source_node = graph
284-
.node_indices()
285-
.find(|n| {
286-
let node = graph.node_weight(*n).expect("node index should exist");
287-
node.collection_id == dependency.source_collection_id
288-
&& node.version == dependency.source_collection_version
289-
})
290-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
290+
let collection_id_soft_delete = self
291+
.sysdb
292+
.batch_get_collection_soft_delete_status(
293+
versions_by_collection_id.keys().cloned().collect(),
294+
)
295+
.await
296+
.unwrap(); // todo
291297

292-
let target_node = graph
293-
.node_indices()
294-
.find(|n| {
295-
let node = graph.node_weight(*n).expect("node index should exist");
296-
node.collection_id == dependency.target_collection_id
297-
})
298-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
298+
for dependency in self.version_dependencies.iter() {
299+
let source_node = match graph.node_indices().find(|n| {
300+
let node = graph.node_weight(*n).expect("node index should exist");
301+
node.collection_id == dependency.source_collection_id
302+
&& node.version == dependency.source_collection_version
303+
}) {
304+
Some(source_node) => source_node,
305+
None => {
306+
let is_source_soft_deleted = collection_id_soft_delete
307+
.get(&dependency.source_collection_id)
308+
.cloned()
309+
.unwrap_or(false);
310+
311+
if !is_source_soft_deleted {
312+
return Err(ConstructVersionGraphError::ExpectedNodeNotFound(
313+
dependency.source_collection_id,
314+
Some(dependency.source_collection_version),
315+
));
316+
}
317+
318+
// Insert new node
319+
graph.add_node(VersionGraphNode {
320+
collection_id: dependency.source_collection_id,
321+
version: dependency.source_collection_version,
322+
status: VersionStatus::Deleted,
323+
})
324+
}
325+
};
326+
327+
let target_node = match graph.node_indices().find(|n| {
328+
let node = graph.node_weight(*n).expect("node index should exist");
329+
node.collection_id == dependency.target_collection_id
330+
}) {
331+
Some(target_node) => target_node,
332+
None => {
333+
// Insert new node
334+
graph.add_node(VersionGraphNode {
335+
collection_id: dependency.target_collection_id,
336+
version: 0,
337+
status: VersionStatus::Deleted,
338+
})
339+
}
340+
};
299341

300342
graph.add_edge(source_node, target_node, ());
301343
}
@@ -308,6 +350,14 @@ impl ConstructVersionGraphOrchestrator {
308350

309351
tracing::trace!("Version files: {:#?}", self.version_files);
310352

353+
let components = petgraph::algo::connected_components(&graph);
354+
if components != 1 {
355+
return Err(ConstructVersionGraphError::InvariantViolation(format!(
356+
"Graph is not fully connected, found {} components",
357+
components
358+
)));
359+
}
360+
311361
self.terminate_with_result(
312362
Ok(ConstructVersionGraphResponse {
313363
graph,

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 154 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use chroma_system::{
3232
PanicError, System, TaskError, TaskResult,
3333
};
3434
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
35-
use chroma_types::CollectionUuid;
35+
use chroma_types::{CollectionUuid, DeleteCollectionError};
3636
use chrono::{DateTime, Utc};
3737
use std::collections::{HashMap, HashSet};
3838
use std::str::FromStr;
@@ -61,6 +61,9 @@ pub struct GarbageCollectorOrchestrator {
6161
num_pending_tasks: usize,
6262
min_versions_to_keep: u32,
6363
graph: Option<VersionGraph>,
64+
soft_deleted_collections_to_gc: HashSet<CollectionUuid>,
65+
tenant: Option<String>,
66+
database_name: Option<String>,
6467

6568
num_files_deleted: u32,
6669
num_versions_deleted: u32,
@@ -107,6 +110,9 @@ impl GarbageCollectorOrchestrator {
107110
num_pending_tasks: 0,
108111
min_versions_to_keep,
109112
graph: None,
113+
soft_deleted_collections_to_gc: HashSet::new(),
114+
tenant: None,
115+
database_name: None,
110116

111117
num_files_deleted: 0,
112118
num_versions_deleted: 0,
@@ -146,6 +152,8 @@ pub enum GarbageCollectorError {
146152
InvariantViolation(String),
147153
#[error("Could not parse UUID: {0}")]
148154
UnparsableUuid(#[from] uuid::Error),
155+
#[error("Collection deletion failed: {0}")]
156+
CollectionDeletionFailed(#[from] DeleteCollectionError),
149157
}
150158

151159
impl ChromaError for GarbageCollectorError {
@@ -216,13 +224,35 @@ impl GarbageCollectorOrchestrator {
216224
self.lineage_file_path.clone(),
217225
);
218226
let output = orchestrator.run(self.system.clone()).await?;
227+
228+
let collection_ids = output.version_files.keys().cloned().collect::<Vec<_>>();
229+
let soft_delete_statuses = self
230+
.sysdb_client
231+
.batch_get_collection_soft_delete_status(collection_ids)
232+
.await
233+
.unwrap(); // todo
234+
// todo: filter out root collection?
235+
self.soft_deleted_collections_to_gc = soft_delete_statuses
236+
.iter()
237+
.filter_map(
238+
|(collection_id, status)| {
239+
if *status {
240+
Some(*collection_id)
241+
} else {
242+
None
243+
}
244+
},
245+
)
246+
.collect();
247+
219248
self.version_files = output.version_files;
220249
self.graph = Some(output.graph.clone());
221250

222251
let task = wrap(
223252
Box::new(ComputeVersionsToDeleteOperator {}),
224253
ComputeVersionsToDeleteInput {
225254
graph: output.graph,
255+
soft_deleted_collections: self.soft_deleted_collections_to_gc.clone(),
226256
cutoff_time: self.absolute_cutoff_time,
227257
min_versions_to_keep: self.min_versions_to_keep,
228258
},
@@ -262,6 +292,13 @@ impl GarbageCollectorOrchestrator {
262292
})
263293
.collect();
264294

295+
self.pending_mark_versions_at_sysdb_tasks = output
296+
.versions
297+
.keys()
298+
.filter(|collection_id| !self.soft_deleted_collections_to_gc.contains(collection_id))
299+
.cloned()
300+
.collect();
301+
265302
for (collection_id, versions) in &output.versions {
266303
let version_file = self
267304
.version_files
@@ -398,7 +435,7 @@ impl GarbageCollectorOrchestrator {
398435
output.collection_id, output.version
399436
)))?;
400437

401-
let root = graph
438+
let root_index = graph
402439
.node_indices()
403440
.find(|&n| {
404441
graph
@@ -409,20 +446,26 @@ impl GarbageCollectorOrchestrator {
409446
.ok_or(GarbageCollectorError::InvariantViolation(
410447
"Expected to find root node".to_string(),
411448
))?;
412-
413-
let versions_from_root_to_this_node =
414-
petgraph::algo::astar(graph, root, |finish| finish == this_node, |_| 1, |_| 0)
415-
.ok_or(GarbageCollectorError::InvariantViolation(format!(
416-
"Expected to find path from root to node for {}@v{}",
417-
output.collection_id, output.version
418-
)))?
419-
.1
420-
.into_iter()
421-
.map(|i| {
422-
let node = graph.node_weight(i).expect("Node should exist");
423-
node.version
424-
})
425-
.collect::<Vec<_>>();
449+
let root = graph.node_weight(root_index).expect("Node should exist");
450+
451+
let versions_from_root_to_this_node = petgraph::algo::astar(
452+
graph,
453+
root_index,
454+
|finish| finish == this_node,
455+
|_| 1,
456+
|_| 0,
457+
)
458+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
459+
"Expected to find path from root ({}@v{}) to node for {}@v{}",
460+
root.collection_id, root.version, output.collection_id, output.version
461+
)))?
462+
.1
463+
.into_iter()
464+
.map(|i| {
465+
let node = graph.node_weight(i).expect("Node should exist");
466+
node.version
467+
})
468+
.collect::<Vec<_>>();
426469
let are_all_versions_v0 = versions_from_root_to_this_node
427470
.iter()
428471
.all(|&version| version == 0);
@@ -523,14 +566,15 @@ impl GarbageCollectorOrchestrator {
523566
"Expected there to be at least one version file".to_string(),
524567
))?;
525568
// Assumes that all collections in a fork tree are under the same tenant
526-
let tenant_id = version_file
527-
.collection_info_immutable
528-
.as_ref()
529-
.ok_or(GarbageCollectorError::InvariantViolation(
569+
let collection_info = version_file.collection_info_immutable.as_ref().ok_or(
570+
GarbageCollectorError::InvariantViolation(
530571
"Expected collection_info_immutable to be set".to_string(),
531-
))?
532-
.tenant_id
533-
.clone();
572+
),
573+
)?;
574+
let tenant_id = collection_info.tenant_id.clone();
575+
self.tenant = Some(tenant_id.clone());
576+
let database_name = collection_info.database_name.clone();
577+
self.database_name = Some(database_name.clone());
534578

535579
let task = wrap(
536580
Box::new(DeleteUnusedFilesOperator::new(
@@ -659,6 +703,91 @@ impl GarbageCollectorOrchestrator {
659703

660704
Ok(())
661705
}
706+
707+
async fn handle_delete_versions_output(
708+
&mut self,
709+
output: DeleteVersionsAtSysDbOutput,
710+
ctx: &ComponentContext<Self>,
711+
) -> Result<(), GarbageCollectorError> {
712+
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
713+
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
714+
715+
self.num_pending_tasks -= 1;
716+
if self.num_pending_tasks == 0 {
717+
for collection_id in self.soft_deleted_collections_to_gc.drain() {
718+
let graph =
719+
self.graph
720+
.as_ref()
721+
.ok_or(GarbageCollectorError::InvariantViolation(
722+
"Expected graph to be set".to_string(),
723+
))?;
724+
725+
let first_collection_node = graph
726+
.node_indices()
727+
.filter(|&n| {
728+
let node = graph.node_weight(n).expect("Node should exist");
729+
node.collection_id == collection_id
730+
})
731+
.max_by(|a, b| {
732+
// todo: does this sort in the right direction?
733+
let a_node = graph.node_weight(*a).expect("Node should exist");
734+
let b_node = graph.node_weight(*b).expect("Node should exist");
735+
b_node.version.cmp(&a_node.version)
736+
})
737+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
738+
"Expected to find node for collection {}",
739+
collection_id
740+
)))?;
741+
742+
let first_node = graph
743+
.node_weight(first_collection_node)
744+
.expect("Node should exist");
745+
tracing::debug!(
746+
"First node for collection {}: {:#?}",
747+
collection_id,
748+
first_node
749+
);
750+
751+
let mut dfs = petgraph::visit::Dfs::new(graph, first_collection_node);
752+
let mut seen_collection_ids: HashSet<CollectionUuid> = HashSet::new();
753+
754+
while let Some(nx) = dfs.next(&graph) {
755+
let node = graph.node_weight(nx).expect("Node should exist");
756+
seen_collection_ids.insert(node.collection_id);
757+
}
758+
759+
let is_leaf_node_in_fork_tree =
760+
seen_collection_ids.len() == 1 && seen_collection_ids.contains(&collection_id);
761+
762+
if is_leaf_node_in_fork_tree {
763+
self.sysdb_client
764+
.finish_collection_deletion(
765+
self.tenant.clone().ok_or(
766+
GarbageCollectorError::InvariantViolation(
767+
"Expected tenant to be set".to_string(),
768+
),
769+
)?,
770+
self.database_name.clone().ok_or(
771+
GarbageCollectorError::InvariantViolation(
772+
"Expected database to be set".to_string(),
773+
),
774+
)?,
775+
collection_id,
776+
)
777+
.await?;
778+
}
779+
}
780+
781+
let response = GarbageCollectorResponse {
782+
num_files_deleted: self.num_files_deleted,
783+
num_versions_deleted: self.num_versions_deleted,
784+
};
785+
786+
self.terminate_with_result(Ok(response), ctx).await;
787+
}
788+
789+
Ok(())
790+
}
662791
}
663792

664793
#[async_trait]
@@ -774,18 +903,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774903
Some(output) => output,
775904
None => return,
776905
};
777-
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
778-
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
779-
780-
self.num_pending_tasks -= 1;
781-
if self.num_pending_tasks == 0 {
782-
let response = GarbageCollectorResponse {
783-
num_files_deleted: self.num_files_deleted,
784-
num_versions_deleted: self.num_versions_deleted,
785-
};
786906

787-
self.terminate_with_result(Ok(response), ctx).await;
788-
}
907+
let res = self.handle_delete_versions_output(output, ctx).await;
908+
self.ok_or_terminate(res, ctx).await;
789909
}
790910
}
791911

0 commit comments

Comments
 (0)