Skip to content

Commit

Permalink
Merge pull request #2876 from subspace/fix-cache-group
Browse files Browse the repository at this point in the history
Fix farming custer cache group handling
  • Loading branch information
nazar-pc authored Jun 25, 2024
2 parents c7e1711 + b587868 commit d3f970f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub(super) async fn maintain_caches(
(Box::pin(ready(())) as Pin<Box<dyn Future<Output = ()>>>).fuse();

let cache_identify_subscription = pin!(nats_client
.subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(None, None)
.subscribe_to_broadcasts::<ClusterCacheIdentifyBroadcast>(Some(cache_group), None)
.await
.map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?);

Expand Down
13 changes: 6 additions & 7 deletions crates/subspace-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct ClusterCacheIdentifyBroadcast {
}

impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
/// `*` here stands for cache group
const SUBJECT: &'static str = "subspace.cache.*.identify";
}

Expand Down Expand Up @@ -274,10 +275,7 @@ where
C: PieceCache,
{
let mut subscription = nats_client
.subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(
Some(cache_group),
Some(cache_group.to_string()),
)
.subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
.await
.map_err(|error| {
anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
Expand Down Expand Up @@ -306,14 +304,14 @@ where
}

last_identification = Instant::now();
send_identify_broadcast(nats_client, caches_details).await;
send_identify_broadcast(nats_client, caches_details, cache_group).await;
interval.reset();
}
_ = interval.tick().fuse() => {
last_identification = Instant::now();
trace!("Cache self-identification");

send_identify_broadcast(nats_client, caches_details).await;
send_identify_broadcast(nats_client, caches_details, cache_group).await;
}
}
}
Expand All @@ -324,6 +322,7 @@ where
async fn send_identify_broadcast<C>(
nats_client: &NatsClient,
caches_details: &[CacheDetails<'_, C>],
cache_group: &str,
) where
C: PieceCache,
{
Expand All @@ -336,7 +335,7 @@ async fn send_identify_broadcast<C>(
cache_id: cache.cache_id,
max_num_elements: cache.cache.max_num_elements(),
},
&cache.cache_id_string,
cache_group,
)
.await
{
Expand Down

0 comments on commit d3f970f

Please sign in to comment.