Skip to content

Commit

Permalink
Merge pull request #2893 from subspace/improve-nats-client
Browse files Browse the repository at this point in the history
Improve NATS client
  • Loading branch information
nazar-pc committed Jul 9, 2024
2 parents 2c80beb + 4d11f61 commit ba8cf9c
Showing 1 changed file with 149 additions and 120 deletions.
269 changes: 149 additions & 120 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, mem};
use thiserror::Error;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn, Instrument};
use ulid::Ulid;

const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
Expand Down Expand Up @@ -275,38 +275,37 @@ impl<Response> StreamResponseSubscriber<Response> {
let (acknowledgement_sender, mut acknowledgement_receiver) =
mpsc::unbounded::<(String, u32)>();

let background_task = AsyncJoinOnDrop::new(
tokio::spawn({
let response_subject = response_subject.clone();

async move {
while let Some((subject, index)) = acknowledgement_receiver.next().await {
trace!(
let ack_publisher_fut = {
let response_subject = response_subject.clone();

async move {
while let Some((subject, index)) = acknowledgement_receiver.next().await {
trace!(
%subject,
%index,
%response_subject,
%index,
"Sending stream response acknowledgement"
);
if let Err(error) = nats_client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(
%error,
%subject,
%index,
%response_subject,
%index,
"Sending stream response acknowledgement"
"Failed to send stream response acknowledgement"
);
if let Err(error) = nats_client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(
%error,
%subject,
%index,
%response_subject,
%index,
"Failed to send stream response acknowledgement"
);
return;
}
return;
}
}
}),
true,
);
}
};
let background_task =
AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);

Self {
response_subject,
Expand Down Expand Up @@ -703,6 +702,7 @@ impl NatsClient {
.await
{
warn!(
%response_subject,
%error,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
Expand All @@ -713,18 +713,15 @@ impl NatsClient {
return;
}
};
let max_responses_per_message =
self.approximate_max_message_size() / first_element.encoded_size();

// Initialize buffer that will be reused for responses
let mut buffer = VecDeque::with_capacity(max_responses_per_message);
buffer.push_back(first_element);
let approximate_max_message_size = self.approximate_max_message_size();
let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();

let ack_subject = format!("stream-response-ack.{}", Ulid::new());
let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
Ok(ack_subscription) => ack_subscription,
Err(error) => {
warn!(
%response_subject,
%error,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
Expand All @@ -734,12 +731,17 @@ impl NatsClient {
}
};
debug!(
%response_subject,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
?ack_subscription,
"Ack subscription subscription"
);
let mut index = 0;
// Initialize buffer that will be reused for responses
let mut buffer = VecDeque::with_capacity(max_responses_per_message);
buffer.push_back(first_element);
let mut overflow_buffer = VecDeque::new();

loop {
// Try to fill the buffer
Expand All @@ -755,118 +757,145 @@ impl NatsClient {
buffer.push_back(element);
}

let is_done = response_stream.is_done();
debug!(
%response_subject,
num_messages = buffer.len(),
%index,
%is_done,
"Publishing stream response messages",
);
let response = if is_done {
Response::<Request>::Last {
index,
responses: buffer,
}
} else {
Response::<Request>::Continue {
index,
responses: buffer,
ack_subject: ack_subject.clone(),
while !buffer.is_empty() {
let is_done = response_stream.is_done() && overflow_buffer.is_empty();
let num_messages = buffer.len();
let response = if is_done {
Response::<Request>::Last {
index,
responses: buffer,
}
} else {
Response::<Request>::Continue {
index,
responses: buffer,
ack_subject: ack_subject.clone(),
}
};
let encoded_response = response.encode();
// When encoded response is too large, remove one of the responses from it and try
// again
if encoded_response.len() > approximate_max_message_size {
buffer = response.into();
if let Some(element) = buffer.pop_back() {
overflow_buffer.push_front(element);
continue;
} else {
error!(
%response_subject,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Empty response overflown message size, this should never happen"
);
return;
}
}
};

if let Err(error) = self
.publish(response_subject.clone(), response.encode().into())
.await
{
warn!(
%error,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Failed to send stream response"
debug!(
%response_subject,
num_messages,
%index,
%is_done,
"Publishing stream response messages",
);
return;
}

if is_done {
return;
} else {
buffer = response.into();
buffer.clear();
}
if let Err(error) = self
.publish(response_subject.clone(), encoded_response.into())
.await
{
warn!(
%response_subject,
%error,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Failed to send stream response"
);
return;
}

if index >= 1 {
// Acknowledgements are received with delay
let expected_index = index - 1;
if is_done {
return;
} else {
buffer = response.into();
buffer.clear();
// Fill buffer with any overflown responses that may have been stored
buffer.extend(overflow_buffer.drain(..));
}

trace!(
%response_subject,
%expected_index,
"Waiting for acknowledgement"
);
match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next()).await {
Ok(Some(message)) => {
if let Some(received_index) = message
.payload
.split_at_checked(mem::size_of::<u32>())
.map(|(bytes, _)| {
u32::from_le_bytes(
bytes.try_into().expect("Correctly chunked slice; qed"),
)
})
{
debug!(
%response_subject,
%received_index,
"Received acknowledgement"
);
if received_index != expected_index {
warn!(
if index >= 1 {
// Acknowledgements are received with delay
let expected_index = index - 1;

trace!(
%response_subject,
%expected_index,
"Waiting for acknowledgement"
);
match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
.await
{
Ok(Some(message)) => {
if let Some(received_index) = message
.payload
.split_at_checked(mem::size_of::<u32>())
.map(|(bytes, _)| {
u32::from_le_bytes(
bytes.try_into().expect("Correctly chunked slice; qed"),
)
})
{
debug!(
%response_subject,
%received_index,
%expected_index,
"Received acknowledgement"
);
if received_index != expected_index {
warn!(
%response_subject,
%received_index,
%expected_index,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
message = %hex::encode(message.payload),
"Unexpected acknowledgement index"
);
return;
}
} else {
warn!(
%response_subject,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
message = %hex::encode(message.payload),
"Unexpected acknowledgement index"
"Unexpected acknowledgement message"
);
return;
}
} else {
}
Ok(None) => {
warn!(
%response_subject,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
message = %hex::encode(message.payload),
"Unexpected acknowledgement message"
"Acknowledgement stream ended unexpectedly"
);
return;
}
Err(_error) => {
warn!(
%response_subject,
%expected_index,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Acknowledgement wait timed out"
);
return;
}
}
Ok(None) => {
warn!(
%response_subject,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Acknowledgement stream ended unexpectedly"
);
return;
}
Err(_error) => {
warn!(
%response_subject,
%expected_index,
request_type = %type_name::<Request>(),
response_type = %type_name::<Request::Response>(),
"Acknowledgement wait timed out"
);
return;
}
}
}

index += 1;
index += 1;
}
}
}

Expand Down

0 comments on commit ba8cf9c

Please sign in to comment.