Skip to content

Commit

Permalink
Merge pull request #2266 from subspace/operator_id_cli
Browse files Browse the repository at this point in the history
Introduce `operator_id` cli arg that replaces `operator` arg.
  • Loading branch information
vedhavyas authored Nov 23, 2023
2 parents 64fe9df + a694025 commit 3b97f85
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 72 deletions.
3 changes: 3 additions & 0 deletions crates/sp-domains/src/bundle_producer_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub fn is_below_threshold(vrf_output: &VrfOutput, threshold: u128) -> bool {

#[derive(Debug, Decode, Encode, TypeInfo, PartialEq, Eq, Clone)]
pub struct BundleProducerElectionParams<Balance> {
// TODO: current operators is not required anymore.
// This is not removed right now in order to not break runtime api with new version
// marking a todo to remove it before next network.
pub current_operators: Vec<OperatorId>,
pub total_domain_stake: Balance,
pub bundle_slot_probability: (u64, u64),
Expand Down
24 changes: 17 additions & 7 deletions crates/subspace-node/src/domain/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sc_service::{BasePath, Configuration};
use sp_blockchain::HeaderBackend;
use sp_domain_digests::AsPredigest;
use sp_domains::storage::RawGenesis;
use sp_domains::DomainId;
use sp_domains::{DomainId, OperatorId};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Header;
use sp_runtime::{BuildStorage, DigestItem};
Expand Down Expand Up @@ -64,6 +64,10 @@ fn parse_domain_id(s: &str) -> std::result::Result<DomainId, ParseIntError> {
s.parse::<u32>().map(Into::into)
}

fn parse_operator_id(s: &str) -> std::result::Result<OperatorId, ParseIntError> {
s.parse::<u64>().map(OperatorId::from)
}

#[derive(Debug, Parser)]
pub struct DomainCli {
/// Run a domain node.
Expand All @@ -73,9 +77,9 @@ pub struct DomainCli {
#[clap(long, value_parser = parse_domain_id)]
pub domain_id: DomainId,

/// Run the node as an Operator
#[arg(long, conflicts_with = "validator")]
pub operator: bool,
/// Use provider operator id to submit bundles.
#[arg(long, value_parser = parse_operator_id)]
pub operator_id: Option<OperatorId>,

/// Additional args for domain.
#[clap(raw = true)]
Expand Down Expand Up @@ -214,9 +218,15 @@ impl CliConfiguration<Self> for DomainCli {
self.run.chain_id(is_dev)
}

fn role(&self, is_dev: bool) -> Result<sc_service::Role> {
// is authority when operator is enabled or in dev mode
let is_authority = self.operator || self.run.validator || is_dev;
fn role(&self, _is_dev: bool) -> Result<sc_service::Role> {
if self.run.validator {
return Err(sc_cli::Error::Input(
"use `--operator-id` argument to run as operator".to_string(),
));
}

// is authority when operator_id is passed.
let is_authority = self.operator_id.is_some();

Ok(if is_authority {
Role::Authority
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-node/src/domain/domain_instance_starter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl DomainInstanceStarter {
domain_message_receiver,
provider: eth_provider,
skip_empty_bundle_production: true,
maybe_operator_id: domain_cli.operator_id,
};

let mut domain_node = domain_service::new_full::<
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use sp_api::ProvideRuntimeApi;
use sp_consensus_slots::Slot;
use sp_core::bytes::to_hex;
use sp_core::ByteArray;
use sp_domains::bundle_producer_election::{
calculate_threshold, is_below_threshold, make_transcript, BundleProducerElectionParams,
};
use sp_domains::{BundleProducerElectionApi, DomainId, OperatorPublicKey, ProofOfElection};
use sp_domains::{
BundleProducerElectionApi, DomainId, OperatorId, OperatorPublicKey, ProofOfElection,
};
use sp_keystore::{Keystore, KeystorePtr};
use sp_runtime::traits::Block as BlockT;
use sp_runtime::RuntimeAppPublic;
use std::marker::PhantomData;
use std::sync::Arc;
use subspace_core_primitives::Randomness;
use subspace_runtime_primitives::Balance;
use tracing::log;

pub(super) struct BundleProducerElectionSolver<Block, CBlock, CClient> {
keystore: KeystorePtr,
Expand Down Expand Up @@ -48,57 +53,69 @@ where
slot: Slot,
consensus_block_hash: CBlock::Hash,
domain_id: DomainId,
maybe_operator_id: Option<OperatorId>,
global_randomness: Randomness,
) -> sp_blockchain::Result<Option<(ProofOfElection<CBlock::Hash>, OperatorPublicKey)>> {
let BundleProducerElectionParams {
current_operators,
total_domain_stake,
bundle_slot_probability,
} = match self
.consensus_client
.runtime_api()
.bundle_producer_election_params(consensus_block_hash, domain_id)?
{
Some(params) => params,
None => return Ok(None),
};
if let Some(operator_id) = maybe_operator_id {
let BundleProducerElectionParams {
total_domain_stake,
bundle_slot_probability,
..
} = match self
.consensus_client
.runtime_api()
.bundle_producer_election_params(consensus_block_hash, domain_id)?
{
Some(params) => params,
None => return Ok(None),
};

let global_challenge = global_randomness.derive_global_challenge(slot.into());
let vrf_sign_data = make_transcript(domain_id, &global_challenge).into_sign_data();
let global_challenge = global_randomness.derive_global_challenge(slot.into());
let vrf_sign_data = make_transcript(domain_id, &global_challenge).into_sign_data();

// TODO: The runtime API may take 10~20 microseonds each time, looping the operator set
// could take too long for the bundle production, track a mapping of signing_key to
// operator_id in the runtime and then we can update it to loop the keys in the keystore.
for operator_id in current_operators {
// Ideally, we can already cache operator signing key since we do not allow changing such key
// in the protocol right now. Leaving this as is since we anyway need to need to fetch operator's
// latest stake and this also returns the signing key with it.
if let Some((operator_signing_key, operator_stake)) = self
.consensus_client
.runtime_api()
.operator(consensus_block_hash, operator_id)?
{
if let Ok(Some(vrf_signature)) = Keystore::sr25519_vrf_sign(
if let Ok(maybe_vrf_signature) = Keystore::sr25519_vrf_sign(
&*self.keystore,
OperatorPublicKey::ID,
&operator_signing_key.clone().into(),
&vrf_sign_data,
) {
let threshold = calculate_threshold(
operator_stake,
total_domain_stake,
bundle_slot_probability,
);
if let Some(vrf_signature) = maybe_vrf_signature {
let threshold = calculate_threshold(
operator_stake,
total_domain_stake,
bundle_slot_probability,
);

if is_below_threshold(&vrf_signature.output, threshold) {
let proof_of_election = ProofOfElection {
domain_id,
slot_number: slot.into(),
global_randomness,
vrf_signature,
operator_id,
consensus_block_hash,
};
return Ok(Some((proof_of_election, operator_signing_key)));
if is_below_threshold(&vrf_signature.output, threshold) {
let proof_of_election = ProofOfElection {
domain_id,
slot_number: slot.into(),
global_randomness,
vrf_signature,
operator_id,
consensus_block_hash,
};
return Ok(Some((proof_of_election, operator_signing_key)));
}
} else {
log::warn!(
"Operator[{operator_id}]'s Signing key[{}] pair is not available in keystore.",
to_hex(operator_signing_key.as_slice(), false)
);
return Ok(None);
}
}
} else {
log::warn!("Operator[{operator_id}] is not registered on the Runtime",);
return Ok(None);
}
}

Expand Down
9 changes: 7 additions & 2 deletions domains/client/domain-operator/src/domain_bundle_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use sp_api::{NumberFor, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{HashAndNumber, HeaderBackend};
use sp_domains::{
Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorPublicKey, OperatorSignature,
SealedBundleHeader,
Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey,
OperatorSignature, SealedBundleHeader,
};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Zero};
Expand All @@ -34,6 +34,7 @@ where
CBlock: BlockT,
{
domain_id: DomainId,
maybe_operator_id: Option<OperatorId>,
consensus_client: Arc<CClient>,
client: Arc<Client>,
bundle_sender: Arc<BundleSender<Block, CBlock>>,
Expand All @@ -52,6 +53,7 @@ where
fn clone(&self) -> Self {
Self {
domain_id: self.domain_id,
maybe_operator_id: self.maybe_operator_id,
consensus_client: self.consensus_client.clone(),
client: self.client.clone(),
bundle_sender: self.bundle_sender.clone(),
Expand Down Expand Up @@ -79,6 +81,7 @@ where
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
domain_id: DomainId,
maybe_operator_id: Option<OperatorId>,
consensus_client: Arc<CClient>,
client: Arc<Client>,
domain_bundle_proposer: DomainBundleProposer<
Expand All @@ -98,6 +101,7 @@ where
);
Self {
domain_id,
maybe_operator_id,
consensus_client,
client,
bundle_sender,
Expand Down Expand Up @@ -146,6 +150,7 @@ where
slot,
consensus_block_info.hash,
self.domain_id,
self.maybe_operator_id,
global_randomness,
)?
{
Expand Down
46 changes: 23 additions & 23 deletions domains/client/domain-operator/src/domain_worker_starter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use sp_block_builder::BlockBuilder;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_core::traits::{CodeExecutor, SpawnEssentialNamed};
use sp_core::H256;
use sp_domains::{BundleProducerElectionApi, DomainsApi};
use sp_domains::{BundleProducerElectionApi, DomainsApi, OperatorId};
use sp_domains_fraud_proof::FraudProofApi;
use sp_messenger::MessengerApi;
use sp_runtime::traits::NumberFor;
Expand All @@ -57,7 +57,7 @@ pub(super) async fn start_worker<
spawn_essential: Box<dyn SpawnEssentialNamed>,
consensus_client: Arc<CClient>,
consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
is_authority: bool,
maybe_operator_id: Option<OperatorId>,
bundle_producer: DomainBundleProducer<Block, CBlock, Client, CClient, TransactionPool>,
bundle_processor: BundleProcessor<Block, CBlock, Client, CClient, Backend, E>,
operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
Expand Down Expand Up @@ -118,27 +118,8 @@ pub(super) async fn start_worker<
consensus_block_import_throttling_buffer_size,
);

if !is_authority {
info!("🧑‍ Running as Full node...");
drop(new_slot_notification_stream);
drop(acknowledgement_sender_stream);
while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
if let Some(block_info) = maybe_block_info {
if let Err(error) = bundle_processor
.clone()
.process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
.instrument(span.clone())
.await
{
tracing::error!(?error, "Failed to process consensus block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
}
}
} else {
info!("🧑‍🌾 Running as Operator...");
if let Some(operator_id) = maybe_operator_id {
info!("👷 Running as Operator[{operator_id}]...");
let bundler_fn = {
let span = span.clone();
move |consensus_block_info: sp_blockchain::HashAndNumber<CBlock>, slot_info| {
Expand Down Expand Up @@ -215,5 +196,24 @@ pub(super) async fn start_worker<
}
}
}
} else {
info!("🧑‍ Running as Full node...");
drop(new_slot_notification_stream);
drop(acknowledgement_sender_stream);
while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await {
if let Some(block_info) = maybe_block_info {
if let Err(error) = bundle_processor
.clone()
.process_bundles((block_info.hash, block_info.number, block_info.is_new_best))
.instrument(span.clone())
.await
{
tracing::error!(?error, "Failed to process consensus block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
}
}
}
}
4 changes: 2 additions & 2 deletions domains/client/domain-operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use sp_domain_digests::AsPredigest;
use sp_domains::{Bundle, DomainId, ExecutionReceipt};
use sp_domains::{Bundle, DomainId, ExecutionReceipt, OperatorId};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::DigestItem;
Expand Down Expand Up @@ -169,7 +169,7 @@ pub struct OperatorParams<
pub transaction_pool: Arc<TransactionPool>,
pub backend: Arc<Backend>,
pub code_executor: Arc<E>,
pub is_authority: bool,
pub maybe_operator_id: Option<OperatorId>,
pub keystore: KeystorePtr,
pub bundle_sender: Arc<BundleSender<Block, CBlock>>,
pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
Expand Down
3 changes: 2 additions & 1 deletion domains/client/domain-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ where

let bundle_producer = DomainBundleProducer::new(
params.domain_id,
params.maybe_operator_id,
params.consensus_client.clone(),
params.client.clone(),
domain_bundle_proposer,
Expand Down Expand Up @@ -179,7 +180,7 @@ where
spawn_essential.clone(),
params.consensus_client.clone(),
params.consensus_offchain_tx_pool_factory.clone(),
params.is_authority,
params.maybe_operator_id,
bundle_producer,
bundle_processor.clone(),
params.operator_streams,
Expand Down
6 changes: 4 additions & 2 deletions domains/service/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use sp_core::traits::SpawnEssentialNamed;
use sp_core::{Decode, Encode};
use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi};
use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId};
use sp_domains_fraud_proof::FraudProofApi;
use sp_messenger::messages::ChainId;
use sp_messenger::{MessengerApi, RelayerApi};
Expand Down Expand Up @@ -223,6 +223,7 @@ where
pub domain_id: DomainId,
pub domain_config: ServiceConfiguration,
pub domain_created_at: NumberFor<CBlock>,
pub maybe_operator_id: Option<OperatorId>,
pub consensus_client: Arc<CClient>,
pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
pub consensus_network_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
Expand Down Expand Up @@ -324,6 +325,7 @@ where
{
let DomainParams {
domain_id,
maybe_operator_id,
mut domain_config,
domain_created_at,
consensus_client,
Expand Down Expand Up @@ -450,7 +452,7 @@ where
transaction_pool: transaction_pool.clone(),
backend: backend.clone(),
code_executor: code_executor.clone(),
is_authority,
maybe_operator_id,
keystore: params.keystore_container.keystore(),
bundle_sender: Arc::new(bundle_sender),
operator_streams,
Expand Down
Loading

0 comments on commit 3b97f85

Please sign in to comment.