Skip to content

Commit

Permalink
Merge pull request #1809 from subspace/remove-node-piece-cache
Browse files Browse the repository at this point in the history
Remove node piece cache
  • Loading branch information
nazar-pc authored Aug 15, 2023
2 parents 2695aed + a1cf9f8 commit a3d3706
Show file tree
Hide file tree
Showing 20 changed files with 264 additions and 550 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sc-consensus-subspace-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sp-consensus-subspace = { version = "0.1.0", path = "../sp-consensus-subspace" }
sp-consensus-slots = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
sp-core = { version = "21.0.0", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
sp-objects = { version = "0.1.0", path = "../sp-objects" }
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
Expand Down
170 changes: 123 additions & 47 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use jsonrpsee::SubscriptionSink;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_client_api::{AuxStore, BlockBackend};
use sc_consensus_subspace::archiver::{recreate_genesis_segment, SegmentHeadersStore};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::{
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification,
SegmentHeadersStore, SubspaceSyncOracle,
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle,
};
use sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor};
use sc_utils::mpsc::TracingUnboundedSender;
Expand All @@ -41,6 +41,7 @@ use sp_consensus_slots::Slot;
use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi as SubspaceRuntimeApi};
use sp_core::crypto::ByteArray;
use sp_core::H256;
use sp_objects::ObjectsApi;
use sp_runtime::traits::Block as BlockT;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
Expand All @@ -49,6 +50,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{PieceIndex, SegmentHeader, SegmentIndex, Solution};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
Expand Down Expand Up @@ -141,31 +143,76 @@ struct BlockSignatureSenders {
senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}

/// In-memory cache of last archived segment, such that when request comes back right after
/// archived segment notification, RPC server is able to answer quickly.
///
/// We store weak reference, such that archived segment is not persisted for longer than
/// necessary occupying RAM.
enum CachedArchivedSegment {
/// Special case for genesis segment when requested over RPC
Genesis(Arc<NewArchivedSegment>),
Weak(Weak<NewArchivedSegment>),
}

impl CachedArchivedSegment {
fn get(&self) -> Option<Arc<NewArchivedSegment>> {
match self {
CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
}
}
}

/// Subspace RPC configuration
pub struct SubspaceRpcConfig<Client, SO, AS>
where
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
/// Substrate client
pub client: Arc<Client>,
/// Task executor that is being used by RPC subscriptions
pub subscription_executor: SubscriptionTaskExecutor,
/// New slot notification stream
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
/// Reward signing notification stream
pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
/// Archived segment notification stream
pub archived_segment_notification_stream:
SubspaceNotificationStream<ArchivedSegmentNotification>,
/// DSN bootstrap nodes
pub dsn_bootstrap_nodes: Vec<Multiaddr>,
/// Segment headers store
pub segment_headers_store: SegmentHeadersStore<AS>,
/// Subspace sync oracle
pub sync_oracle: SubspaceSyncOracle<SO>,
/// Signifies whether a potentially unsafe RPC should be denied
pub deny_unsafe: DenyUnsafe,
/// Kzg instance
pub kzg: Kzg,
}

/// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace.
pub struct SubspaceRpc<Block, Client, SO, AS>
where
Block: BlockT,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
client: Arc<Client>,
executor: SubscriptionTaskExecutor,
subscription_executor: SubscriptionTaskExecutor,
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
solution_response_senders: Arc<Mutex<SolutionResponseSenders>>,
reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
segment_headers_store: SegmentHeadersStore<AS>,
/// In-memory piece cache of last archived segment, such that when request comes back right
/// after archived segment notification, RPC server is able to answer quickly.
///
/// We store weak reference, such that archived segment is not persisted for longer than
/// necessary occupying RAM.
piece_cache: Arc<Mutex<Option<Weak<NewArchivedSegment>>>>,
cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
archived_segment_acknowledgement_senders:
Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
next_subscription_id: AtomicU64,
sync_oracle: SubspaceSyncOracle<SO>,
kzg: Kzg,
deny_unsafe: DenyUnsafe,
_block: PhantomData<Block>,
}
Expand All @@ -183,36 +230,24 @@ where
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
/// Creates a new instance of the `SubspaceRpc` handler.
pub fn new(
client: Arc<Client>,
executor: SubscriptionTaskExecutor,
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
archived_segment_notification_stream: SubspaceNotificationStream<
ArchivedSegmentNotification,
>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
segment_headers_store: SegmentHeadersStore<AS>,
sync_oracle: SubspaceSyncOracle<SO>,
deny_unsafe: DenyUnsafe,
) -> Self {
pub fn new(config: SubspaceRpcConfig<Client, SO, AS>) -> Self {
Self {
client,
executor,
new_slot_notification_stream,
reward_signing_notification_stream,
archived_segment_notification_stream,
client: config.client,
subscription_executor: config.subscription_executor,
new_slot_notification_stream: config.new_slot_notification_stream,
reward_signing_notification_stream: config.reward_signing_notification_stream,
archived_segment_notification_stream: config.archived_segment_notification_stream,
solution_response_senders: Arc::default(),
reward_signature_senders: Arc::default(),
dsn_bootstrap_nodes,
segment_headers_store,
piece_cache: Arc::default(),
dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
segment_headers_store: config.segment_headers_store,
cached_archived_segment: Arc::default(),
archived_segment_acknowledgement_senders: Arc::default(),
next_subscription_id: AtomicU64::default(),
sync_oracle,
deny_unsafe,
sync_oracle: config.sync_oracle,
kzg: config.kzg,
deny_unsafe: config.deny_unsafe,
_block: PhantomData,
}
}
Expand All @@ -228,7 +263,7 @@ where
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceRuntimeApi<Block, FarmerPublicKey>,
Client::Api: SubspaceRuntimeApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
Expand Down Expand Up @@ -290,7 +325,7 @@ where
}

