Skip to content

Commit

Permalink
refactor(core): download archive chunks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Jul 31, 2024
1 parent 0bf83c0 commit 389b27b
Showing 1 changed file with 103 additions and 85 deletions.
188 changes: 103 additions & 85 deletions core/src/blockchain_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,104 +370,122 @@ impl BlockchainRpcClient {
mc_seqno: u32,
output: &mut (dyn Write + Send),
) -> Result<usize, Error> {
const CHUNK_SIZE: u32 = 5 << 20; // 5 MB
const CHUNK_SIZE: u64 = bytesize::MIB * 15; // 16 is limit of tl_proto
const PARALLEL_REQUESTS: usize = 10;

// TODO: Iterate through all known (or unknown) neighbours
const NEIGHBOUR_COUNT: usize = 10;
let neighbours = self
.overlay_client()
.neighbours()
.choose_multiple(NEIGHBOUR_COUNT)
.await;
let (neighbour, archive_id, archive_size) =
find_peer_with_archive(self.overlay_client().clone(), mc_seqno).await?;

// Find a neighbour which has the requested archive
let (neighbour, archive_id, archive_size) = 'info: {
let req = Request::from_tl(rpc::GetArchiveInfo { mc_seqno });
tracing::info!(peer_id = %neighbour.peer_id(), archive_id, "archive found. {archive_size} bytes");

let mut futures = FuturesUnordered::new();
for neighbour in neighbours {
futures.push(self.overlay_client().query_raw(neighbour, req.clone()));
}
let mut verifier = ArchiveVerifier::default();

let mut err = None;
while let Some(info) = futures.next().await {
let (handle, info) = match info {
Ok(res) => res.split(),
Err(e) => {
err = Some(e);
continue;
}
};
tracing::debug!("size {}, chunk size {}", archive_size, CHUNK_SIZE);
let mut stream = futures_util::stream::iter(
(0..(archive_size / CHUNK_SIZE)).step_by(CHUNK_SIZE as usize),
)
.inspect(|offset| tracing::debug!("downloading archive chunk at offset {}", offset))
.map(|offset| {
Request::from_tl(rpc::GetArchiveSlice {
archive_id,
offset,
limit: CHUNK_SIZE as u32,
})
})
.map(|req| {
let neighbour = neighbour.clone();
let overlay_client = self.overlay_client().clone();
JoinTask::new(download_archive_inner(req, overlay_client, neighbour))
})
.buffered(PARALLEL_REQUESTS);

let mut downloaded: u64 = 0;

let mut stream = std::pin::pin!(stream);
while let Some(chunk) = stream.next().await {
downloaded += chunk.len() as u64;

tracing::debug!(
downloaded = %bytesize::ByteSize::b(downloaded),
"got archive chunk"
);
verifier.write_verify(&chunk).map_err(|e| {
Error::Internal(anyhow::anyhow!("Received invalid archive chunk: {e}"))
})?;

output.write_all(&chunk).map_err(|e| {
Error::Internal(anyhow::anyhow!("Failed to write archive chunk: {e}"))
})?;
}

match info {
ArchiveInfo::Found { id, size } => break 'info (handle.accept(), id, size),
ArchiveInfo::NotFound => continue,
}
}
verifier
.final_check()
.map_err(|e| Error::Internal(anyhow::anyhow!("Received invalid archive: {e}")))?;
output
.flush()
.map_err(|e| Error::Internal(anyhow::anyhow!("Failed to flush archive: {e}")))?;

return match err {
None => Err(Error::Internal(anyhow::anyhow!(
"no neighbour has the requested archive",
))),
Some(err) => Err(err),
};
};
Ok(downloaded as usize)
}
}

tracing::debug!(peer_id = %neighbour.peer_id(), archive_id, "archive found. {archive_size} bytes");
async fn find_peer_with_archive(
client: PublicOverlayClient,
mc_seqno: u32,
) -> Result<(Neighbour, u64, u64), Error> {
// TODO: Iterate through all known (or unknown) neighbours
const NEIGHBOUR_COUNT: usize = 10;
let neighbours = client.neighbours().choose_multiple(NEIGHBOUR_COUNT).await;

let mut verifier = ArchiveVerifier::default();
let req = Request::from_tl(rpc::GetArchiveInfo { mc_seqno });

// TODO: add retry count to interrupt infinite loop
let mut offset = 0;
loop {
let req = Request::from_tl(rpc::GetArchiveSlice {
archive_id,
offset,
limit: CHUNK_SIZE,
});
let mut futures = FuturesUnordered::new();
for neighbour in neighbours {
futures.push(client.query_raw(neighbour, req.clone()));
}

let res = self
.overlay_client()
.query_raw::<Data>(neighbour.clone(), req)
.await;

match res {
Ok(res) => {
let chunk = &res.data().data;

tracing::debug!(
downloaded = %bytesize::ByteSize::b(offset + chunk.len() as u64),
"got archive chunk"
);

verifier.write_verify(chunk).map_err(|e| {
Error::Internal(anyhow::anyhow!("Received invalid archive chunk: {e}"))
})?;

let is_last = chunk.len() < CHUNK_SIZE as usize;
if is_last {
verifier.final_check().map_err(|e| {
Error::Internal(anyhow::anyhow!("Received invalid archive: {e}"))
})?;
}
let mut err = None;
while let Some(info) = futures.next().await {
let (handle, info) = match info {
Ok(res) => res.split(),
Err(e) => {
err = Some(e);
continue;
}
};

output.write_all(chunk).map_err(|e| {
Error::Internal(anyhow::anyhow!("Failed to write archive chunk: {e}"))
})?;
match info {
ArchiveInfo::Found { id, size } => return Ok((handle.accept(), id, size)),
ArchiveInfo::NotFound => continue,
}
}

offset += chunk.len() as u64;
match err {
None => Err(Error::Internal(anyhow::anyhow!(
"no neighbour has the requested archive",
))),
Some(err) => Err(err),
}
}

if is_last {
return Ok(offset as usize);
}
}
Err(e) => {
tracing::error!(
archive_id,
offset,
"Failed to download archive slice: {e:?}",
);
}
async fn download_archive_inner(
req: Request,
overlay_client: PublicOverlayClient,
neighbour: Neighbour,
) -> Bytes {
// todo: backoff + peer change after some time
// todo: maybe download from multiple peers?
loop {
let res = overlay_client
.query_raw::<Data>(neighbour.clone(), req.clone())
.await;
match res {
Ok(arch) => {
let (_, data) = arch.accept();
return data.data;
}
Err(e) => {
tracing::error!("Failed to download archive slice: {e}");
}
}
}
Expand Down

0 comments on commit 389b27b

Please sign in to comment.