Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent directory download #68

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ tabwriter = "1.2.1"
tar = "0.4.38"
tempfile = "3.3.0"
tokio = { version = "1.23.0", features = ["full"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7.8", features = ["io", "compat"] }
2 changes: 2 additions & 0 deletions src/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod copy;
pub mod delete;
pub mod download;
pub mod export;
mod ext;
pub mod generate_ids;
pub mod import;
pub mod info;
Expand All @@ -16,6 +17,7 @@ pub use copy::copy;
pub use delete::delete;
pub use download::download;
pub use export::export;
pub use ext::FileExtension;
pub use generate_ids::generate_ids;
pub use import::import;
pub use info::info;
Expand Down
157 changes: 133 additions & 24 deletions src/files/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ use crate::common::drive_file;
use crate::common::file_tree_drive;
use crate::common::file_tree_drive::FileTreeDrive;
use crate::common::hub_helper;
use crate::common::md5_writer::Md5Writer;

use crate::files;
use crate::files::list;
use crate::files::list::ListQuery;
use crate::files::FileExtension;
use crate::hub::Hub;
use async_recursion::async_recursion;

use futures::stream;
use futures::stream::StreamExt;

use futures::TryStreamExt;
use google_drive3::hyper;
use human_bytes::human_bytes;
use std::error;
Expand All @@ -18,13 +24,22 @@ use std::io;
use std::io::BufReader;
use std::io::Read;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;

use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::InspectReader;

use super::list::list_files;

type GFile = google_drive3::api::File;

pub struct Config {
pub file_id: String,
pub existing_file_action: ExistingFileAction,
pub follow_shortcuts: bool,
pub download_directories: bool,
pub parallelisme: usize,
pub destination: Destination,
}

Expand Down Expand Up @@ -71,7 +86,84 @@ pub enum ExistingFileAction {
Overwrite,
}

#[async_recursion]
pub async fn _download_file(
hub: &Hub,
file_path: impl AsRef<Path>,
file: &GFile,
) -> Result<(), Error> {
let file_id = file.id.as_ref().ok_or_else(|| Error::MissingFileName)?;
let body = download_file(&hub, file_id.as_str())
.await
.map_err(Error::DownloadFile)?;

let file_path = file_path.as_ref();

println!("Downloading file '{}'", file_path.display());
save_body_to_file(body, &file_path, None).await?;

Ok(())
}

pub async fn _download_dir(hub: &Hub, file: GFile, config: &Config) -> Result<(), Error> {
let root_path = config.canonical_destination_root()?;
let file_name = file.name.as_ref().ok_or_else(|| Error::MissingFileName)?;
let path = root_path.join(file_name.as_str());

stream::unfold(vec![(path, file)], |mut to_visit| async {
let (path, file) = to_visit.pop()?;
let file_id = file.id.as_ref()?;
let files = list_files(
&hub,
&list::ListFilesConfig {
query: ListQuery::FilesInFolder {
folder_id: file_id.clone(),
},
order_by: Default::default(),
max_files: usize::MAX,
},
)
.await;

let file_stream = match files {
Ok(files) => {
let (dirs, others): (Vec<_>, Vec<_>) =
files.into_iter().partition(|f| f.is_directory()); // TODO: drain filter
to_visit.extend(
dirs.into_iter()
.filter_map(|file| Some((path.join(file.name.as_ref()?), file))),
);
stream::iter(
others
.into_iter()
.filter_map(move |file| Some((path.join(file.name.as_ref()?), file))),
)
.map(Ok)
.left_stream()
}
Err(err) => stream::once(async {
Err(Error::CreateFileTree(file_tree_drive::Error::ListFiles(
err,
)))
})
.right_stream(),
};

Some((file_stream, to_visit))
})
.flatten()
.map(|file| async move {
match file {
Ok((path, file)) => _download_file(&hub, &path, &file).await,
Err(_err) => Err(Error::MissingFileName), // TODO: fix error
}
})
.buffer_unordered(config.parallelisme)
.collect::<Vec<_>>()
.await;

Ok(())
}

pub async fn download(config: Config) -> Result<(), Error> {
let hub = hub_helper::get_hub().await.map_err(Error::Hub)?;

Expand All @@ -84,19 +176,19 @@ pub async fn download(config: Config) -> Result<(), Error> {
err_if_shortcut(&file, &config)?;

if drive_file::is_shortcut(&file) {
let target_file_id = file.shortcut_details.and_then(|details| details.target_id);
// let target_file_id = file.shortcut_details.and_then(|details| details.target_id);

err_if_shortcut_target_is_missing(&target_file_id)?;
// err_if_shortcut_target_is_missing(&target_file_id)?;

download(Config {
file_id: target_file_id.unwrap_or_default(),
..config
})
.await?;
// download(Config {
// file_id: target_file_id.unwrap_or_default(),
// ..config
// })
// .await?;
} else if drive_file::is_directory(&file) {
download_directory(&hub, &file, &config).await?;
_download_dir(&hub, file, &config).await?;
} else {
download_regular(&hub, &file, &config).await?;
// download_regular(&hub, &file, &config).await?;
}

Ok(())
Expand Down Expand Up @@ -294,28 +386,45 @@ impl Display for Error {

// TODO: move to common
pub async fn save_body_to_file(
mut body: hyper::Body,
file_path: &PathBuf,
body: hyper::Body,
file_path: impl AsRef<Path>,
expected_md5: Option<String>,
) -> Result<(), Error> {
let file_path = file_path.as_ref();
// Create temporary file

tokio::fs::create_dir_all(file_path.parent().unwrap())
.await
.map_err(|err| Error::CreateDirectory(file_path.to_path_buf(), err))?;

let tmp_file_path = file_path.with_extension("incomplete");
let file = File::create(&tmp_file_path).map_err(Error::CreateFile)?;
let mut file = tokio::fs::File::create(&tmp_file_path)
.await
.map_err(Error::CreateFile)?;

// Wrap file in writer that calculates md5
let mut writer = Md5Writer::new(file);
let mut md5 = md5::Context::new();

// Read chunks from stream and write to file
while let Some(chunk_result) = body.next().await {
let chunk = chunk_result.map_err(Error::ReadChunk)?;
writer.write_all(&chunk).map_err(Error::WriteChunk)?;
}
let body = body
.into_stream()
.map(|result| {
result.map_err(|_error| std::io::Error::new(std::io::ErrorKind::Other, "Error!"))
})
.into_async_read()
.compat();

let mut body = InspectReader::new(body, |bytes| md5.consume(&bytes));

tokio::io::copy(&mut body, &mut file)
.await
.map_err(|err| Error::WriteChunk(err))?;

// Check md5
err_if_md5_mismatch(expected_md5, writer.md5())?;
err_if_md5_mismatch(expected_md5, format!("{:x}", md5.compute()))?;

// Rename temporary file to final file
fs::rename(&tmp_file_path, &file_path).map_err(Error::RenameFile)
tokio::fs::rename(&tmp_file_path, &file_path)
.await
.map_err(Error::RenameFile)
}

// TODO: move to common
Expand Down
21 changes: 21 additions & 0 deletions src/files/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::common::drive_file::{MIME_TYPE_DRIVE_FOLDER, MIME_TYPE_DRIVE_SHORTCUT};

pub trait FileExtension {
fn is_directory(&self) -> bool;
fn is_binary(&self) -> bool;
fn is_shortcut(&self) -> bool;
}

impl FileExtension for google_drive3::api::File {
fn is_directory(&self) -> bool {
self.mime_type == Some(String::from(MIME_TYPE_DRIVE_FOLDER))
}

fn is_binary(&self) -> bool {
self.md5_checksum != None
}

fn is_shortcut(&self) -> bool {
self.mime_type == Some(String::from(MIME_TYPE_DRIVE_SHORTCUT))
}
}
1 change: 1 addition & 0 deletions src/files/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub async fn list(config: Config) -> Result<(), Error> {
Ok(())
}

#[derive(Default)]
pub struct ListFilesConfig {
pub query: ListQuery,
pub order_by: ListSortOrder,
Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ enum FileCommand {
#[arg(long)]
recursive: bool,

#[arg(long)]
concurent: usize,

/// Path where the file/directory should be downloaded to
#[arg(long, value_name = "PATH")]
destination: Option<PathBuf>,
Expand Down Expand Up @@ -498,6 +501,7 @@ async fn main() {
recursive,
destination,
stdout,
concurent,
} => {
let existing_file_action = if overwrite {
files::download::ExistingFileAction::Overwrite
Expand All @@ -519,6 +523,7 @@ async fn main() {
follow_shortcuts,
download_directories: recursive,
destination: dst,
parallelisme: concurent,
})
.await
.unwrap_or_else(handle_error)
Expand Down