fn subscribe_slot_info(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let executor = self.executor.clone();
let executor = self.subscription_executor.clone();
let solution_response_senders = self.solution_response_senders.clone();
let allow_solutions = self.deny_unsafe.check_if_safe().is_ok();

Expand Down Expand Up @@ -384,8 +419,11 @@ where
sink.pipe_from_stream(stream).await;
};

self.executor
.spawn("subspace-slot-info-subscription", Some("rpc"), fut.boxed());
self.subscription_executor.spawn(
"subspace-slot-info-subscription",
Some("rpc"),
fut.boxed(),
);

Ok(())
}
Expand All @@ -395,7 +433,7 @@ where
.check_if_safe()
.map_err(|_error| SubscriptionEmptyError)?;

let executor = self.executor.clone();
let executor = self.subscription_executor.clone();
let reward_signature_senders = self.reward_signature_senders.clone();

let stream = self.reward_signing_notification_stream.subscribe().map(
Expand Down Expand Up @@ -469,7 +507,7 @@ where
sink.pipe_from_stream(stream).await;
};

self.executor.spawn(
self.subscription_executor.spawn(
"subspace-block-signing-subscription",
Some("rpc"),
fut.boxed(),
Expand Down Expand Up @@ -500,7 +538,7 @@ where
let archived_segment_acknowledgement_senders =
self.archived_segment_acknowledgement_senders.clone();

let piece_cache = Arc::clone(&self.piece_cache);
let cached_archived_segment = Arc::clone(&self.cached_archived_segment);
let subscription_id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
let allow_acknowledgements = self.deny_unsafe.check_if_safe().is_ok();

Expand Down Expand Up @@ -543,9 +581,11 @@ where
}
};

piece_cache
cached_archived_segment
.lock()
.replace(Arc::downgrade(&archived_segment));
.replace(CachedArchivedSegment::Weak(Arc::downgrade(
&archived_segment,
)));

maybe_archived_segment_header
} else {
Expand All @@ -570,7 +610,7 @@ where
.remove(&subscription_id);
};

self.executor.spawn(
self.subscription_executor.spawn(
"subspace-archived-segment-header-subscription",
Some("rpc"),
fut.boxed(),
Expand Down Expand Up @@ -612,7 +652,7 @@ where
}
};

self.executor.spawn(
self.subscription_executor.spawn(
"subspace-node-sync-status-change-subscription",
Some("rpc"),
fut.boxed(),
Expand Down Expand Up @@ -683,9 +723,45 @@ where
}

fn piece(&self, requested_piece_index: PieceIndex) -> RpcResult<Option<Vec<u8>>> {
let Some(archived_segment) = self.piece_cache.lock().as_ref().and_then(Weak::upgrade)
else {
return Ok(None);
self.deny_unsafe.check_if_safe()?;

let archived_segment = {
let mut cached_archived_segment = self.cached_archived_segment.lock();

match cached_archived_segment
.as_ref()
.and_then(CachedArchivedSegment::get)
{
Some(archived_segment) => archived_segment,
None => {
if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
return Ok(None);
}

debug!(%requested_piece_index, "Re-creating genesis segment on demand");

// Try to re-create genesis segment on demand
match recreate_genesis_segment(&*self.client, self.kzg.clone()) {
Ok(Some(archived_segment)) => {
let archived_segment = Arc::new(archived_segment);
cached_archived_segment.replace(CachedArchivedSegment::Genesis(
Arc::clone(&archived_segment),
));
archived_segment
}
Ok(None) => {
return Ok(None);
}
Err(error) => {
error!(%error, "Failed to re-create genesis segment");

return Err(JsonRpseeError::Custom(
"Failed to re-create genesis segment".to_string(),
));
}
}
}
}
};

let indices = archived_segment
Expand Down
Loading

0 comments on commit a3d3706

Please sign in to comment.