From 38c3fad7bcab09b7651704b744a655c6d32c00cf Mon Sep 17 00:00:00 2001
From: Gaius
Date: Sat, 13 Jul 2024 02:32:02 +0800
Subject: [PATCH] Fix that there are no other competing changes to the value
 of the same key in DashMap (#587)

fix: ensure that there are no other competing changes to the value of the
same key in DashMap

Signed-off-by: Gaius
---
 dragonfly-client/src/bin/dfdaemon/main.rs        |  2 +-
 dragonfly-client/src/grpc/dfdaemon_download.rs   |  1 +
 dragonfly-client/src/grpc/dfdaemon_upload.rs     |  1 +
 dragonfly-client/src/grpc/mod.rs                 |  5 ++++-
 dragonfly-client/src/resource/piece_collector.rs | 16 +++++-----------
 5 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs
index 41f9ba24..95ec3043 100644
--- a/dragonfly-client/src/bin/dfdaemon/main.rs
+++ b/dragonfly-client/src/bin/dfdaemon/main.rs
@@ -310,7 +310,7 @@ async fn main() -> Result<(), anyhow::Error> {
             info!("proxy server exited");
         },
 
-        _ = tokio::spawn(async move { manager_announcer.run().await }) => {
+        _ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))}) => {
             info!("announcer manager exited");
         },
 
diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs
index af647150..aeb15a36 100644
--- a/dragonfly-client/src/grpc/dfdaemon_download.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_download.rs
@@ -131,6 +131,7 @@ impl DfdaemonDownloadServer {
         let uds = UnixListener::bind(&self.socket_path).unwrap();
         let uds_stream = UnixListenerStream::new(uds);
         Server::builder()
+            .max_frame_size(super::MAX_FRAME_SIZE)
             .concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
             .tcp_keepalive(Some(super::TCP_KEEPALIVE))
             .add_service(reflection.clone())
diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs
index 44f3b784..dfe401d6 100644
--- a/dragonfly-client/src/grpc/dfdaemon_upload.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs
@@ -119,6 +119,7 @@ impl DfdaemonUploadServer {
         // Start upload grpc server.
         info!("upload server listening on {}", self.addr);
         Server::builder()
+            .max_frame_size(super::MAX_FRAME_SIZE)
             .concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
             .tcp_keepalive(Some(super::TCP_KEEPALIVE))
             .add_service(reflection.clone())
diff --git a/dragonfly-client/src/grpc/mod.rs b/dragonfly-client/src/grpc/mod.rs
index e2e9026c..9f5023b7 100644
--- a/dragonfly-client/src/grpc/mod.rs
+++ b/dragonfly-client/src/grpc/mod.rs
@@ -38,11 +38,14 @@ pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
 pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
 
 // CONCURRENCY_LIMIT_PER_CONNECTION is the limit of concurrency for each connection.
-pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 1048;
+pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 4096;
 
 // TCP_KEEPALIVE is the keepalive duration for TCP connection.
 pub const TCP_KEEPALIVE: Duration = Duration::from_secs(60);
 
+// MAX_FRAME_SIZE is the max frame size for GRPC, default is 12MB.
+pub const MAX_FRAME_SIZE: u32 = 12 * 1024 * 1024;
+
 // prefetch_task prefetches the task if prefetch flag is true.
 pub async fn prefetch_task(
     socket_path: PathBuf,
diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs
index e1306599..d54c8007 100644
--- a/dragonfly-client/src/resource/piece_collector.rs
+++ b/dragonfly-client/src/resource/piece_collector.rs
@@ -71,14 +71,7 @@ impl PieceCollector {
         interested_pieces: Vec,
         parents: Vec,
     ) -> Self {
-        // Initialize collected_pieces.
-        let collected_pieces = Arc::new(DashMap::new());
-        interested_pieces
-            .clone()
-            .into_iter()
-            .for_each(|interested_piece| {
-                collected_pieces.insert(interested_piece.number, DashSet::new());
-            });
+        let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));
 
         Self {
             config,
@@ -189,9 +182,10 @@ impl PieceCollector {
             out_stream.try_next().await.or_err(ErrorType::StreamError)?
         {
             let message = message?;
-            collected_pieces.entry(message.number).and_modify(|peers| {
-                peers.insert(parent.id.clone());
-            });
+            collected_pieces
+                .entry(message.number)
+                .or_insert_with(DashSet::new)
+                .insert(parent.id.clone());
             info!(
                 "received piece {}-{} metadata from parent {}",
                 task_id, message.number, parent.id
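
The piece_collector.rs hunks replace the pre-populate-then-and_modify sequence with a single entry().or_insert_with().insert() chain, which holds the shard lock for the key while the per-piece set is created or updated, so concurrent reports for the same piece number cannot race. Below is a minimal, self-contained sketch of that pattern, assuming the dashmap crate; the record_parent helper and the sample piece/parent identifiers are illustrative and not code from the repository.

use std::sync::Arc;

use dashmap::{DashMap, DashSet};

// Hypothetical helper mirroring the patched collection logic: record that
// `parent_id` can serve piece `number`. entry() locks the shard for `number`,
// so creating the set (when the key is new) and inserting the parent id
// happen atomically with respect to other callers touching the same key.
fn record_parent(collected: &DashMap<u32, DashSet<String>>, number: u32, parent_id: &str) {
    collected
        .entry(number)
        .or_insert_with(DashSet::new)
        .insert(parent_id.to_string());
}

fn main() {
    // Capacity hint instead of pre-inserting empty sets, as in the patch.
    let collected: Arc<DashMap<u32, DashSet<String>>> = Arc::new(DashMap::with_capacity(8));

    // Two parents report the same piece concurrently.
    let handles: Vec<_> = ["parent-a", "parent-b"]
        .into_iter()
        .map(|parent| {
            let collected = collected.clone();
            std::thread::spawn(move || record_parent(&collected, 1, parent))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Neither insert was lost: both parents are recorded for piece 1.
    assert_eq!(collected.get(&1).unwrap().len(), 2);
}

The entry-based form covers both the first and subsequent reports for a key in one call, which is what lets the patch drop the separate initialization loop over interested_pieces and keep only a capacity hint.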