Skip to content

Commit 8f20043

Browse files
committed
[ENH]: add validation during garbage collection for empty file paths
1 parent 9dec43e commit 8f20043

File tree

3 files changed

+202
-15
lines changed

3 files changed

+202
-15
lines changed

rust/garbage_collector/src/construct_version_graph_orchestrator.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,28 @@ mod tests {
617617
let collection_id_c = CollectionUuid::new();
618618
let collection_id_d = CollectionUuid::new();
619619

620+
for collection_id in [
621+
collection_id_a,
622+
collection_id_b,
623+
collection_id_c,
624+
collection_id_d,
625+
] {
626+
sysdb
627+
.create_collection(
628+
"test_tenant".to_string(),
629+
"test_database".to_string(),
630+
collection_id,
631+
format!("test_collection_{}", collection_id),
632+
vec![],
633+
None,
634+
None,
635+
None,
636+
false,
637+
)
638+
.await
639+
.unwrap();
640+
}
641+
620642
let version_file_a_path =
621643
create_version_file(collection_id_a, vec![0, 1], storage.clone()).await;
622644
let version_file_b_path =

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::construct_version_graph_orchestrator::{
2-
ConstructVersionGraphError, ConstructVersionGraphOrchestrator,
2+
ConstructVersionGraphError, ConstructVersionGraphOrchestrator, VersionGraph,
33
};
44
use crate::operators::compute_versions_to_delete_from_graph::{
55
CollectionVersionAction, ComputeVersionsToDeleteError, ComputeVersionsToDeleteInput,
@@ -60,6 +60,7 @@ pub struct GarbageCollectorOrchestrator {
6060
file_ref_counts: HashMap<String, u32>,
6161
num_pending_tasks: usize,
6262
min_versions_to_keep: u32,
63+
graph: Option<VersionGraph>,
6364

6465
num_files_deleted: u32,
6566
num_versions_deleted: u32,
@@ -105,6 +106,7 @@ impl GarbageCollectorOrchestrator {
105106
pending_list_files_at_version_tasks: HashSet::new(),
106107
num_pending_tasks: 0,
107108
min_versions_to_keep,
109+
graph: None,
108110

109111
num_files_deleted: 0,
110112
num_versions_deleted: 0,
@@ -215,6 +217,7 @@ impl GarbageCollectorOrchestrator {
215217
);
216218
let output = orchestrator.run(self.system.clone()).await?;
217219
self.version_files = output.version_files;
220+
self.graph = Some(output.graph.clone());
218221

219222
let task = wrap(
220223
Box::new(ComputeVersionsToDeleteOperator {}),
@@ -375,6 +378,63 @@ impl GarbageCollectorOrchestrator {
375378
output.file_paths,
376379
);
377380

381+
if output.file_paths.is_empty() {
382+
// Empty file paths are only valid when every version on the path from the root of the version graph to this node (inclusive) is 0. Any later version should have had file paths flushed by compaction. This check is defensive and is never expected to fail.
383+
let graph = self
384+
.graph
385+
.as_ref()
386+
.ok_or(GarbageCollectorError::InvariantViolation(
387+
"Expected graph to be set".to_string(),
388+
))?;
389+
390+
let this_node = graph
391+
.node_indices()
392+
.find(|&n| {
393+
let node = graph.node_weight(n).expect("Node should exist");
394+
node.collection_id == output.collection_id && node.version == output.version
395+
})
396+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
397+
"Expected to find node for collection {} at version {}",
398+
output.collection_id, output.version
399+
)))?;
400+
401+
let root = graph
402+
.node_indices()
403+
.find(|&n| {
404+
graph
405+
.neighbors_directed(n, petgraph::Direction::Incoming)
406+
.next()
407+
.is_none()
408+
})
409+
.ok_or(GarbageCollectorError::InvariantViolation(
410+
"Expected to find root node".to_string(),
411+
))?;
412+
413+
let versions_from_root_to_this_node =
414+
petgraph::algo::astar(graph, root, |finish| finish == this_node, |_| 1, |_| 0)
415+
.ok_or(GarbageCollectorError::InvariantViolation(format!(
416+
"Expected to find path from root to node for {}@v{}",
417+
output.collection_id, output.version
418+
)))?
419+
.1
420+
.into_iter()
421+
.map(|i| {
422+
let node = graph.node_weight(i).expect("Node should exist");
423+
node.version
424+
})
425+
.collect::<Vec<_>>();
426+
let are_all_versions_v0 = versions_from_root_to_this_node
427+
.iter()
428+
.all(|&version| version == 0);
429+
430+
if !are_all_versions_v0 {
431+
return Err(GarbageCollectorError::InvariantViolation(format!(
432+
"Version {} of collection {} has no file paths, but has non-v0 ancestors. This should never happen.",
433+
output.version, output.collection_id
434+
)));
435+
}
436+
}
437+
378438
// Update the file ref counts. Counts in the map should:
379439
// - be 0 if we know about the file but it is unused
380440
// - be > 0 if we know about the file and it is used
@@ -728,3 +788,107 @@ impl Handler<TaskResult<DeleteVersionsAtSysDbOutput, DeleteVersionsAtSysDbError>
728788
}
729789
}
730790
}
791+
792+
#[cfg(test)]
793+
mod tests {
794+
use super::GarbageCollectorOrchestrator;
795+
use chroma_blockstore::RootManager;
796+
use chroma_cache::nop::NopCache;
797+
use chroma_storage::test_storage;
798+
use chroma_sysdb::TestSysDb;
799+
use chroma_system::{Dispatcher, Orchestrator, System};
800+
use chroma_types::{
801+
CollectionUuid, Segment, SegmentFlushInfo, SegmentScope, SegmentType, SegmentUuid,
802+
};
803+
use chrono::DateTime;
804+
use std::{collections::HashMap, sync::Arc, time::SystemTime};
805+
806+
#[tokio::test(flavor = "multi_thread")]
807+
async fn errors_on_empty_file_paths() {
808+
let storage = test_storage();
809+
let mut test_sysdb = TestSysDb::new();
810+
test_sysdb.set_storage(Some(storage.clone()));
811+
let mut sysdb = chroma_sysdb::SysDb::Test(test_sysdb);
812+
813+
let system = System::new();
814+
let dispatcher = Dispatcher::new(Default::default());
815+
let dispatcher_handle = system.start_component(dispatcher);
816+
let root_manager = RootManager::new(storage.clone(), Box::new(NopCache));
817+
818+
let tenant = "test_tenant".to_string();
819+
let database = "test_database".to_string();
820+
821+
let root_collection_id = CollectionUuid::new();
822+
let segment_id = SegmentUuid::new();
823+
let segment = Segment {
824+
id: segment_id,
825+
r#type: SegmentType::BlockfileMetadata,
826+
scope: SegmentScope::METADATA,
827+
collection: root_collection_id,
828+
metadata: None,
829+
file_path: HashMap::new(),
830+
};
831+
832+
sysdb
833+
.create_collection(
834+
tenant.clone(),
835+
database,
836+
root_collection_id,
837+
"Root Collection".to_string(),
838+
vec![segment],
839+
None,
840+
None,
841+
None,
842+
false,
843+
)
844+
.await
845+
.unwrap();
846+
847+
// Create v1 with no file paths
848+
sysdb
849+
.flush_compaction(
850+
tenant,
851+
root_collection_id,
852+
0,
853+
0,
854+
Arc::new([SegmentFlushInfo {
855+
segment_id,
856+
file_paths: HashMap::new(),
857+
}]),
858+
0,
859+
0,
860+
)
861+
.await
862+
.unwrap();
863+
864+
// Running the garbage collector is expected to fail with a "no file paths" error
865+
let mut collections = sysdb
866+
.get_collections(Some(root_collection_id), None, None, None, None, 0)
867+
.await
868+
.unwrap();
869+
let root_collection = collections.pop().unwrap();
870+
let orchestrator = GarbageCollectorOrchestrator::new(
871+
root_collection_id,
872+
root_collection.version_file_path.unwrap(),
873+
None,
874+
DateTime::from_timestamp(
875+
SystemTime::now()
876+
.duration_since(SystemTime::UNIX_EPOCH)
877+
.unwrap()
878+
.as_secs() as i64,
879+
0,
880+
)
881+
.unwrap(),
882+
sysdb,
883+
dispatcher_handle,
884+
system.clone(),
885+
storage,
886+
root_manager,
887+
crate::types::CleanupMode::Delete,
888+
1,
889+
);
890+
let result = orchestrator.run(system).await;
891+
assert!(result.is_err());
892+
assert!(format!("{:?}", result).contains("no file paths"));
893+
}
894+
}

