diff --git a/daemon/src/index/index.rs b/daemon/src/index/index.rs index 022bd28..fa097a4 100644 --- a/daemon/src/index/index.rs +++ b/daemon/src/index/index.rs @@ -3,6 +3,7 @@ use super::*; #[derive(Clone)] pub struct DocumentIndex { config: Arc, + status: Arc>, inner: Arc>, } @@ -11,6 +12,7 @@ impl DocumentIndex { pub async fn new(config: Arc) -> DocumentIndex { DocumentIndex { inner: Arc::new(RwLock::new(DocumentIndexInner::new(Arc::clone(&config)).await)), + status: Arc::new(RwLock::new(IndexingStatus::default())), config, } } @@ -48,7 +50,7 @@ impl DocumentIndex { let mut to_list = Vec::new(); let mut to_load = HashMap::new(); let mut to_load_unprioritized = HashMap::new(); - + // List pinned elements let pinned = match list_pinned(&self.config.ipfs_rpc).await { Ok(pinned) => pinned, @@ -64,13 +66,15 @@ impl DocumentIndex { }; last_printed_error = None; to_list.extend(pinned.iter().filter_map(normalize_cid).filter(|cid| !listed.contains(cid))); + self.set_status(listed.len(), to_list.len(), loaded.len(), to_load.len(), to_load_unprioritized.len()).await; // Explore directories let start = Instant::now(); - let mut i = 0; if !to_list.is_empty() {debug!("{} elements to list", to_list.len())} while let Some(cid) = to_list.pop() { if !listed.insert(cid.clone()) {continue} + self.set_status(listed.len(), to_list.len()+1, loaded.len(), to_load.len(), to_load_unprioritized.len()).await; + let new_links = match ls(ipfs_rpc, cid.clone()).await { Ok(new_links) => new_links, Err(e) => { @@ -94,25 +98,21 @@ impl DocumentIndex { } to_list.sort(); to_list.dedup(); - i += 1; - if i % 500 == 0 { - debug!("Still listing pinned files ({i} in {:.02})", start.elapsed().as_secs_f32()); - } } // Load documents - i = 0; if !to_load.is_empty() {debug!("{} documents to load ({:.02?}s)", to_load.len(), start.elapsed().as_secs_f32())} - for (cid, (name, parent_cid)) in to_load.drain().chain(to_load_unprioritized.drain()) { + let (to_load_len, to_load_unprioritized_len) = (to_load.len(), to_load_unprioritized.len()); + for (i, (cid, (name, parent_cid))) in to_load.drain().chain(to_load_unprioritized.drain()).enumerate() { + let remaining_to_load = to_load_len.saturating_sub(i); + let remaining_unprioritized = std::cmp::min(to_load_unprioritized_len, to_load_len + to_load_unprioritized_len - i); + self.set_status(listed.len(), to_list.len(), loaded.len(), remaining_to_load, remaining_unprioritized).await; + if !loaded.insert(cid.clone()) {continue} let Ok(document) = fetch_document(ipfs_rpc, &cid).await else {continue}; let Some(inspected) = inspect_document(document) else {continue}; self.add_document(&cid, inspected).await; self.add_ancestor(&cid, name, false, &parent_cid).await; - i += 1; - if i % 500 == 0 { - debug!("Still loading files ({i} in {:.02})", start.elapsed().as_secs_f32()); - } } // Update filter @@ -126,6 +126,24 @@ impl DocumentIndex { } } + async fn set_status(&self, listed: usize, to_list: usize, loaded: usize, to_load: usize, to_load_unprioritized: usize) { + let mut status = self.status.write().await; + status.listed = listed; + status.to_list = to_list; + status.loaded = loaded; + status.to_load = to_load; + status.to_load_unprioritized = to_load_unprioritized; + } + + async fn set_status_updating_filter(&self, updating_filter: bool) { + let mut status = self.status.write().await; + status.updating_filter = updating_filter; + } + + pub async fn status(&self) -> IndexingStatus { + self.status.read().await.clone() + } + pub async fn documents(&self) -> HashSet { self.inner.read().await.documents() } @@ -147,7 +165,9 @@ impl DocumentIndex { } pub async fn update_filter(&self) { + self.set_status_updating_filter(true).await; self.inner.write().await.update_filter().await; + self.set_status_updating_filter(false).await; } } diff --git a/daemon/src/index/inner_im.rs b/daemon/src/index/inner_im.rs index da1167c..582da31 100644 --- a/daemon/src/index/inner_im.rs +++ b/daemon/src/index/inner_im.rs @@ -33,6 +33,7 @@ impl DocumentIndexInner { } } + #[allow(dead_code)] pub(super) async fn sweep(&mut self) {} pub fn documents(&self) -> HashSet { diff --git a/daemon/src/index/mod.rs b/daemon/src/index/mod.rs index 9328587..12c203e 100644 --- a/daemon/src/index/mod.rs +++ b/daemon/src/index/mod.rs @@ -8,8 +8,10 @@ const REFRESH_INTERVAL: u64 = 100; const SWEEP_INTERVAL: u64 = 30; mod index; +mod status; mod inner_common; pub use index::*; +pub use status::*; #[cfg(any(feature = "database-lmdb", feature = "database-mdbx"))] mod inner_db; diff --git a/daemon/src/index/status.rs b/daemon/src/index/status.rs new file mode 100644 index 0000000..4ff62a9 --- /dev/null +++ b/daemon/src/index/status.rs @@ -0,0 +1,11 @@ +use serde::Serialize; + +#[derive(Default, Debug, Clone, Serialize)] +pub struct IndexingStatus { + pub listed: usize, + pub to_list: usize, + pub loaded: usize, + pub to_load: usize, + pub to_load_unprioritized: usize, + pub updating_filter: bool, +}