Skip to content

Commit

Permalink
feat: smooth progress bar for backup transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Oct 5, 2024
1 parent 12f955c commit bc15715
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ once_cell = { workspace = true }
parking_lot = "0.12"
percent-encoding = "2.3"
pgp = { version = "0.13.2", default-features = false }
pin-project = "1"
qrcodegen = "1.7.0"
quick-xml = "0.36"
quoted_printable = "0.5"
Expand Down
4 changes: 0 additions & 4 deletions src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,6 @@ impl<'a> BlobDirContents<'a> {
pub(crate) fn iter(&self) -> BlobDirIter<'_> {
BlobDirIter::new(self.context, self.inner.iter())
}

pub(crate) fn len(&self) -> usize {
self.inner.len()
}
}

/// A iterator over all the [`BlobObject`]s in the blobdir.
Expand Down
168 changes: 140 additions & 28 deletions src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::pin::Pin;

use ::pgp::types::KeyTrait;
use anyhow::{bail, ensure, format_err, Context as _, Result};
use futures::TryStreamExt;
use futures_lite::FutureExt;
use pin_project::pin_project;

use tokio::fs::{self, File};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_tar::Archive;

use crate::blob::BlobDirContents;
Expand Down Expand Up @@ -212,7 +215,7 @@ async fn imex_inner(
path.display()
);
ensure!(context.sql.is_open().await, "Database not opened.");
context.emit_event(EventType::ImexProgress(10));
context.emit_event(EventType::ImexProgress(1));

if what == ImexMode::ExportBackup || what == ImexMode::ExportSelfKeys {
// before we export anything, make sure the private key exists
Expand Down Expand Up @@ -294,42 +297,81 @@ pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
.0
}

#[pin_project]
struct ProgressReader<R> {
#[pin]
inner: R,

#[pin]
read: usize,

#[pin]
file_size: usize,

#[pin]
last_progress: usize,

#[pin]
context: Context,
}

impl<R> ProgressReader<R> {
fn new(r: R, context: Context, file_size: u64) -> Self {
Self {
inner: r,
read: 0,
file_size: file_size as usize,
last_progress: 1,
context,
}
}
}

impl<R> AsyncRead for ProgressReader<R>
where
R: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let mut this = self.project();
let before = buf.filled().len();
let res = this.inner.poll_read(cx, buf);
if let std::task::Poll::Ready(Ok(())) = res {
*this.read = this.read.saturating_add(buf.filled().len() - before);

let progress = std::cmp::min(1000 * *this.read / *this.file_size, 999);
if progress > *this.last_progress {
this.context.emit_event(EventType::ImexProgress(progress));
*this.last_progress = progress;
}
}
res
}
}

async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
file_size: u64,
passphrase: String,
) -> (Result<()>,) {
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
let mut archive = Archive::new(backup_file);

let mut entries = match archive.entries() {
Ok(entries) => entries,
Err(e) => return (Err(e).context("Failed to get archive entries"),),
};
let mut blobs = Vec::new();
// We already emitted ImexProgress(10) above
let mut last_progress = 10;
const PROGRESS_MIGRATIONS: u128 = 999;
let mut total_size: u64 = 0;
let mut res: Result<()> = loop {
let mut f = match entries.try_next().await {
Ok(Some(f)) => f,
Ok(None) => break Ok(()),
Err(e) => break Err(e).context("Failed to get next entry"),
};
total_size += match f.header().entry_size() {
Ok(size) => size,
Err(e) => break Err(e).context("Failed to get entry size"),
};
let max = PROGRESS_MIGRATIONS - 1;
let progress = std::cmp::min(
max * u128::from(total_size) / std::cmp::max(u128::from(file_size), 1),
max,
);
if progress > last_progress {
context.emit_event(EventType::ImexProgress(progress as usize));
last_progress = progress;
}

let path = match f.path() {
Ok(path) => path.to_path_buf(),
Expand Down Expand Up @@ -379,7 +421,7 @@ async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
.log_err(context)
.ok();
if res.is_ok() {
context.emit_event(EventType::ImexProgress(PROGRESS_MIGRATIONS as usize));
context.emit_event(EventType::ImexProgress(999));
res = context.sql.run_migrations(context).await;
}
if res.is_ok() {
Expand Down Expand Up @@ -452,41 +494,111 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res

let file = File::create(&temp_path).await?;
let blobdir = BlobDirContents::new(context).await?;
export_backup_stream(context, &temp_db_path, blobdir, file)

let mut file_size = 0;
file_size += temp_db_path.metadata()?.len();
for blob in blobdir.iter() {
file_size += blob.to_abs_path().metadata()?.len()
}

export_backup_stream(context, &temp_db_path, blobdir, file, file_size)
.await
.context("Exporting backup to file failed")?;
fs::rename(temp_path, &dest_path).await?;
context.emit_event(EventType::ImexFileWritten(dest_path));
Ok(())
}

#[pin_project]
struct ProgressWriter<W> {
#[pin]
inner: W,

#[pin]
wrote: usize,

#[pin]
file_size: usize,

#[pin]
last_progress: usize,

#[pin]
context: Context,
}

impl<W> ProgressWriter<W> {
fn new(w: W, context: Context, file_size: u64) -> Self {
Self {
inner: w,
wrote: 0,
file_size: file_size as usize,
last_progress: 1,
context,
}
}
}

impl<W> AsyncWrite for ProgressWriter<W>
where
W: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
let mut this = self.project();
let res = this.inner.poll_write(cx, buf);
if let std::task::Poll::Ready(Ok(wrote)) = res {
*this.wrote = this.wrote.saturating_add(wrote);

let progress = std::cmp::min(1000 * *this.wrote / *this.file_size, 999);
if progress > *this.last_progress {
this.context.emit_event(EventType::ImexProgress(progress));
*this.last_progress = progress;
}
}
res
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().inner.poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().inner.poll_shutdown(cx)
}
}

/// Exports the database and blobs into a stream.
pub(crate) async fn export_backup_stream<'a, W>(
context: &'a Context,
temp_db_path: &Path,
blobdir: BlobDirContents<'a>,
writer: W,
file_size: u64,
) -> Result<()>
where
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
{
let writer = ProgressWriter::new(writer, context.clone(), file_size);
let mut builder = tokio_tar::Builder::new(writer);

builder
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
.await?;

let mut last_progress = 10;

for (i, blob) in blobdir.iter().enumerate() {
for blob in blobdir.iter() {
let mut file = File::open(blob.to_abs_path()).await?;
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name());
builder.append_file(path_in_archive, &mut file).await?;
let progress = std::cmp::min(1000 * i / blobdir.len(), 999);
if progress > last_progress {
context.emit_event(EventType::ImexProgress(progress));
last_progress = progress;
}
}

builder.finish().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl BackupProvider {
export_database(context, &dbfile, passphrase, time())
.await
.context("Database export failed")?;
context.emit_event(EventType::ImexProgress(300));
context.emit_event(EventType::ImexProgress(1));

let drop_token = CancellationToken::new();
let handle = {
Expand Down Expand Up @@ -190,7 +190,7 @@ impl BackupProvider {

send_stream.write_all(&file_size.to_be_bytes()).await?;

export_backup_stream(&context, &dbfile, blobdir, send_stream)
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
.await
.context("Failed to write backup into QUIC stream")?;
info!(context, "Finished writing backup into QUIC stream.");
Expand Down

0 comments on commit bc15715

Please sign in to comment.