Skip to content

Commit

Permalink
Merge branch 'main' into seeder-timekeeper-test
Browse files Browse the repository at this point in the history
  • Loading branch information
dariolina committed Oct 18, 2024
2 parents d2d0f0c + 70178b8 commit fe56043
Show file tree
Hide file tree
Showing 38 changed files with 669 additions and 491 deletions.
109 changes: 61 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/pallet-subspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ subspace-verification = { version = "0.1.0", path = "../subspace-verification",

[dev-dependencies]
env_logger = "0.11.5"
futures = "0.3.30"
futures = "0.3.31"
pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
rand = { version = "0.8.5", features = ["min_const_gen"] }
sp-io = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-oneshot = "0.5.9"
futures = "0.3.30"
futures = "0.3.31"
futures-timer = "3.0.3"
jsonrpsee = { version = "0.24.5", features = ["server", "macros"] }
parking_lot = "0.12.2"
Expand Down
4 changes: 2 additions & 2 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub trait SubspaceRpcApi {
#[method(name = "subspace_lastSegmentHeaders")]
async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

/// Block/transaction object mappings subscription
/// DSN object mappings subscription
#[subscription(
name = "subspace_subscribeObjectMappings" => "subspace_object_mappings",
unsubscribe = "subspace_unsubscribeObjectMappings",
Expand All @@ -181,7 +181,7 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_object_mappings(&self);

/// Filtered block/transaction object mappings subscription
/// Filtered DSN object mappings subscription
#[subscription(
name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings",
unsubscribe = "subspace_unsubscribeFilteredObjectMappings",
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
async-trait = "0.1.83"
codec = { package = "parity-scale-codec", version = "3.6.12", features = ["derive"] }
futures = "0.3.30"
futures = "0.3.31"
parking_lot = "0.12.2"
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-proof-of-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ include = [
[dependencies]
core_affinity = "0.8.1"
derive_more = { version = "1.0.0", features = ["full"] }
futures = "0.3.30"
futures = "0.3.31"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
parking_lot = "0.12.2"
rayon = "1.10.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-subspace-block-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-channel = "1.9.0"
async-trait = "0.1.83"
codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["derive"] }
derive_more = { version = "1.0.0", features = ["full"] }
futures = "0.3.30"
futures = "0.3.31"
parking_lot = "0.12.2"
sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
Expand Down
2 changes: 1 addition & 1 deletion crates/sp-domains-fraud-proof/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fp-rpc = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier",
fp-self-contained = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] }
frame-support = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
frame-system = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
futures = "0.3.30"
futures = "0.3.31"
libsecp256k1 = { version = "0.7.1", features = ["static-context", "hmac"] }
pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
pallet-ethereum = { git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] }
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bitvec = "1.0.1"
# TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved
fs2 = "0.4.3"
futures = "0.3.30"
futures = "0.3.31"
hex = "0.4.3"
libc = "0.2.159"
parity-scale-codec = "3.6.12"
Expand All @@ -48,7 +48,7 @@ winapi = "0.3.9"

[dev-dependencies]
criterion = "0.5.1"
futures = "0.3.30"
futures = "0.3.31"
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" }

Expand Down
11 changes: 10 additions & 1 deletion crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ where
pieces_to_download
.entry(piece_index)
.or_default()
.push((record, metadata))
.push((record, metadata));
}
// This map will be mutated, removing piece indices we have already processed
let pieces_to_download = AsyncMutex::new(pieces_to_download);
Expand Down Expand Up @@ -788,6 +788,15 @@ where
}
}

if final_result.is_ok() && !pieces_to_download.is_empty() {
return Err(PlottingError::FailedToRetrievePieces {
error: anyhow::anyhow!(
"Successful result, but not all pieces were downloaded, this is likely a piece \
getter implementation bug"
),
});
}

