Skip to content

Commit

Permalink
v0.2.1 use FuturesUnordered
Browse files Browse the repository at this point in the history
  • Loading branch information
Mon-ius committed May 12, 2024
1 parent 6540cd3 commit e5d6795
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 35 deletions.
2 changes: 1 addition & 1 deletion hfd-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ clap = { version= "4.5.4", features=["derive"] }
reqwest = { version = "0.12.4", default-features = false, features = ["stream", "http2", "json", "rustls-tls"] }
tokio = { version = "1.37.0", default-features = false, features = ["rt-multi-thread", "fs"] }
serde_json = { version = "1.0.116" }
tokio-stream = "0.1.15"
futures = "0.3.30"

[[bin]]
name = "hfd"
Expand Down
61 changes: 27 additions & 34 deletions hfd-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::PathBuf;
use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_RANGE, RANGE, USER_AGENT};
use tokio::time::Duration;
use tokio::io::{AsyncSeekExt, SeekFrom};
use tokio_stream::StreamExt;
use futures::StreamExt;

const CHUNK_SIZE: usize = 10_000_000;

Expand Down Expand Up @@ -89,7 +89,7 @@ async fn download_chunk(
let client= reqwest::Client::builder()
.default_headers(headers.clone())
.pool_idle_timeout(Duration::from_secs(10))
.http2_keep_alive_timeout(Duration::from_secs(30)).build()?;
.build()?;

let range = format!("bytes={s}-{e}");

Expand All @@ -111,9 +111,12 @@ async fn download(
path: PathBuf,
chunk_size: usize
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let handle = tokio::runtime::Handle::current();
let mut tasks = futures::stream::FuturesUnordered::new();

let client = reqwest::Client::builder()
.default_headers(headers.clone())
.http2_keep_alive_timeout(Duration::from_secs(15)).build()?;
.build()?;

let response = client.get(&url).header(RANGE, "bytes=0-0").send().await?;
let length: usize = response
Expand All @@ -129,20 +132,16 @@ async fn download(
.set_len(length as u64)
.await?;

let tasks: Vec<_> = (0..length)
.into_iter()
.step_by(chunk_size)
.map(|s| {
let _url = url.clone();
let _path = path.clone();
let headers = headers.clone();
let e = std::cmp::min(s + chunk_size - 1, length);
tokio::spawn(async move { download_chunk(&headers, &_url, &_path, s, e).await })
})
.collect();
for s in (0..length).step_by(chunk_size) {
let _url = url.clone();
let _path = path.clone();
let headers = headers.clone();
let e = std::cmp::min(s + chunk_size - 1, length);
tasks.push(handle.spawn(async move { download_chunk(&headers, &_url, &_path, s, e).await }));
}

for task in tasks {
let _ = task.await.unwrap();
while let Some(handle) = tasks.next().await {
handle.unwrap();
}
Ok(())
}
Expand Down Expand Up @@ -181,7 +180,7 @@ impl HfClient {

async fn list_files(&self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let client = reqwest::Client::builder()
.http2_keep_alive_timeout(Duration::from_secs(15)).build()?;
.build()?;
let api = self.hf_url.api();
let response = client.get(api)
.headers(self.headers.clone())
Expand Down Expand Up @@ -212,26 +211,20 @@ impl HfClient {
}

pub async fn download_all(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut tasks = Vec::new();

let files = self.list_files().await?;
let _ = self.create_dir_all(files.clone());
let file_chunks: Vec<_> = files
.chunks(30)
.map(|chunk| chunk.to_owned())
.collect();

for fc in file_chunks{
let tasks: Vec<_> = fc.into_iter()
.map(|f| {
let url = self.hf_url.path(&f);
let path = self.root.join(&f);
let headers = self.headers.clone();
tokio::spawn(async move {let _ = download(headers, url, path, CHUNK_SIZE).await; })
})
.collect();

for task in tasks {
task.await.unwrap();
}
for file in files{
let url = self.hf_url.path(&file);
let path = self.root.join(&file);
let headers = self.headers.clone();
tasks.push(tokio::spawn(async move { download(headers, url, path, CHUNK_SIZE).await; }));
}

for task in tasks {
task.await.unwrap();
}
Ok(())
}
Expand Down

0 comments on commit e5d6795

Please sign in to comment.