diff --git a/rust/noosphere-cli/src/native/paths.rs b/rust/noosphere-cli/src/native/paths.rs index 28a42ad33..70b708a64 100644 --- a/rust/noosphere-cli/src/native/paths.rs +++ b/rust/noosphere-cli/src/native/paths.rs @@ -3,7 +3,10 @@ use cid::{multihash::Code, multihash::MultihashDigest, Cid}; use libipld_core::raw::RawCodec; use noosphere_core::data::{Did, MemoIpld}; use noosphere_storage::base64_encode; -use std::path::{Path, PathBuf}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, +}; use super::extension::infer_file_extension; @@ -34,7 +37,7 @@ pub const LINK_RECORD_FILE: &str = "link_record"; /// https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry /// See also: /// https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file#naming-conventions -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SpherePaths { root: PathBuf, sphere: PathBuf, @@ -47,6 +50,14 @@ pub struct SpherePaths { depth: PathBuf, } +impl Debug for SpherePaths { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SpherePaths") + .field("root", &self.root) + .finish() + } +} + impl SpherePaths { /// Returns true if the given path has a .sphere folder fn has_sphere_directory(path: &Path) -> bool { diff --git a/rust/noosphere-cli/src/native/render/job.rs b/rust/noosphere-cli/src/native/render/job.rs index e151cb08a..265cbb856 100644 --- a/rust/noosphere-cli/src/native/render/job.rs +++ b/rust/noosphere-cli/src/native/render/job.rs @@ -31,6 +31,7 @@ impl SphereRenderRequest { pub enum JobKind { Root, Peer(Did, Cid, LinkRecord), + RefreshPeers, } pub struct SphereRenderJob @@ -103,6 +104,11 @@ where return Err(anyhow!("No peer found at {}", self.petname_path.join("."))); }; } + JobKind::RefreshPeers => { + debug!("Running refresh peers render job..."); + self.refresh_peers(SphereCursor::latest(self.context.clone())) + .await?; + } } Ok(()) @@ -115,23 +121,37 @@ where debug!("Starting full render of 
{identity} @ {version}..."); - let content_stream = SphereWalker::from(&cursor).into_content_stream(); + { + let content_stream = SphereWalker::from(&cursor).into_content_stream(); - tokio::pin!(content_stream); + tokio::pin!(content_stream); - let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY); + let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY); - // Write all content - while let Some((slug, file)) = content_stream.try_next().await? { - content_change_buffer.add(slug, file)?; + // Write all content + while let Some((slug, file)) = content_stream.try_next().await? { + content_change_buffer.add(slug, file)?; - if content_change_buffer.is_full() { - content_change_buffer.flush_to_writer(&self.writer).await?; + if content_change_buffer.is_full() { + content_change_buffer.flush_to_writer(&self.writer).await?; + } } + + content_change_buffer.flush_to_writer(&self.writer).await?; } - content_change_buffer.flush_to_writer(&self.writer).await?; + self.refresh_peers(cursor).await?; + + // Write out the latest version that was rendered + tokio::try_join!( + self.writer.write_identity_and_version(&identity, &version), + self.writer.write_link_record() + )?; + + Ok(()) + } + async fn refresh_peers(&self, cursor: SphereCursor) -> Result<()> { let petname_stream = SphereWalker::from(&cursor).into_petname_stream(); let db = cursor.sphere_context().await?.db().clone(); @@ -181,12 +201,6 @@ where petname_change_buffer.flush_to_writer(&self.writer).await?; - // Write out the latest version that was rendered - tokio::try_join!( - self.writer.write_identity_and_version(&identity, &version), - self.writer.write_link_record() - )?; - Ok(()) } diff --git a/rust/noosphere-cli/src/native/render/renderer.rs b/rust/noosphere-cli/src/native/render/renderer.rs index ac6040cd3..c2ba11542 100644 --- a/rust/noosphere-cli/src/native/render/renderer.rs +++ b/rust/noosphere-cli/src/native/render/renderer.rs @@ -2,6 +2,7 @@ use 
anyhow::Result; use noosphere_sphere::HasSphereContext; use noosphere_storage::Storage; use std::{collections::BTreeSet, marker::PhantomData, sync::Arc, thread::available_parallelism}; + use tokio::{select, task::JoinSet}; use super::{SphereRenderJob, SphereRenderRequest}; @@ -35,6 +36,19 @@ where } } + async fn reset_peers(&self) -> Result<()> { + if let Err(error) = tokio::fs::remove_dir_all(self.paths.peers()).await { + warn!( + path = ?self.paths.peers(), + "Failed attempt to reset peers: {}", error + ); + } + + tokio::fs::create_dir_all(self.paths.peers()).await?; + + Ok(()) + } + #[instrument(level = "debug", skip(self))] pub async fn render(&self, depth: Option<u32>) -> Result<()> { std::env::set_current_dir(self.paths.root())?; @@ -45,14 +59,48 @@ let max_parallel_jobs = available_parallelism()?.get(); let (tx, mut rx) = tokio::sync::mpsc::channel::<SphereRenderRequest>(max_parallel_jobs); + let last_render_depth = + if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await { + depth.parse::<u32>().ok() + } else { + None + }; + let render_depth = if let Some(depth) = depth { depth - } else if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await { - depth.parse::<u32>().unwrap_or(DEFAULT_RENDER_DEPTH) } else { - DEFAULT_RENDER_DEPTH + last_render_depth.unwrap_or(DEFAULT_RENDER_DEPTH) }; + if let Some(last_render_depth) = last_render_depth { + if render_depth > last_render_depth { + // NOTE: Sequencing is important here. This reset is performed + // by the renderer in advance of queuing any work because we + // cannot guarantee the order in which requests to render peers + // may come in, and it could happen out-of-order with a "refresh + // peers" job that is running concurrently. + self.reset_peers().await?; + + debug!( + ?max_parallel_jobs, + ?render_depth, + "Spawning peer refresh render job for {}...", + self.context.identity().await?
+ ); + + render_jobs.spawn( + SphereRenderJob::new( + self.context.clone(), + JobKind::RefreshPeers, + self.paths.clone(), + Vec::new(), + tx.clone(), + ) + .render(), + ); + } + } + debug!( ?max_parallel_jobs, ?render_depth, @@ -94,15 +142,19 @@ where continue; } - debug!("Queuing render job for {} @ {}...", job_id.0, job_id.1); - started_jobs.insert(job_id); - let SphereRenderRequest(petname_path, peer, version, link_record) = job_request; + // NOTE: If a peer is too deep, we _don't_ mark it as started; another + // peer may wish to render this peer at a shallower depth, in which case + // we should proceed. if petname_path.len() > render_depth as usize { - debug!("Skipping '@{}' (exceeds max render depth)", petname_path.join(".")); + debug!("Skipping render job for '@{}' (exceeds max render depth {render_depth})", petname_path.join(".")); continue; } + warn!("PETNAME PATH {:?}", petname_path); + + debug!("Queuing render job for {} @ {}...", job_id.0, job_id.1); + started_jobs.insert(job_id); if self.paths.peer(&peer, &version).exists() { // TODO: We may need to re-render if a previous diff --git a/rust/noosphere-cli/src/native/render/writer.rs b/rust/noosphere-cli/src/native/render/writer.rs index dafcb7196..6ef4a5c5d 100644 --- a/rust/noosphere-cli/src/native/render/writer.rs +++ b/rust/noosphere-cli/src/native/render/writer.rs @@ -46,21 +46,21 @@ impl SphereWriter { pub fn mount(&self) -> &Path { self.mount.get_or_init(|| match &self.kind { - JobKind::Root => self.base().to_owned(), + JobKind::Root | JobKind::RefreshPeers => self.base().to_owned(), JobKind::Peer(_, _, _) => self.base().join(MOUNT_DIRECTORY), }) } pub fn base(&self) -> &Path { self.base.get_or_init(|| match &self.kind { - JobKind::Root => self.paths.root().to_owned(), + JobKind::Root | JobKind::RefreshPeers => self.paths.root().to_owned(), JobKind::Peer(did, cid, _) => self.paths.peer(did, cid), }) } pub fn private(&self) -> &Path { self.private.get_or_init(|| match &self.kind { - JobKind::Root 
=> self.paths().sphere().to_owned(), + JobKind::Root | JobKind::RefreshPeers => self.paths().sphere().to_owned(), JobKind::Peer(_, _, _) => self.base().to_owned(), }) } diff --git a/rust/noosphere-cli/tests/cli.rs b/rust/noosphere-cli/tests/cli.rs index 66c620f54..23bcdebfb 100644 --- a/rust/noosphere-cli/tests/cli.rs +++ b/rust/noosphere-cli/tests/cli.rs @@ -178,23 +178,30 @@ mod multiplayer { let mut pair_2 = SpherePair::new("TWO", &ipfs_url, &ns_url).await?; let mut pair_3 = SpherePair::new("THREE", &ipfs_url, &ns_url).await?; let mut pair_4 = SpherePair::new("FOUR", &ipfs_url, &ns_url).await?; + let mut pair_5 = SpherePair::new("FIVE", &ipfs_url, &ns_url).await?; pair_1.start_gateway().await?; pair_2.start_gateway().await?; pair_3.start_gateway().await?; pair_4.start_gateway().await?; + pair_5.start_gateway().await?; let sphere_1_id = pair_1.client.identity.clone(); let sphere_2_id = pair_2.client.identity.clone(); let sphere_3_id = pair_3.client.identity.clone(); let sphere_4_id = pair_4.client.identity.clone(); + let sphere_5_id = pair_5.client.identity.clone(); - for (index, pair) in [&pair_1, &pair_2, &pair_3, &pair_4].iter().enumerate() { + for (index, pair) in [&pair_1, &pair_2, &pair_3, &pair_4, &pair_5] + .iter() + .enumerate() + { pair.spawn(move |mut ctx| async move { + let id = index + 1; ctx.write( - format!("content{}", index).as_str(), + format!("content{}", id).as_str(), "text/plain", - format!("foo{}", index).as_bytes(), + format!("foo{}", id).as_bytes(), None, ) .await?; @@ -263,6 +270,7 @@ mod multiplayer { }) .await?; + // Join the first sphere cli.orb(&[ "sphere", "join", @@ -277,11 +285,11 @@ mod multiplayer { .await?; let expected_content = [ - ("content0.txt", "foo0"), - ("@peer2/content1.txt", "foo1"), - ("@peer3/content2.txt", "foo2"), - ("@peer2/@peer3-of-peer2/content2.txt", "foo2"), - ("@peer2/@peer4/content3.txt", "foo3"), + ("content1.txt", "foo1"), + ("@peer2/content2.txt", "foo2"), + ("@peer3/content3.txt", "foo3"), + 
("@peer2/@peer3-of-peer2/content3.txt", "foo3"), + ("@peer2/@peer4/content4.txt", "foo4"), (".sphere/identity", &sphere_1_id), (".sphere/version", &sphere_1_version.to_string()), ]; @@ -293,26 +301,45 @@ mod multiplayer { assert_eq!(&tokio::fs::read_to_string(&path).await?, content); } + // Change a peer-of-my-peer pair_4 .spawn(move |mut ctx| async move { ctx.write( - "content3", + "content4", "text/plain", - "foo3 and something new".as_bytes(), + "foo4 and something new".as_bytes(), None, ) .await?; + ctx.set_petname("peer5", Some(sphere_5_id)).await?; ctx.save(None).await?; ctx.sync(SyncRecovery::Retry(3)).await?; + wait(1).await; + ctx.sync(SyncRecovery::Retry(3)).await?; Ok(()) }) .await?; + // Add another level of depth to the graph + pair_3 + .spawn(move |mut ctx| async move { + ctx.set_petname("peer4-of-peer3", Some(sphere_4_id)).await?; + ctx.save(None).await?; + ctx.sync(SyncRecovery::Retry(3)).await?; + wait(1).await; + ctx.sync(SyncRecovery::Retry(3)).await?; + + Ok(()) + }) + .await?; + + // Change a peer pair_2 .spawn(move |mut ctx| async move { ctx.write("newcontent", "text/plain", "new".as_bytes(), None) .await?; + ctx.set_petname("peer4", None).await?; ctx.save(None).await?; ctx.sync(SyncRecovery::Retry(3)).await?; wait(1).await; @@ -322,10 +349,12 @@ mod multiplayer { }) .await?; + // Rename a peer let sphere_1_version = pair_1 .spawn(move |mut ctx| async move { ctx.set_petname("peer3", None).await?; - ctx.set_petname("peer3-renamed", Some(sphere_3_id)).await?; + ctx.set_petname("peer2", None).await?; + ctx.set_petname("peer2-renamed", Some(sphere_2_id)).await?; ctx.save(None).await?; ctx.sync(SyncRecovery::Retry(3)).await?; wait(1).await; @@ -335,15 +364,23 @@ mod multiplayer { }) .await?; + // Sync to get the latest remote changes cli.orb(&["sphere", "sync", "--auto-retry", "3"]).await?; let expected_content = [ - ("content0.txt", "foo0"), - ("@peer2/content1.txt", "foo1"), - ("@peer2/newcontent.txt", "new"), - ("@peer3-renamed/content2.txt", 
"foo2"), - ("@peer2/@peer3-of-peer2/content2.txt", "foo2"), - ("@peer2/@peer4/content3.txt", "foo3 and something new"), + ("content1.txt", "foo1"), + ("@peer2-renamed/content2.txt", "foo2"), + ("@peer2-renamed/newcontent.txt", "new"), + ("@peer2-renamed/@peer3-of-peer2/content3.txt", "foo3"), + ( + "@peer2-renamed/@peer3-of-peer2/@peer4-of-peer3/content4.txt", + "foo4 and something new", + ), + ("@peer2-renamed/@peer3-of-peer2/content3.txt", "foo3"), + ( + "@peer2-renamed/@peer3-of-peer2/@peer4-of-peer3/content4.txt", + "foo4 and something new", + ), (".sphere/identity", &sphere_1_id), (".sphere/version", &sphere_1_version.to_string()), ]; @@ -355,7 +392,34 @@ mod multiplayer { assert_eq!(&tokio::fs::read_to_string(&path).await?, content); } - assert!(!tokio::fs::try_exists(&cli.sphere_directory().join("@peer3/content.txt")).await?); + let unexpected_content = [ + // Peer removed + "@peer3/content3.txt", + // Peer renamed + "@peer2/content2.txt", + // Peer removed + "@peer2-renamed/@peer4/content4.txt", + // Peer depth greater than render depth + "@peer2-renamed/@peer3-of-peer2/@peer4-of-peer3/@peer5/content5.txt", + ]; + + for path in unexpected_content { + tracing::info!("CHECKING UNEXPECTED CONTENT {}", path); + assert!(!tokio::fs::try_exists(&cli.sphere_directory().join(path)).await?); + } + + // Sync again, but with a greater render depth + cli.orb(&["sphere", "sync", "--auto-retry", "3", "--render-depth", "4"]) + .await?; + + // Previously omitted peer should be rendered now + assert!( + tokio::fs::try_exists( + &cli.sphere_directory() + .join("@peer2-renamed/@peer3-of-peer2/@peer4-of-peer3/@peer5/content5.txt") + ) + .await? 
+ ); ns_task.abort(); diff --git a/rust/noosphere-core/src/data/address.rs b/rust/noosphere-core/src/data/address.rs index 238484407..ea8664086 100644 --- a/rust/noosphere-core/src/data/address.rs +++ b/rust/noosphere-core/src/data/address.rs @@ -4,6 +4,7 @@ use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_storage::BlockStore; use serde::{de, ser, Deserialize, Serialize}; +use std::fmt::Debug; use std::{convert::TryFrom, fmt::Display, ops::Deref, str::FromStr}; use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore, Ucan}; @@ -59,10 +60,23 @@ impl IdentityIpld { /// A [LinkRecord] is a wrapper around a decoded [Jwt] ([Ucan]), /// representing a link address as a [Cid] to a sphere. -#[derive(Debug, Clone)] +#[derive(Clone)] #[repr(transparent)] pub struct LinkRecord(Ucan); +impl Debug for LinkRecord { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("LinkRecord") + .field( + &self + .0 + .to_cid(cid::multihash::Code::Blake3_256) + .map_or_else(|_| String::from(""), |cid| cid.to_string()), + ) + .finish() + } +} + impl LinkRecord { /// Validates the [Ucan] token as a [LinkRecord], ensuring that /// the sphere's owner authorized the publishing of a new