Skip to content

Commit

Permalink
Fix that there are no other competing changes to the value of the sam…
Browse files Browse the repository at this point in the history
…e key in DashMap (#587)

fix: ensure that there are no other competing changes to the value of the same key in DashMap

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jul 12, 2024
1 parent 0c9af2f commit 38c3fad
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
2 changes: 1 addition & 1 deletion dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async fn main() -> Result<(), anyhow::Error> {
info!("proxy server exited");
},

_ = tokio::spawn(async move { manager_announcer.run().await }) => {
_ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))}) => {
info!("announcer manager exited");
},

Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl DfdaemonDownloadServer {
let uds = UnixListener::bind(&self.socket_path).unwrap();
let uds_stream = UnixListenerStream::new(uds);
Server::builder()
.max_frame_size(super::MAX_FRAME_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.add_service(reflection.clone())
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl DfdaemonUploadServer {
// Start upload grpc server.
info!("upload server listening on {}", self.addr);
Server::builder()
.max_frame_size(super::MAX_FRAME_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.add_service(reflection.clone())
Expand Down
5 changes: 4 additions & 1 deletion dragonfly-client/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

// CONCURRENCY_LIMIT_PER_CONNECTION is the limit of concurrency for each connection.
pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 1048;
pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 4096;

// TCP_KEEPALIVE is the keepalive duration for TCP connection.
pub const TCP_KEEPALIVE: Duration = Duration::from_secs(60);

// MAX_FRAME_SIZE is the max frame size for GRPC, default is 12MB.
pub const MAX_FRAME_SIZE: u32 = 12 * 1024 * 1024;

// prefetch_task prefetches the task if prefetch flag is true.
pub async fn prefetch_task(
socket_path: PathBuf,
Expand Down
16 changes: 5 additions & 11 deletions dragonfly-client/src/resource/piece_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,7 @@ impl PieceCollector {
interested_pieces: Vec<metadata::Piece>,
parents: Vec<Peer>,
) -> Self {
// Initialize collected_pieces.
let collected_pieces = Arc::new(DashMap::new());
interested_pieces
.clone()
.into_iter()
.for_each(|interested_piece| {
collected_pieces.insert(interested_piece.number, DashSet::new());
});
let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));

Self {
config,
Expand Down Expand Up @@ -189,9 +182,10 @@ impl PieceCollector {
out_stream.try_next().await.or_err(ErrorType::StreamError)?
{
let message = message?;
collected_pieces.entry(message.number).and_modify(|peers| {
peers.insert(parent.id.clone());
});
collected_pieces
.entry(message.number)
.or_insert_with(DashSet::new)
.insert(parent.id.clone());
info!(
"received piece {}-{} metadata from parent {}",
task_id, message.number, parent.id
Expand Down

0 comments on commit 38c3fad

Please sign in to comment.