rust/sysdb/src/test_sysdb.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ struct Inner {
3838
segments: HashMap<SegmentUuid, Segment>,
3939
tenant_last_compaction_time: HashMap<String, i64>,
4040
collection_to_version_file: HashMap<CollectionUuid, CollectionVersionFile>,
41-
collection_to_version_file_name: HashMap<CollectionUuid, String>,
4241
#[derivative(Debug = "ignore")]
4342
storage: Option<chroma_storage::Storage>,
4443
mock_time: u64,
@@ -53,7 +52,6 @@ impl TestSysDb {
5352
segments: HashMap::new(),
5453
tenant_last_compaction_time: HashMap::new(),
5554
collection_to_version_file: HashMap::new(),
56-
collection_to_version_file_name: HashMap::new(),
5755
storage: None,
5856
mock_time: 0,
5957
})),
@@ -100,9 +98,8 @@ impl TestSysDb {
10098
) {
10199
let mut inner = self.inner.lock();
102100

103-
inner
104-
.collection_to_version_file_name
105-
.insert(collection_id, version_file_path);
101+
let collection = inner.collections.get_mut(&collection_id).unwrap();
102+
collection.version_file_path = Some(version_file_path);
106103
}
107104

108105
fn filter_collections(
@@ -332,9 +329,11 @@ impl TestSysDb {
332329
VERSION_FILE_S3_PREFIX, collection_id, next_version
333330
);
334331

335-
inner
336-
.collection_to_version_file_name
337-
.insert(collection_id, version_file_name.clone());
332+
let collection = inner
333+
.collections
334+
.get_mut(&collection_id)
335+
.expect("Expected collection");
336+
collection.version_file_path = Some(version_file_name.clone());
338337

339338
// Serialize the version file to bytes and write to storage
340339
let version_file_bytes = version_file.encode_to_vec();
@@ -393,11 +392,8 @@ impl TestSysDb {
393392

394393
pub fn get_version_file_name(&self, collection_id: CollectionUuid) -> String {
395394
let inner = self.inner.lock();
396-
inner
397-
.collection_to_version_file_name
398-
.get(&collection_id)
399-
.unwrap()
400-
.clone()
395+
let collection = inner.collections.get(&collection_id).unwrap();
396+
collection.version_file_path.clone().unwrap()
401397
}
402398

403399
#[allow(clippy::too_many_arguments)]
@@ -569,7 +565,12 @@ impl TestSysDb {
569565
let inner = self.inner.lock();
570566
let mut paths = HashMap::new();
571567
for collection_id in collection_ids {
572-
if let Some(path) = inner.collection_to_version_file_name.get(&collection_id) {
568+
if let Some(path) = &inner
569+
.collections
570+
.get(&collection_id)
571+
.unwrap()
572+
.version_file_path
573+
{
573574
paths.insert(collection_id, path.clone());
574575
} else {
575576
return Err(BatchGetCollectionVersionFilePathsError::Grpc(

0 commit comments

Comments
 (0)