Skip to content

Commit

Permalink
Merge pull request #2906 from subspace/farming-cluser-improvements
Browse files Browse the repository at this point in the history
Farming cluster improvements
  • Loading branch information
nazar-pc authored Jul 10, 2024
2 parents b8367e9 + 633c1ff commit 265db19
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
36 changes: 28 additions & 8 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,13 @@ impl NatsClient {
return;
};

let message_payload_size = message.payload.len();
let request = match Request::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Ok(request) => {
// Free allocation early
drop(message.payload);
request
}
Err(error) => {
warn!(
request_type = %type_name::<Request>(),
Expand All @@ -620,12 +625,22 @@ impl NatsClient {
}
};

trace!(
request_type = %type_name::<Request>(),
?request,
%reply_subject,
"Processing request"
);
// Avoid printing large messages in logs
if message_payload_size > 1024 {
trace!(
request_type = %type_name::<Request>(),
%reply_subject,
"Processing request"
);
} else {
trace!(
request_type = %type_name::<Request>(),
?request,
%reply_subject,
"Processing request"
);
}

if let Some(response) = process(request).await
&& let Err(error) = self.publish(reply_subject, response.encode().into()).await
{
Expand Down Expand Up @@ -757,7 +772,7 @@ impl NatsClient {
buffer.push_back(element);
}

while !buffer.is_empty() {
loop {
let is_done = response_stream.is_done() && overflow_buffer.is_empty();
let num_messages = buffer.len();
let response = if is_done {
Expand Down Expand Up @@ -895,6 +910,11 @@ impl NatsClient {
}

index += 1;

// Unless `overflow_buffer` wasn't empty abort inner loop
if buffer.is_empty() {
break;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/cluster/plotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ async fn process_plot_sector_request<P>(
return;
}

let (mut response_proxy_sender, response_proxy_receiver) = mpsc::channel(0);
let (mut response_proxy_sender, response_proxy_receiver) = mpsc::channel(10);

let response_streaming_fut = nats_client
.stream_response::<ClusterPlotterPlotSectorRequest, _>(
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/disk_piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl farm::PieceCache for DiskPieceCache {
FarmError,
> {
let this = self.clone();
let (mut sender, receiver) = mpsc::channel(1);
let (mut sender, receiver) = mpsc::channel(100_000);
let read_contents = task::spawn_blocking(move || {
let contents = this.contents();
for (piece_cache_offset, maybe_piece) in contents {
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ where
break farmer_app_info;
};

let (progress_sender, mut progress_receiver) = mpsc::channel(0);
let (progress_sender, mut progress_receiver) = mpsc::channel(10);

// Initiate plotting
plotter
Expand Down Expand Up @@ -408,7 +408,7 @@ where
}
}

let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(0);
let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
progress_receiver = retry_progress_receiver;

// Initiate plotting
Expand Down

0 comments on commit 265db19

Please sign in to comment.