Skip to content

Commit

Permalink
feat: use generic_queue_responder to reduce duplicate code. (#2879)
Browse files Browse the repository at this point in the history
* feat: use generic_subscribe_responder to reduce duplicate code.

* Rename `generic_subscribe_responder` to `request_responder`

* Move `request_responder` without changes

* Inline request responder function

* Remove unnecessary `label` argument

* Small cleanups

---------

Co-authored-by: Nazar Mokrynskyi <[email protected]>
  • Loading branch information
tediou5 and nazar-pc committed Jun 27, 2024
1 parent cf16099 commit 4e3bd72
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 622 deletions.
286 changes: 39 additions & 247 deletions crates/subspace-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::cluster::nats_client::{
};
use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
use anyhow::anyhow;
use async_nats::Message;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{select, stream, FutureExt, Stream, StreamExt};
Expand Down Expand Up @@ -361,99 +360,28 @@ where
caches_details
.iter()
.map(|cache_details| async move {
// Initialize with pending future so it never ends
let mut processing =
FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::from_iter([
Box::pin(pending()) as Pin<Box<_>>,
]);
let subscription = nats_client
.queue_subscribe(
ClusterCacheWritePieceRequest::SUBJECT
.replace('*', &cache_details.cache_id_string),
cache_details.cache_id_string.clone(),
nats_client
.request_responder(
Some(cache_details.cache_id_string.as_str()),
Some(cache_details.cache_id_string.clone()),
|request: ClusterCacheWritePieceRequest| async move {
Some(
cache_details
.cache
.write_piece(request.offset, request.piece_index, &request.piece)
.await
.map_err(|error| error.to_string()),
)
},
)
.await
.map_err(|error| {
anyhow!(
"Failed to subscribe to write piece requests for cache {}: {}",
cache_details.cache_id,
error
)
})?;
debug!(?subscription, "Write piece requests subscription");
let mut subscription = subscription.fuse();

loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

// Create background task for concurrent processing
processing.push(Box::pin(process_write_piece_request(
nats_client,
cache_details,
message,
)));
}
_ = processing.next() => {
// Nothing to do here
}
}
}

Ok(())
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No caches"))?
}

async fn process_write_piece_request<C>(
nats_client: &NatsClient,
cache_details: &CacheDetails<'_, C>,
message: Message,
) where
C: PieceCache,
{
let Some(reply_subject) = message.reply else {
return;
};

let ClusterCacheWritePieceRequest {
offset,
piece_index,
piece,
} = match ClusterCacheWritePieceRequest::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Err(error) => {
warn!(
%error,
message = %hex::encode(message.payload),
"Failed to decode write piece request"
);
return;
}
};

trace!(%offset, %piece_index, %reply_subject, "Write piece request");

let response: <ClusterCacheWritePieceRequest as GenericRequest>::Response = cache_details
.cache
.write_piece(offset, piece_index, &piece)
.await
.map_err(|error| error.to_string());

if let Err(error) = nats_client
.publish(reply_subject, response.encode().into())
.await
{
warn!(%error, "Failed to send write piece response");
}
}

