Skip to content

Commit

Permalink
feat: get multiple blocks in unixfs operations
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Jan 4, 2025
1 parent fdf666e commit 75236d2
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 68 deletions.
53 changes: 29 additions & 24 deletions src/unixfs/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,32 +215,37 @@ impl Stream for UnixfsCat {
// would probably always cost many unnecessary clones, but it would be nice to "shut"
// the subscriber so that it will only resolve to a value but still keep the operation
// going. Not that we have any "operation" concept of the Want yet.
let (next, _) = visit.pending_links();

let (next, pending) = visit.pending_links();
let borrow = &repo;
let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?;

let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;

let list = std::iter::once(next).chain(pending);

let mut block = borrow.get_blocks(list).providers(&providers).set_local(local_only).timeout(timeout);

while let Some(block) = block.next().await {
let block = block.map_err(|e| TraversalFailed::Io(std::io::Error::other(e)))?;
let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;

size += bytes.len();

if let Some(length) = length {
if size > length {
let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
fn_err()?;
return;
size += bytes.len();

if let Some(length) = length {
if size > length {
let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
fn_err()?;
return;
}
}

if !bytes.is_empty() {
yield Bytes::copy_from_slice(bytes);
}

match next_visit {
Some(v) => visit = v,
None => return,
}
}

if !bytes.is_empty() {
yield Bytes::copy_from_slice(bytes);
}

match next_visit {
Some(v) => visit = v,
None => return,
}

}
}.boxed();

Expand All @@ -265,8 +270,8 @@ impl std::future::IntoFuture for UnixfsCat {
}
Ok(data.into())
}
.instrument(span)
.boxed()
.instrument(span)
.boxed()
}
}

Expand Down
96 changes: 52 additions & 44 deletions src/unixfs/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,57 +150,65 @@ impl Stream for UnixfsGet {
let mut walker = Walker::new(*cid, root_name);

while walker.should_continue() {
let (next, _) = walker.pending_links();
let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
Ok(block) => block,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: e };
return;
}
};
let block_data = block.data();
let (next, pending) = walker.pending_links();

match walker.next(block_data, &mut cache) {
Ok(ContinuedWalk::Bucket(..)) => {}
Ok(ContinuedWalk::File(segment, _, _, _, size)) => {
let list = std::iter::once(next).chain(pending);

if segment.is_first() {
total_size = Some(size as usize);
yield UnixfsStatus::ProgressStatus { written, total_size };
let mut blocks = repo.get_blocks(list).providers(&providers).set_local(local_only).timeout(timeout);

while let Some(result) = blocks.next().await {
let block = match result {
Ok(block) => block,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: e };
return;
}
// even if the largest of files can have 256 kB blocks and about the same
// amount of content, try to consume it in small parts not to grow the buffers
// too much.

let mut n = 0usize;
let slice = segment.as_ref();
let total = slice.len();

while n < total {
let next = &slice[n..];
n += next.len();
if let Err(e) = file.write_all(next).await {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
};

let block_data = block.data();

match walker.next(block_data, &mut cache) {
Ok(ContinuedWalk::Bucket(..)) => {}
Ok(ContinuedWalk::File(segment, _, _, _, size)) => {

if segment.is_first() {
total_size = Some(size as usize);
yield UnixfsStatus::ProgressStatus { written, total_size };
}
if let Err(e) = file.sync_all().await {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
// even if the largest of files can have 256 kB blocks and about the same
// amount of content, try to consume it in small parts not to grow the buffers
// too much.

let mut n = 0usize;
let slice = segment.as_ref();
let total = slice.len();

while n < total {
let next = &slice[n..];
n += next.len();
if let Err(e) = file.write_all(next).await {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
}
if let Err(e) = file.sync_all().await {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
}

written += n;
}

written += n;
}

yield UnixfsStatus::ProgressStatus { written, total_size };
yield UnixfsStatus::ProgressStatus { written, total_size };

},
Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO
Ok(ContinuedWalk::Symlink( .. )) => {},
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
}
};
},
Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO
Ok(ContinuedWalk::Symlink( .. )) => {},
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
return;
}
};
}
};

yield UnixfsStatus::CompletedStatus { path, written, total_size }
Expand Down

0 comments on commit 75236d2

Please sign in to comment.