diff --git a/back/main.rs b/back/main.rs index 5e296eb..6e10e1c 100644 --- a/back/main.rs +++ b/back/main.rs @@ -61,6 +61,20 @@ const DROPPA_DOWNLOADS_DIR: &str = "droppa_files"; const HOME_MOBILE_HTML: &[u8] = include_bytes!("../front/index-mobile.html"); const HOME_DESKTOP_HTML: &[u8] = include_bytes!("../front/index-desktop.html"); +#[derive(Debug, Serialize)] +pub struct TrackFile { + pub size: usize, + pub name: String, + pub progress: u8 +} + +pub struct Client { + sender: watch::Sender::, + progress: u8, + mobile: bool, + size: usize, +} + atomic_type! { type Files = Vec::; type SyncProgressSender = Option::>; @@ -75,24 +89,10 @@ atomic_type! { arc.type Clients = DashMap::; } -#[derive(Serialize)] -pub struct TrackFile { +pub struct File { pub size: usize, pub name: String, - pub progress: u8 -} - -pub struct Client { - sender: watch::Sender::, - progress: u8, - mobile: bool, - size: usize, -} - -pub struct File { - size: usize, - name: String, - bytes: Vec:: + pub bytes: Vec:: } impl File { @@ -147,27 +147,26 @@ impl File { bytes.extend_from_slice(&chunk); let progress = (bytes.len() * 100 / size).min(100) as u8; if progress % 5 == 0 { - #[cfg(feature = "dbg")] println!("[INFO {name}] copying chunk.."); - { - let Some(mut ps) = clients.get_mut(name) else { - println!("[ERROR] no: {name} in the clients hashmap, returning an error.."); - return Err(MultipartError::Incomplete) - }; - ps.size = size; - ps.progress = progress; - if let Err(e) = ps.sender.send(progress) { - eprintln!("[ERROR] failed to send progress: {e}"); - } + println!("[INFO {name}] copying chunk.."); + + let Some(mut ps) = clients.get_mut(name) else { + println!("[ERROR] no: {name} in the clients hashmap, returning an error.."); + return Err(MultipartError::Incomplete) + }; + + ps.size = size; + ps.progress = progress; + + if let Err(e) = ps.sender.send(progress) { + eprintln!("[ERROR] failed to send progress: {e}"); } - #[cfg(feature = "dbg")] println!("[INFO {name}] copied chunk, trying to lock the pinger.."); + + println!("[INFO {name}] copied chunk, trying to lock the pinger.."); + if let Ok(pp) = pp.try_lock() { if let Some(pp) = pp.as_ref() { - if pp.try_send(()).is_ok() { - #[cfg(feature = "dbg")] println!("[INFO {name}] pinged successfully.."); - } + _ = pp.try_send(()).ok() } - } else { - #[cfg(feature = "dbg")] println!("[INFO {name}] lock failed.."); } } Ok((bytes, name, clients, pp)) @@ -256,9 +255,9 @@ async fn track_progress(rq: HttpRequest, path: Path::, state: Data::(format!("data: {{ \"progress\": {data} }}\n\n").into()) })) @@ -271,7 +270,7 @@ async fn index(rq: HttpRequest) -> impl Responder { }; HttpResponse::Ok() - .content_type("text/html") + .append_header(("Content-Type", "text/html")) .body(if user_agent_is_mobile(user_agent) {HOME_MOBILE_HTML} else {HOME_DESKTOP_HTML}) } @@ -365,14 +364,16 @@ impl ProgressTracker:: { impl Write for ProgressTracker:: { fn write(&mut self, buf: &[u8]) -> std::io::Result:: { - let written = self.writer.write(buf)?; - self.written += written; + let written_ = self.writer.write(buf)?; + self.written += written_; let p = self.progress(); if p % 5 == 0 { let progress_sender = self.progress_sender.lock().unwrap(); progress_sender.as_ref().map(|ps| ps.try_send(p as _)); - } Ok(written) + } + + Ok(written_) } #[inline(always)] @@ -429,7 +430,6 @@ async fn download_files(state: Data::) -> impl Responder { .body(zip_bytes) } - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // I could've used the `FnOnce` and `FnMut` traits here and called different async closures do to different things, // // but it seems that this feature is really, really underdeveloped yet. // @@ -443,6 +443,8 @@ async fn download_files(state: Data::) -> impl Responder { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// async fn stream_progress(state: Data::, transmission: Transmission) -> impl Responder { + use Transmission::*; + let ptx = watch::channel("[]".to_owned()).0; let streamer = WatchStream::new(ptx.subscribe()); @@ -463,46 +465,50 @@ async fn stream_progress(state: Data::, transmission: Transmission) -> i **progress_streamer = Some(ptx) } - if matches!(transmission, Transmission::Zipping) { - let (tx, mut rx) = mpsc::channel(8); - *state.zipping_progress_sender.lock().unwrap() = Some(tx); - - let state = Data::clone(&state); - actix_rt::spawn(async move { - loop { - if let Ok(progress) = rx.try_recv() { - let streamer = state.zipping_progress_streamer.lock().await; - let streamer = streamer.as_ref().unwrap(); - _ = streamer.send(format!("{{ \"progress\": {progress} }}")); - tokio_sleep(TokioDuration::from_millis(100)).await - } else { - tokio_sleep(TokioDuration::from_millis(150)).await - } - } - }); - } else { - let (tx, mut rx) = mpsc::channel(8); - *state.files_progress_pinger.lock().await = Some(tx); - - let state = Data::clone(&state); - actix_rt::spawn(async move { - loop { - if rx.try_recv().is_err() { - tokio_sleep(TokioDuration::from_millis(150)).await; - continue + match transmission { + Zipping => { + let (tx, mut rx) = mpsc::channel(8); + *state.zipping_progress_sender.lock().unwrap() = Some(tx); + + let state = Data::clone(&state); + actix_rt::spawn(async move { + loop { + if let Ok(progress) = rx.try_recv() { + let streamer = state.zipping_progress_streamer.lock().await; + let streamer = streamer.as_ref().unwrap(); + _ = streamer.send(format!("{{ \"progress\": {progress} }}")); + tokio_sleep(TokioDuration::from_millis(100)).await + } else { + tokio_sleep(TokioDuration::from_millis(150)).await + } } + }) + } + _ => { + let (tx, mut rx) = mpsc::channel(8); + *state.files_progress_pinger.lock().await = Some(tx); + + let state = Data::clone(&state); + actix_rt::spawn(async move { + loop { + if rx.try_recv().is_err() { + tokio_sleep(TokioDuration::from_millis(150)).await; + continue + } - let mobile = matches!(transmission, Transmission::Mobile); - let data = state.clients.iter().filter(|p| p.mobile != mobile).map(|p| { - TrackFile { name: p.key().to_owned(), progress: p.progress, size: p.size } - }).collect::>(); + let mobile = matches!(transmission, Mobile); + let data = state.clients.iter().filter(|p| p.mobile != mobile).map(|p| { + TrackFile { name: p.key().to_owned(), progress: p.progress, size: p.size } + }).collect::>(); - let json = serde_json::to_string(&data).unwrap(); - state.streamer_send(json, transmission).await; - tokio_sleep(TokioDuration::from_millis(100)).await; - } - }); - } + let json = serde_json::to_string(&data).unwrap(); + + state.streamer_send(json, transmission).await; + tokio_sleep(TokioDuration::from_millis(100)).await; + } + }) + } + }; HttpResponse::Ok() .append_header(("Content-Type", "text/event-stream")) @@ -551,9 +557,12 @@ async fn main() -> std::io::Result<()> { downloads_dir: { let mut dir = dirs::download_dir().expect("could not get user's `Downloads` directory"); dir.push(DROPPA_DOWNLOADS_DIR); + if !dir.exists() { fs::create_dir(&dir).expect("could not create `droppa` downloads sub-directory") - } dir + } + + dir }, files: Arc::new(Mutex::new(Vec::new())), @@ -577,9 +586,9 @@ async fn main() -> std::io::Result<()> { .service(index) .service(qr_code) .service(upload_mobile) + .service(upload_desktop) .service(track_progress) .service(download_files) - .service(upload_desktop) .service(zipping_progress) .service(download_files_progress_mobile) .service(download_files_progress_desktop)