final_result
}

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ event-listener = "5.3.1"
event-listener-primitives = "2.0.1"
fdlimit = { version = "0.3.0", optional = true }
fs4 = "0.9.1"
futures = "0.3.30"
futures = "0.3.31"
hex = { version = "0.4.3", features = ["serde"] }
hwlocality = { version = "1.0.0-alpha.6", features = ["vendored"], optional = true }
jsonrpsee = { version = "0.24.5", features = ["ws-client"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::future::{pending, ready, Future};
use std::future::{ready, Future};
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
Expand Down Expand Up @@ -139,9 +139,7 @@ pub(super) async fn maintain_farms(
// Farm that is being added/removed right now (if any)
let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse();
// Initialize with pending future so it never ends
let mut farms = FuturesUnordered::from_iter([
Box::pin(pending()) as Pin<Box<dyn Future<Output = (FarmIndex, anyhow::Result<()>)>>>
]);
let mut farms = FuturesUnordered::new();

let farmer_identify_subscription = pin!(nats_client
.subscribe_to_broadcasts::<ClusterFarmerIdentifyFarmBroadcast>(None, None)
Expand Down Expand Up @@ -253,7 +251,7 @@ pub(super) async fn maintain_farms(
}
result = farm_add_remove_in_progress => {
if let Some((farm_index, expired_receiver, farm)) = result {
farms.push(Box::pin(async move {
farms.push(async move {
select! {
result = farm.run().fuse() => {
(farm_index, result)
Expand All @@ -263,7 +261,7 @@ pub(super) async fn maintain_farms(
(farm_index, Ok(()))
}
}
}));
});
}
}
}
Expand Down
108 changes: 24 additions & 84 deletions crates/subspace-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@

use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
use crate::cluster::nats_client::{
GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest,
GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
};
use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
use anyhow::anyhow;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{select, stream, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use std::future::{pending, Future};
use std::pin::{pin, Pin};
use std::pin::Pin;
use std::time::{Duration, Instant};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use tokio::time::MissedTickBehavior;
Expand Down Expand Up @@ -377,6 +376,7 @@ where
)
},
)
.instrument(info_span!("", cache_id = %cache_details.cache_id))
.await
})
.collect::<FuturesUnordered<_>>()
Expand Down Expand Up @@ -409,6 +409,7 @@ where
)
},
)
.instrument(info_span!("", cache_id = %cache_details.cache_id))
.await
})
.collect::<FuturesUnordered<_>>()
Expand Down Expand Up @@ -441,6 +442,7 @@ where
)
},
)
.instrument(info_span!("", cache_id = %cache_details.cache_id))
.await
})
.collect::<FuturesUnordered<_>>()
Expand All @@ -458,93 +460,31 @@ where
{
caches_details
.iter()
.enumerate()
.map(|(cache_index, cache_details)| async move {
// Initialize with pending future so it never ends
let mut processing = FuturesUnordered::from_iter([
Box::pin(pending()) as Pin<Box<dyn Future<Output = ()> + Send>>
]);
let mut subscription = nats_client
.subscribe_to_stream_requests(
Some(&cache_details.cache_id_string),
.map(|cache_details| async move {
nats_client
.stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
Some(cache_details.cache_id_string.as_str()),
Some(cache_details.cache_id_string.clone()),
|_request: ClusterCacheContentsRequest| async move {
Some(match cache_details.cache.contents().await {
Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
maybe_cache_element.map_err(|error| error.to_string())
})) as _,
Err(error) => {
error!(%error, "Failed to get contents");

Box::pin(stream::once(async move {
Err(format!("Failed to get contents: {error}"))
})) as _
}
})
},
)
.instrument(info_span!("", cache_id = %cache_details.cache_id))
.await
.map_err(|error| {
anyhow!(
"Failed to subscribe to contents requests for cache {}: {}",
cache_details.cache_id,
error
)
})?
.fuse();

loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

// Create background task for concurrent processing
processing.push(Box::pin(
process_contents_request(
nats_client,
cache_details,
message,
)
.instrument(info_span!("", %cache_index))
));
}
_ = processing.next() => {
// Nothing to do here
}
}
}

Ok(())
})
.collect::<FuturesUnordered<_>>()
.next()
.await
.ok_or_else(|| anyhow!("No caches"))?
}

async fn process_contents_request<C>(
nats_client: &NatsClient,
cache_details: &CacheDetails<'_, C>,
request: StreamRequest<ClusterCacheContentsRequest>,
) where
C: PieceCache,
{
trace!(?request, "Contents request");

match cache_details.cache.contents().await {
Ok(contents) => {
nats_client
.stream_response::<ClusterCacheContentsRequest, _>(
request.response_subject,
contents.map(|maybe_cache_element| {
maybe_cache_element.map_err(|error| error.to_string())
}),
)
.await;
}
Err(error) => {
error!(
%error,
cache_id = %cache_details.cache_id,
"Failed to get contents"
);

nats_client
.stream_response::<ClusterCacheContentsRequest, _>(
request.response_subject,
pin!(stream::once(async move {
Err(format!("Failed to get contents: {error}"))
})),
)
.await;
}
}
}
Loading

0 comments on commit fe56043

Please sign in to comment.