async fn read_piece_index_responder<C>(
nats_client: &NatsClient,
caches_details: &[CacheDetails<'_, C>],
Expand All @@ -464,96 +392,28 @@ where
caches_details
.iter()
.map(|cache_details| async move {
// Initialize with pending future so it never ends
let mut processing =
FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::from_iter([
Box::pin(pending()) as Pin<Box<_>>,
]);
let subscription = nats_client
.queue_subscribe(
ClusterCacheReadPieceIndexRequest::SUBJECT
.replace('*', &cache_details.cache_id_string),
cache_details.cache_id_string.clone(),
nats_client
.request_responder(
Some(cache_details.cache_id_string.as_str()),
Some(cache_details.cache_id_string.clone()),
|request: ClusterCacheReadPieceIndexRequest| async move {
Some(
cache_details
.cache
.read_piece_index(request.offset)
.await
.map_err(|error| error.to_string()),
)
},
)
.await
.map_err(|error| {
anyhow!(
"Failed to subscribe to read piece index requests for cache {}: {}",
cache_details.cache_id,
error
)
})?;
debug!(?subscription, "Read piece index requests subscription");
let mut subscription = subscription.fuse();

loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

// Create background task for concurrent processing
processing.push(Box::pin(process_read_piece_index_request(
nats_client,
cache_details,
message,
)));
}
_ = processing.next() => {
// Nothing to do here
}
}
}

Ok(())
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No caches"))?
}

async fn process_read_piece_index_request<C>(
nats_client: &NatsClient,
cache_details: &CacheDetails<'_, C>,
message: Message,
) where
C: PieceCache,
{
let Some(reply_subject) = message.reply else {
return;
};

let ClusterCacheReadPieceIndexRequest { offset } =
match ClusterCacheReadPieceIndexRequest::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Err(error) => {
warn!(
%error,
message = %hex::encode(message.payload),
"Failed to decode read piece index request"
);
return;
}
};

trace!(%offset, %reply_subject, "Read piece index request");

let response: <ClusterCacheReadPieceIndexRequest as GenericRequest>::Response = cache_details
.cache
.read_piece_index(offset)
.await
.map_err(|error| error.to_string());

if let Err(error) = nats_client
.publish(reply_subject, response.encode().into())
.await
{
warn!(%error, "Failed to send read piece index response");
}
}

async fn read_piece_responder<C>(
nats_client: &NatsClient,
caches_details: &[CacheDetails<'_, C>],
Expand All @@ -564,96 +424,28 @@ where
caches_details
.iter()
.map(|cache_details| async move {
// Initialize with pending future so it never ends
let mut processing =
FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::from_iter([
Box::pin(pending()) as Pin<Box<_>>,
]);
let subscription = nats_client
.queue_subscribe(
ClusterCacheReadPieceRequest::SUBJECT
.replace('*', &cache_details.cache_id_string),
cache_details.cache_id_string.clone(),
nats_client
.request_responder(
Some(cache_details.cache_id_string.as_str()),
Some(cache_details.cache_id_string.clone()),
|request: ClusterCacheReadPieceRequest| async move {
Some(
cache_details
.cache
.read_piece(request.offset)
.await
.map_err(|error| error.to_string()),
)
},
)
.await
.map_err(|error| {
anyhow!(
"Failed to subscribe to read piece requests for cache {}: {}",
cache_details.cache_id,
error
)
})?;
debug!(?subscription, "Read piece requests subscription");
let mut subscription = subscription.fuse();

loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

// Create background task for concurrent processing
processing.push(Box::pin(process_read_piece_request(
nats_client,
cache_details,
message,
)));
}
_ = processing.next() => {
// Nothing to do here
}
}
}

Ok(())
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No caches"))?
}

async fn process_read_piece_request<C>(
nats_client: &NatsClient,
cache_details: &CacheDetails<'_, C>,
message: Message,
) where
C: PieceCache,
{
let Some(reply_subject) = message.reply else {
return;
};

let ClusterCacheReadPieceRequest { offset } =
match ClusterCacheReadPieceRequest::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Err(error) => {
warn!(
%error,
message = %hex::encode(message.payload),
"Failed to decode read piece request"
);
return;
}
};

trace!(%offset, %reply_subject, "Read piece request");

let response: <ClusterCacheReadPieceRequest as GenericRequest>::Response = cache_details
.cache
.read_piece(offset)
.await
.map_err(|error| error.to_string());

if let Err(error) = nats_client
.publish(reply_subject, response.encode().into())
.await
{
warn!(%error, "Failed to send read piece response");
}
}

async fn contents_responder<C>(
nats_client: &NatsClient,
caches_details: &[CacheDetails<'_, C>],
Expand Down
Loading

0 comments on commit 4e3bd72

Please sign in to comment.