Skip to content

Commit 9568b75

Browse files
committed
Fix memory leak in downloading JoinSet
1 parent b2013f5 commit 9568b75

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

crates/tako/src/internal/datasrv/download.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::connection::Connection;
22
use crate::datasrv::DataObjectId;
3-
use crate::internal::datasrv::DataObjectRef;
43
use crate::internal::datasrv::messages::{
54
DataDown, FromDataClientMessage, ToDataClientMessageDown,
65
};
76
use crate::internal::datasrv::utils::DataObjectComposer;
7+
use crate::internal::datasrv::DataObjectRef;
88
use crate::{Map, WrappedRcRefCell};
99
use orion::kex::SecretKey;
1010
use priority_queue::PriorityQueue;
@@ -13,8 +13,8 @@ use std::rc::Rc;
1313
use std::sync::Arc;
1414
use std::time::Duration;
1515
use tokio::net::TcpStream;
16-
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, oneshot};
17-
use tokio::task::{AbortHandle, JoinSet, spawn_local};
16+
use tokio::sync::{oneshot, Notify, OwnedSemaphorePermit, Semaphore};
17+
use tokio::task::{spawn_local, AbortHandle, JoinSet};
1818
use tokio::time::Instant;
1919

2020
const PROTOCOL_VERSION: u32 = 0;
@@ -304,29 +304,28 @@ pub(crate) async fn download_manager_process<
304304
let mut join_set = JoinSet::new();
305305
let semaphore = Arc::new(Semaphore::new(max_parallel_downloads as usize));
306306
loop {
307-
{
308-
let permit = semaphore.clone().acquire_owned().await.unwrap();
309-
let is_empty = {
310-
let mut dm = dm_ref.get_mut();
311-
if let Some((data_id, _)) = dm.download_queue.pop() {
312-
let abort = join_set.spawn_local(download_process(
313-
dm_ref.clone(),
314-
data_id,
315-
permit,
316-
max_download_tries,
317-
wait_between_download_tries,
318-
));
319-
let info = dm.download_info.get_mut(&data_id).unwrap();
320-
assert!(info.abort_handle.is_none());
321-
info.abort_handle = Some(abort);
322-
dm.download_queue.is_empty()
323-
} else {
324-
true
325-
}
326-
};
327-
if is_empty {
328-
notify.notified().await;
307+
let permit = semaphore.clone().acquire_owned().await.unwrap();
308+
let is_empty = {
309+
let mut dm = dm_ref.get_mut();
310+
if let Some((data_id, _)) = dm.download_queue.pop() {
311+
let abort = join_set.spawn_local(download_process(
312+
dm_ref.clone(),
313+
data_id,
314+
permit,
315+
max_download_tries,
316+
wait_between_download_tries,
317+
));
318+
let info = dm.download_info.get_mut(&data_id).unwrap();
319+
assert!(info.abort_handle.is_none());
320+
info.abort_handle = Some(abort);
321+
dm.download_queue.is_empty()
322+
} else {
323+
true
329324
}
330325
};
326+
if is_empty {
327+
notify.notified().await;
328+
}
329+
while let Some(_) = join_set.try_join_next() { /* Do nothing */ }
331330
}
332331
}

0 commit comments

Comments
 (0)