Skip to content

Commit 5b6a52b

Browse files
committed
[ENH]: move collection hard deletes to garbage collector
1 parent f545524 commit 5b6a52b

File tree

7 files changed

+458
-158
lines changed

7 files changed

+458
-158
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,10 @@ pub enum ConstructVersionGraphError {
130130
InvalidUuid(#[from] uuid::Error),
131131
#[error("Invalid timestamp: {0}")]
132132
InvalidTimestamp(i64),
133-
#[error("Expected node not found while constructing graph")]
134-
ExpectedNodeNotFound,
133+
#[error("Expected node not found while constructing graph (collection {0}@v{1:?})")]
134+
ExpectedNodeNotFound(CollectionUuid, Option<i64>),
135+
#[error("Invariant violation: {0}")]
136+
InvariantViolation(String),
135137
}
136138

137139
impl<E> From<TaskError<E>> for ConstructVersionGraphError
@@ -159,7 +161,8 @@ impl ChromaError for ConstructVersionGraphError {
159161
ConstructVersionGraphError::FetchVersionFilePaths(err) => err.code(),
160162
ConstructVersionGraphError::InvalidUuid(_) => ErrorCodes::Internal,
161163
ConstructVersionGraphError::InvalidTimestamp(_) => ErrorCodes::InvalidArgument,
162-
ConstructVersionGraphError::ExpectedNodeNotFound => ErrorCodes::Internal,
164+
ConstructVersionGraphError::ExpectedNodeNotFound(_, _) => ErrorCodes::Internal,
165+
ConstructVersionGraphError::InvariantViolation(_) => ErrorCodes::Internal,
163166
}
164167
}
165168
}
@@ -263,6 +266,11 @@ impl ConstructVersionGraphOrchestrator {
263266
versions.sort_unstable_by_key(|v| v.0);
264267
}
265268

269+
tracing::trace!(
270+
"Versions by collection ID: {:#?}",
271+
versions_by_collection_id
272+
);
273+
266274
let mut graph = DiGraph::new();
267275
for (collection_id, versions) in versions_by_collection_id.iter() {
268276
let mut prev_node = None;
@@ -279,23 +287,57 @@ impl ConstructVersionGraphOrchestrator {
279287
}
280288
}
281289

