Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
WIP (docs/tests)
Browse files Browse the repository at this point in the history
  • Loading branch information
cdata committed Aug 3, 2023
1 parent ef12e4a commit 5b8ab9c
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 46 deletions.
15 changes: 13 additions & 2 deletions rust/noosphere-cli/src/native/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use cid::{multihash::Code, multihash::MultihashDigest, Cid};
use libipld_core::raw::RawCodec;
use noosphere_core::data::{Did, MemoIpld};
use noosphere_storage::base64_encode;
use std::path::{Path, PathBuf};
use std::{
fmt::Debug,
path::{Path, PathBuf},
};

use super::extension::infer_file_extension;

Expand Down Expand Up @@ -34,7 +37,7 @@ pub const LINK_RECORD_FILE: &str = "link_record";
/// https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry
/// See also:
/// https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file#naming-conventions
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct SpherePaths {
root: PathBuf,
sphere: PathBuf,
Expand All @@ -47,6 +50,14 @@ pub struct SpherePaths {
depth: PathBuf,
}

impl Debug for SpherePaths {
    /// Render a compact debug form showing only the `root` path; the
    /// remaining fields are all derived from it, so printing them would
    /// only add noise.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("SpherePaths");
        builder.field("root", &self.root);
        builder.finish()
    }
}

impl SpherePaths {
/// Returns true if the given path has a .sphere folder
fn has_sphere_directory(path: &Path) -> bool {
Expand Down
44 changes: 29 additions & 15 deletions rust/noosphere-cli/src/native/render/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl SphereRenderRequest {
/// The kind of work that a sphere render job performs.
pub enum JobKind {
    /// Render the root (local) sphere itself
    Root,
    /// Render a single peer, identified by its DID and a specific version,
    /// authorized by the given link record
    Peer(Did, Cid, LinkRecord),
    /// Re-render all known peers of the root sphere (used when the render
    /// depth increases and peers must be visited again)
    RefreshPeers,
}

pub struct SphereRenderJob<C, S>
Expand Down Expand Up @@ -103,6 +104,11 @@ where
return Err(anyhow!("No peer found at {}", self.petname_path.join(".")));
};
}
JobKind::RefreshPeers => {
debug!("Running refresh peers render job...");
self.refresh_peers(SphereCursor::latest(self.context.clone()))
.await?;
}
}

Ok(())
Expand All @@ -115,23 +121,37 @@ where

debug!("Starting full render of {identity} @ {version}...");

let content_stream = SphereWalker::from(&cursor).into_content_stream();
{
let content_stream = SphereWalker::from(&cursor).into_content_stream();

tokio::pin!(content_stream);
tokio::pin!(content_stream);

let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);
let mut content_change_buffer = ChangeBuffer::new(CONTENT_CHANGE_BUFFER_CAPACITY);

// Write all content
while let Some((slug, file)) = content_stream.try_next().await? {
content_change_buffer.add(slug, file)?;
// Write all content
while let Some((slug, file)) = content_stream.try_next().await? {
content_change_buffer.add(slug, file)?;

if content_change_buffer.is_full() {
content_change_buffer.flush_to_writer(&self.writer).await?;
if content_change_buffer.is_full() {
content_change_buffer.flush_to_writer(&self.writer).await?;
}
}

content_change_buffer.flush_to_writer(&self.writer).await?;
}

content_change_buffer.flush_to_writer(&self.writer).await?;
self.refresh_peers(cursor).await?;

// Write out the latest version that was rendered
tokio::try_join!(
self.writer.write_identity_and_version(&identity, &version),
self.writer.write_link_record()
)?;

Ok(())
}

async fn refresh_peers(&self, cursor: SphereCursor<C, S>) -> Result<()> {
let petname_stream = SphereWalker::from(&cursor).into_petname_stream();
let db = cursor.sphere_context().await?.db().clone();

Expand Down Expand Up @@ -181,12 +201,6 @@ where

petname_change_buffer.flush_to_writer(&self.writer).await?;

// Write out the latest version that was rendered
tokio::try_join!(
self.writer.write_identity_and_version(&identity, &version),
self.writer.write_link_record()
)?;

Ok(())
}

Expand Down
66 changes: 59 additions & 7 deletions rust/noosphere-cli/src/native/render/renderer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use noosphere_sphere::HasSphereContext;
use noosphere_storage::Storage;
use std::{collections::BTreeSet, marker::PhantomData, sync::Arc, thread::available_parallelism};

