diff --git a/src/server.rs b/src/server.rs index 534b3c92a..af04c404d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,7 @@ use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::Compression; use futures::stream::StreamExt; use futures::TryStreamExt; +use hyper::body::Bytes; use hyper::header::HeaderValue; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, StatusCode}; @@ -15,10 +16,9 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::SystemTime; use tokio::fs::File; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncReadExt, AsyncWrite}; use tokio::{fs, io}; use tokio_util::codec::{BytesCodec, FramedRead}; -use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; type Request = hyper::Request; @@ -35,6 +35,7 @@ macro_rules! status_code { const INDEX_HTML: &str = include_str!("index.html"); const INDEX_CSS: &str = include_str!("index.css"); +const BUF_SIZE: usize = 1024 * 16; pub async fn serve(args: Args) -> BoxResult<()> { let address = args.address()?; @@ -187,10 +188,27 @@ impl InnerService { } async fn handle_send_dir_zip(&self, path: &Path) -> BoxResult { - let (mut writer, reader) = tokio::io::duplex(1024 * 1024 * 20); - dir_zip(&mut writer, path).await?; - let stream = ReaderStream::new(reader); - let body = Body::wrap_stream(stream); + let (mut tx, body) = Body::channel(); + let (mut writer, mut reader) = tokio::io::duplex(BUF_SIZE); + let path = path.to_owned(); + tokio::spawn(async move { + if let Err(e) = dir_zip(&mut writer, &path).await { + error!("Fail to zip {}, {}", path.display(), e.to_string()); + } + }); + tokio::spawn(async move { + // Reuse this buffer + let mut buf = [0_u8; BUF_SIZE]; + loop { + let n = reader.read(&mut buf).await.unwrap(); + if n == 0 { + break; + } + if (tx.send_data(Bytes::from(buf[..n].to_vec())).await).is_err() { + break; + } + } + }); Ok(Response::new(body)) }