282-
for dependency in self.version_dependencies.iter() {
283-
let source_node = graph
284-
.node_indices()
285-
.find(|n| {
286-
let node = graph.node_weight(*n).expect("node index should exist");
287-
node.collection_id == dependency.source_collection_id
288-
&& node.version == dependency.source_collection_version
289-
})
290-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
290+
let collection_id_soft_delete = self
291+
.sysdb
292+
.batch_get_collection_soft_delete_status(
293+
versions_by_collection_id.keys().cloned().collect(),
294+
)
295+
.await
296+
.unwrap(); // todo
291297

292-
let target_node = graph
293-
.node_indices()
294-
.find(|n| {
295-
let node = graph.node_weight(*n).expect("node index should exist");
296-
node.collection_id == dependency.target_collection_id
297-
})
298-
.ok_or(ConstructVersionGraphError::ExpectedNodeNotFound)?;
298+
for dependency in self.version_dependencies.iter() {
299+
let source_node = match graph.node_indices().find(|n| {
300+
let node = graph.node_weight(*n).expect("node index should exist");
301+
node.collection_id == dependency.source_collection_id
302+
&& node.version == dependency.source_collection_version
303+
}) {
304+
Some(source_node) => source_node,
305+
None => {
306+
let is_source_soft_deleted = collection_id_soft_delete
307+
.get(&dependency.source_collection_id)
308+
.cloned()
309+
.unwrap_or(false);
310+
311+
if !is_source_soft_deleted {
312+
return Err(ConstructVersionGraphError::ExpectedNodeNotFound(
313+
dependency.source_collection_id,
314+
Some(dependency.source_collection_version),
315+
));
316+
}
317+
318+
// Insert new node
319+
graph.add_node(VersionGraphNode {
320+
collection_id: dependency.source_collection_id,
321+
version: dependency.source_collection_version,
322+
status: VersionStatus::Deleted,
323+
})
324+
}
325+
};
326+
327+
let target_node = match graph.node_indices().find(|n| {
328+
let node = graph.node_weight(*n).expect("node index should exist");
329+
node.collection_id == dependency.target_collection_id
330+
}) {
331+
Some(target_node) => target_node,
332+
None => {
333+
// Insert new node
334+
graph.add_node(VersionGraphNode {
335+
collection_id: dependency.target_collection_id,
336+
version: 0,
337+
status: VersionStatus::Deleted,
338+
})
339+
}
340+
};
299341

300342
graph.add_edge(source_node, target_node, ());
301343
}
@@ -308,6 +350,14 @@ impl ConstructVersionGraphOrchestrator {
308350

309351
tracing::trace!("Version files: {:#?}", self.version_files);
310352

353+
let components = petgraph::algo::connected_components(&graph);
354+
if components != 1 {
355+
return Err(ConstructVersionGraphError::InvariantViolation(format!(
356+
"Graph is not fully connected, found {} components",
357+
components
358+
)));
359+
}
360+
311361
self.terminate_with_result(
312362
Ok(ConstructVersionGraphResponse {
313363
graph,

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 155 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ use chroma_system::{
3232
PanicError, System, TaskError, TaskResult,
3333
};
3434
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
35-
use chroma_types::CollectionUuid;
35+
use chroma_types::{CollectionUuid, DeleteCollectionError};
3636
use chrono::{DateTime, Utc};
37+
use petgraph::visit::NodeCount;
3738
use std::collections::{HashMap, HashSet};
3839
use std::str::FromStr;
3940
use thiserror::Error;
@@ -61,6 +62,9 @@ pub struct GarbageCollectorOrchestrator {
6162
num_pending_tasks: usize,
6263
min_versions_to_keep: u32,
6364
graph: Option<VersionGraph>,
65+
soft_deleted_collections_to_gc: HashSet<CollectionUuid>,
66+
tenant: Option<String>,
67+
database_name: Option<String>,
6468

6569
num_files_deleted: u32,
6670
num_versions_deleted: u32,
@@ -107,6 +111,9 @@ impl GarbageCollectorOrchestrator {
107111
num_pending_tasks: 0,
108112
min_versions_to_keep,
109113
graph: None,
114+
soft_deleted_collections_to_gc: HashSet::new(),
115+
tenant: None,
116+
database_name: None,
110117

111118
num_files_deleted: 0,
112119
num_versions_deleted: 0,
@@ -146,6 +153,8 @@ pub enum GarbageCollectorError {
146153
InvariantViolation(String),
147154
#[error("Could not parse UUID: {0}")]
148155
UnparsableUuid(#[from] uuid::Error),
156+
#[error("Collection deletion failed: {0}")]
157+
CollectionDeletionFailed(#[from] DeleteCollectionError),
149158
}
150159

151160
impl ChromaError for GarbageCollectorError {
@@ -216,13 +225,35 @@ impl GarbageCollectorOrchestrator {
216225
self.lineage_file_path.clone(),
217226
);
218227
let output = orchestrator.run(self.system.clone()).await?;
228+
229+
let collection_ids = output.version_files.keys().cloned().collect::<Vec<_>>();
230+
let soft_delete_statuses = self
231+
.sysdb_client
232+
.batch_get_collection_soft_delete_status(collection_ids)
233+
.await
234+
.unwrap(); // todo
235+
// todo: filter out root collection?
236+
self.soft_deleted_collections_to_gc = soft_delete_statuses
237+
.iter()
238+
.filter_map(
239+
|(collection_id, status)| {
240+
if *status {
241+
Some(*collection_id)
242+
} else {
243+
None
244+
}
245+
},
246+
)
247+
.collect();
248+
219249
self.version_files = output.version_files;
220250
self.graph = Some(output.graph.clone());
221251

222252
let task = wrap(
223253
Box::new(ComputeVersionsToDeleteOperator {}),
224254
ComputeVersionsToDeleteInput {
225255
graph: output.graph,
256+
soft_deleted_collections: self.soft_deleted_collections_to_gc.clone(),
226257
cutoff_time: self.absolute_cutoff_time,
227258
min_versions_to_keep: self.min_versions_to_keep,
228259
},
@@ -262,6 +293,13 @@ impl GarbageCollectorOrchestrator {
262293
})
263294
.collect();
264295

296+
self.pending_mark_versions_at_sysdb_tasks = output
297+
.versions
298+
.keys()
299+
.filter(|collection_id| !self.soft_deleted_collections_to_gc.contains(&collection_id))
300+
.cloned()
301+
.collect();
302+
265303
for (collection_id, versions) in &output.versions {
266304
let version_file = self
267305
.version_files
@@ -398,7 +436,7 @@ impl GarbageCollectorOrchestrator {
398436
output.collection_id, output.version
399437
)))?;
400438

401-
let root = graph
439+
let root_index = graph
402440
.node_indices()
403441
.find(|&n| {
404442
graph
@@ -409,20 +447,26 @@ impl GarbageCollectorOrchestrator {
409447
.ok_or(GarbageCollectorError::InvariantViolation(
410448
"Expected to find root node".to_string(),
411449
))?;
412-
413-
let versions_from_root_to_this_node =
414-
petgraph::algo::astar(graph, root, |finish| finish == this_node, |_| 1, |_| 0)
415-
.ok_or(GarbageCollectorError::InvariantViolation(format!(
416-
"Expected to find path from root to node for {}@v{}",
417-
output.collection_id, output.version
418-
)))?
419-
.1
420-
.into_iter()
421-
.map(|i| {
422-
let node = graph.node_weight(i).expect("Node should exist");
423-
node.version
424-
})
425-
.collect::<Vec<_>>();
450+
let root = graph.node_weight(root_index).expect("Node should exist");
451+
452+
let versions_from_root_to_this_node = petgraph::algo::astar(
453+
graph,
454+
root_index,
455+
|finish| finish == this_node,
456+
|_| 1,
457+
|_| 0,
458+
)
459+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
460+
"Expected to find path from root ({}@v{}) to node for {}@v{}",
461+
root.collection_id, root.version, output.collection_id, output.version
462+
)))?
463+
.1
464+
.into_iter()
465+
.map(|i| {
466+
let node = graph.node_weight(i).expect("Node should exist");
467+
node.version
468+
})
469+
.collect::<Vec<_>>();
426470
let are_all_versions_v0 = versions_from_root_to_this_node
427471
.iter()
428472
.all(|&version| version == 0);
@@ -523,14 +567,15 @@ impl GarbageCollectorOrchestrator {
523567
"Expected there to be at least one version file".to_string(),
524568
))?;
525569
// Assumes that all collections in a fork tree are under the same tenant
526-
let tenant_id = version_file
527-
.collection_info_immutable
528-
.as_ref()
529-
.ok_or(GarbageCollectorError::InvariantViolation(
570+
let collection_info = version_file.collection_info_immutable.as_ref().ok_or(
571+
GarbageCollectorError::InvariantViolation(
530572
"Expected collection_info_immutable to be set".to_string(),
531-
))?
532-
.tenant_id
533-
.clone();
573+
),
574+
)?;
575+
let tenant_id = collection_info.tenant_id.clone();
576+
self.tenant = Some(tenant_id.clone());
577+
let database_name = collection_info.database_name.clone();
578+
self.database_name = Some(database_name.clone());
534579

535580
let task = wrap(
536581
Box::new(DeleteUnusedFilesOperator::new(
@@ -659,6 +704,91 @@ impl GarbageCollectorOrchestrator {
659704

660705
Ok(())
661706
}
707+
708+
async fn handle_delete_versions_output(
709+
&mut self,
710+
output: DeleteVersionsAtSysDbOutput,
711+
ctx: &ComponentContext<Self>,
712+
) -> Result<(), GarbageCollectorError> {
713+
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
714+
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
715+
716+
self.num_pending_tasks -= 1;
717+
if self.num_pending_tasks == 0 {
718+
for collection_id in self.soft_deleted_collections_to_gc.drain() {
719+
let graph =
720+
self.graph
721+
.as_ref()
722+
.ok_or(GarbageCollectorError::InvariantViolation(
723+
"Expected graph to be set".to_string(),
724+
))?;
725+
726+
let first_collection_node = graph
727+
.node_indices()
728+
.filter(|&n| {
729+
let node = graph.node_weight(n).expect("Node should exist");
730+
node.collection_id == collection_id
731+
})
732+
.max_by(|a, b| {
733+
// todo: does this sort in the right direction?
734+
let a_node = graph.node_weight(*a).expect("Node should exist");
735+
let b_node = graph.node_weight(*b).expect("Node should exist");
736+
b_node.version.cmp(&a_node.version)
737+
})
738+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
739+
"Expected to find node for collection {}",
740+
collection_id
741+
)))?;
742+
743+
let first_node = graph
744+
.node_weight(first_collection_node)
745+
.expect("Node should exist");
746+
tracing::debug!(
747+
"First node for collection {}: {:#?}",
748+
collection_id,
749+
first_node
750+
);
751+
752+
let mut dfs = petgraph::visit::Dfs::new(graph, first_collection_node);
753+
let mut seen_collection_ids: HashSet<CollectionUuid> = HashSet::new();
754+
755+
while let Some(nx) = dfs.next(&graph) {
756+
let node = graph.node_weight(nx).expect("Node should exist");
757+
seen_collection_ids.insert(node.collection_id);
758+
}
759+
760+
let is_leaf_node_in_fork_tree =
761+
seen_collection_ids.len() == 1 && seen_collection_ids.contains(&collection_id);
762+
763+
if is_leaf_node_in_fork_tree {
764+
self.sysdb_client
765+
.finish_collection_deletion(
766+
self.tenant.clone().ok_or(
767+
GarbageCollectorError::InvariantViolation(
768+
"Expected tenant to be set".to_string(),
769+
),
770+
)?,
771+
self.database_name.clone().ok_or(
772+
GarbageCollectorError::InvariantViolation(
773+
"Expected database to be set".to_string(),
774+
),
775+
)?,
776+
collection_id,
777+
)
778+
.await?;
779+
}
780+
}
781+
782+
let response = GarbageCollectorResponse {
783+
num_files_deleted: self.num_files_deleted,
784+
num_versions_deleted: self.num_versions_deleted,
785+
};
786+
787+
self.terminate_with_result(Ok(response), ctx).await;
788+
}
789+
790+
Ok(())
791+
}
662792
}
663793

664794
#[async_trait]
@@ -774,18 +904,9 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
774904
Some(output) => output,
775905
None => return,
776906
};
777-
tracing::trace!("Received DeleteVersionsAtSysDbOutput: {:#?}", output);
778-
self.num_versions_deleted += output.versions_to_delete.versions.len() as u32;
779-
780-
self.num_pending_tasks -= 1;
781-
if self.num_pending_tasks == 0 {
782-
let response = GarbageCollectorResponse {
783-
num_files_deleted: self.num_files_deleted,
784-
num_versions_deleted: self.num_versions_deleted,
785-
};
786907

787-
self.terminate_with_result(Ok(response), ctx).await;
788-
}
908+
let res = self.handle_delete_versions_output(output, ctx).await;
909+
self.ok_or_terminate(res, ctx).await;
789910
}
790911
}
791912

0 commit comments

Comments
 (0)