use tokio::{select, task::JoinSet};

use super::{SphereRenderJob, SphereRenderRequest};
Expand Down Expand Up @@ -35,6 +36,19 @@ where
}
}

async fn reset_peers(&self) -> Result<()> {
if let Err(error) = tokio::fs::remove_dir_all(self.paths.peers()).await {
warn!(
path = ?self.paths.peers(),
"Failed attempt to reset peers: {}", error
);
}

tokio::fs::create_dir_all(self.paths.peers()).await?;

Ok(())
}

#[instrument(level = "debug", skip(self))]
pub async fn render(&self, depth: Option<u32>) -> Result<()> {
std::env::set_current_dir(self.paths.root())?;
Expand All @@ -45,14 +59,48 @@ where
let max_parallel_jobs = available_parallelism()?.get();
let (tx, mut rx) = tokio::sync::mpsc::channel::<SphereRenderRequest>(max_parallel_jobs);

let last_render_depth =
if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await {
depth.parse::<u32>().ok()
} else {
None
};

let render_depth = if let Some(depth) = depth {
depth
} else if let Ok(depth) = tokio::fs::read_to_string(self.paths.depth()).await {
depth.parse::<u32>().unwrap_or(DEFAULT_RENDER_DEPTH)
} else {
DEFAULT_RENDER_DEPTH
last_render_depth.unwrap_or(DEFAULT_RENDER_DEPTH)
};

if let Some(last_render_depth) = last_render_depth {
if render_depth > last_render_depth {
// NOTE: Sequencing is important here. This reset is performed
// by the renderer in advance of queuing any work because we
// cannot guarantee the order in which requests to render peers
// may come in, and it could happen out-of-order with a "refresh
// peers" job that is running concurrently.
self.reset_peers().await?;

debug!(
?max_parallel_jobs,
?render_depth,
"Spawning peer refresh render job for {}...",
self.context.identity().await?
);

render_jobs.spawn(
SphereRenderJob::new(
self.context.clone(),
JobKind::RefreshPeers,
self.paths.clone(),
Vec::new(),
tx.clone(),
)
.render(),
);
}
}

debug!(
?max_parallel_jobs,
?render_depth,
Expand Down Expand Up @@ -94,15 +142,19 @@ where
continue;
}

debug!("Queuing render job for {} @ {}...", job_id.0, job_id.1);
started_jobs.insert(job_id);

let SphereRenderRequest(petname_path, peer, version, link_record) = job_request;

// NOTE: If a peer is too deep, we _don't_ mark it as started; another
// peer may wish to render this peer at a shallower depth, in which case
// we should proceed.
if petname_path.len() > render_depth as usize {
debug!("Skipping '@{}' (exceeds max render depth)", petname_path.join("."));
debug!("Skipping render job for '@{}' (exceeds max render depth {render_depth})", petname_path.join("."));
continue;
}
warn!("PETNAME PATH {:?}", petname_path);

debug!("Queuing render job for {} @ {}...", job_id.0, job_id.1);
started_jobs.insert(job_id);

if self.paths.peer(&peer, &version).exists() {
// TODO: We may need to re-render if a previous
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-cli/src/native/render/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ impl SphereWriter {

pub fn mount(&self) -> &Path {
self.mount.get_or_init(|| match &self.kind {
JobKind::Root => self.base().to_owned(),
JobKind::Root | JobKind::RefreshPeers => self.base().to_owned(),
JobKind::Peer(_, _, _) => self.base().join(MOUNT_DIRECTORY),
})
}

pub fn base(&self) -> &Path {
self.base.get_or_init(|| match &self.kind {
JobKind::Root => self.paths.root().to_owned(),
JobKind::Root | JobKind::RefreshPeers => self.paths.root().to_owned(),
JobKind::Peer(did, cid, _) => self.paths.peer(did, cid),
})
}

pub fn private(&self) -> &Path {
self.private.get_or_init(|| match &self.kind {
JobKind::Root => self.paths().sphere().to_owned(),
JobKind::Root | JobKind::RefreshPeers => self.paths().sphere().to_owned(),
JobKind::Peer(_, _, _) => self.base().to_owned(),
})
}
Expand Down
Loading

0 comments on commit 5b8ab9c

Please sign in to comment.