Skip to content

Commit

Permalink
Improve readability a bit, fix the bug with the clients HashMap not…
Browse files Browse the repository at this point in the history
… being populated with data if compiling without `dbg` feature
  • Loading branch information
rakivo committed Dec 26, 2024
1 parent eac3eec commit ba8fe05
Showing 1 changed file with 88 additions and 79 deletions.
167 changes: 88 additions & 79 deletions back/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ const DROPPA_DOWNLOADS_DIR: &str = "droppa_files";
const HOME_MOBILE_HTML: &[u8] = include_bytes!("../front/index-mobile.html");
const HOME_DESKTOP_HTML: &[u8] = include_bytes!("../front/index-desktop.html");

#[derive(Debug, Serialize)]
pub struct TrackFile {
pub size: usize,
pub name: String,
pub progress: u8
}

pub struct Client {
sender: watch::Sender::<u8>,
progress: u8,
mobile: bool,
size: usize,
}

atomic_type! {
type Files = Vec::<File>;
type SyncProgressSender = Option::<mpsc::Sender::<u8>>;
Expand All @@ -75,24 +89,10 @@ atomic_type! {
arc.type Clients = DashMap::<String, Client>;
}

#[derive(Serialize)]
pub struct TrackFile {
pub struct File {
pub size: usize,
pub name: String,
pub progress: u8
}

pub struct Client {
sender: watch::Sender::<u8>,
progress: u8,
mobile: bool,
size: usize,
}

pub struct File {
size: usize,
name: String,
bytes: Vec::<u8>
pub bytes: Vec::<u8>
}

impl File {
Expand Down Expand Up @@ -147,27 +147,26 @@ impl File {
bytes.extend_from_slice(&chunk);
let progress = (bytes.len() * 100 / size).min(100) as u8;
if progress % 5 == 0 {
#[cfg(feature = "dbg")] println!("[INFO {name}] copying chunk..");
{
let Some(mut ps) = clients.get_mut(name) else {
println!("[ERROR] no: {name} in the clients hashmap, returning an error..");
return Err(MultipartError::Incomplete)
};
ps.size = size;
ps.progress = progress;
if let Err(e) = ps.sender.send(progress) {
eprintln!("[ERROR] failed to send progress: {e}");
}
println!("[INFO {name}] copying chunk..");

let Some(mut ps) = clients.get_mut(name) else {
println!("[ERROR] no: {name} in the clients hashmap, returning an error..");
return Err(MultipartError::Incomplete)
};

ps.size = size;
ps.progress = progress;

if let Err(e) = ps.sender.send(progress) {
eprintln!("[ERROR] failed to send progress: {e}");
}
#[cfg(feature = "dbg")] println!("[INFO {name}] copied chunk, trying to lock the pinger..");

println!("[INFO {name}] copied chunk, trying to lock the pinger..");

if let Ok(pp) = pp.try_lock() {
if let Some(pp) = pp.as_ref() {
if pp.try_send(()).is_ok() {
#[cfg(feature = "dbg")] println!("[INFO {name}] pinged successfully..");
}
_ = pp.try_send(()).ok()
}
} else {
#[cfg(feature = "dbg")] println!("[INFO {name}] lock failed..");
}
}
Ok((bytes, name, clients, pp))
Expand Down Expand Up @@ -256,9 +255,9 @@ async fn track_progress(rq: HttpRequest, path: Path::<String>, state: Data::<Ser
});

HttpResponse::Ok()
.keep_alive()
.content_type("text/event-stream")
.append_header(("Content-Type", "text/event-stream"))
.append_header(("Cache-Control", "no-cache"))
.append_header(("Connection", "keep-alive"))
.streaming(rx.map(|data| {
Ok::<_, actix_web::Error>(format!("data: {{ \"progress\": {data} }}\n\n").into())
}))
Expand All @@ -271,7 +270,7 @@ async fn index(rq: HttpRequest) -> impl Responder {
};

HttpResponse::Ok()
.content_type("text/html")
.append_header(("Content-Type", "text/html"))
.body(if user_agent_is_mobile(user_agent) {HOME_MOBILE_HTML} else {HOME_DESKTOP_HTML})
}

Expand Down Expand Up @@ -365,14 +364,16 @@ impl<W: Write> ProgressTracker::<W> {

impl<W: Write> Write for ProgressTracker::<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result::<usize> {
let written = self.writer.write(buf)?;
self.written += written;
let written_ = self.writer.write(buf)?;
self.written += written_;

let p = self.progress();
if p % 5 == 0 {
let progress_sender = self.progress_sender.lock().unwrap();
progress_sender.as_ref().map(|ps| ps.try_send(p as _));
} Ok(written)
}

Ok(written_)
}

#[inline(always)]
Expand Down Expand Up @@ -429,7 +430,6 @@ async fn download_files(state: Data::<Server>) -> impl Responder {
.body(zip_bytes)
}


//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// I could've used the `FnOnce` and `FnMut` traits here and called different async closures do to different things, //
// but it seems that this feature is really, really underdeveloped yet. //
Expand All @@ -443,6 +443,8 @@ async fn download_files(state: Data::<Server>) -> impl Responder {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

async fn stream_progress(state: Data::<Server>, transmission: Transmission) -> impl Responder {
use Transmission::*;

let ptx = watch::channel("[]".to_owned()).0;
let streamer = WatchStream::new(ptx.subscribe());

Expand All @@ -463,46 +465,50 @@ async fn stream_progress(state: Data::<Server>, transmission: Transmission) -> i
**progress_streamer = Some(ptx)
}

if matches!(transmission, Transmission::Zipping) {
let (tx, mut rx) = mpsc::channel(8);
*state.zipping_progress_sender.lock().unwrap() = Some(tx);

let state = Data::clone(&state);
actix_rt::spawn(async move {
loop {
if let Ok(progress) = rx.try_recv() {
let streamer = state.zipping_progress_streamer.lock().await;
let streamer = streamer.as_ref().unwrap();
_ = streamer.send(format!("{{ \"progress\": {progress} }}"));
tokio_sleep(TokioDuration::from_millis(100)).await
} else {
tokio_sleep(TokioDuration::from_millis(150)).await
}
}
});
} else {
let (tx, mut rx) = mpsc::channel(8);
*state.files_progress_pinger.lock().await = Some(tx);

let state = Data::clone(&state);
actix_rt::spawn(async move {
loop {
if rx.try_recv().is_err() {
tokio_sleep(TokioDuration::from_millis(150)).await;
continue
match transmission {
Zipping => {
let (tx, mut rx) = mpsc::channel(8);
*state.zipping_progress_sender.lock().unwrap() = Some(tx);

let state = Data::clone(&state);
actix_rt::spawn(async move {
loop {
if let Ok(progress) = rx.try_recv() {
let streamer = state.zipping_progress_streamer.lock().await;
let streamer = streamer.as_ref().unwrap();
_ = streamer.send(format!("{{ \"progress\": {progress} }}"));
tokio_sleep(TokioDuration::from_millis(100)).await
} else {
tokio_sleep(TokioDuration::from_millis(150)).await
}
}
})
}
_ => {
let (tx, mut rx) = mpsc::channel(8);
*state.files_progress_pinger.lock().await = Some(tx);

let state = Data::clone(&state);
actix_rt::spawn(async move {
loop {
if rx.try_recv().is_err() {
tokio_sleep(TokioDuration::from_millis(150)).await;
continue
}

let mobile = matches!(transmission, Transmission::Mobile);
let data = state.clients.iter().filter(|p| p.mobile != mobile).map(|p| {
TrackFile { name: p.key().to_owned(), progress: p.progress, size: p.size }
}).collect::<Vec::<_>>();
let mobile = matches!(transmission, Mobile);
let data = state.clients.iter().filter(|p| p.mobile != mobile).map(|p| {
TrackFile { name: p.key().to_owned(), progress: p.progress, size: p.size }
}).collect::<Vec::<_>>();

let json = serde_json::to_string(&data).unwrap();
state.streamer_send(json, transmission).await;
tokio_sleep(TokioDuration::from_millis(100)).await;
}
});
}
let json = serde_json::to_string(&data).unwrap();

state.streamer_send(json, transmission).await;
tokio_sleep(TokioDuration::from_millis(100)).await;
}
})
}
};

HttpResponse::Ok()
.append_header(("Content-Type", "text/event-stream"))
Expand Down Expand Up @@ -551,9 +557,12 @@ async fn main() -> std::io::Result<()> {
downloads_dir: {
let mut dir = dirs::download_dir().expect("could not get user's `Downloads` directory");
dir.push(DROPPA_DOWNLOADS_DIR);

if !dir.exists() {
fs::create_dir(&dir).expect("could not create `droppa` downloads sub-directory")
} dir
}

dir
},

files: Arc::new(Mutex::new(Vec::new())),
Expand All @@ -577,9 +586,9 @@ async fn main() -> std::io::Result<()> {
.service(index)
.service(qr_code)
.service(upload_mobile)
.service(upload_desktop)
.service(track_progress)
.service(download_files)
.service(upload_desktop)
.service(zipping_progress)
.service(download_files_progress_mobile)
.service(download_files_progress_desktop)
Expand Down

0 comments on commit ba8fe05

Please sign in to comment.