Skip to content

Commit

Permalink
feat(contract-distribution): Move sending contract accesses to Partia…
Browse files Browse the repository at this point in the history
…lWitnessActor (#12389)

Currently the contract accesses are sent by client actor and witness and
contract deploys are sent by PartialWitnessActor.

This PR addresses a TODO to also move the sending of contract accesses
to PartialWitnessActor to gather all the logic in the same place.

In this way, it is clearer the order of sending contract accesses,
witness, and deployments, and all managed in the same function. We also
get and pass the ordered chunk validators to different functions to send
messages to the validators.

TODO: Testing will likely be done in a follow-up PR, as I am still
figuring out the best way to test it. Currently relying on the existing
testloop tests for contract distribution.
  • Loading branch information
tayfunelmas authored Nov 5, 2024
1 parent eea3d61 commit 38d7e05
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use near_primitives::reed_solomon::{ReedSolomonEncoder, ReedSolomonEncoderCache}
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::contract_distribution::{
ChunkContractAccesses, ChunkContractDeploys, CodeBytes, CodeHash, ContractCodeRequest,
ContractCodeResponse, PartialEncodedContractDeploys, PartialEncodedContractDeploysPart,
ContractCodeResponse, ContractUpdates, PartialEncodedContractDeploys,
PartialEncodedContractDeploysPart,
};
use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness;
use near_primitives::stateless_validation::state_witness::{
Expand Down Expand Up @@ -78,7 +79,7 @@ pub struct DistributeStateWitnessRequest {
pub epoch_id: EpochId,
pub chunk_header: ShardChunkHeader,
pub state_witness: ChunkStateWitness,
pub contract_deploys: Vec<ContractCode>,
pub contract_updates: ContractUpdates,
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
Expand Down Expand Up @@ -185,7 +186,7 @@ impl PartialWitnessActor {
epoch_id,
chunk_header,
state_witness,
contract_deploys,
contract_updates: ContractUpdates { contract_accesses, contract_deploys },
} = msg;

tracing::debug!(
Expand All @@ -194,16 +195,43 @@ impl PartialWitnessActor {
"distribute_chunk_state_witness",
);

// We send the state-witness and contract-updates in the following order:
// 1. We send the hashes of the contract code accessed (if contract code is excluded from witness and any contracts are called)
// before the state witness in order to allow validators to check and request missing contract code, while waiting for witness parts.
// 2. We send the state witness parts to witness-part owners.
// 3. We send the contract deploys parts to other validators (that do not validate the witness in this turn). This is lower priority
// since the newly-deployed contracts will be needed by other validators in later turns.

let signer = self.my_validator_signer()?;
let witness_bytes = compress_witness(&state_witness)?;
let key = state_witness.chunk_production_key();
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)
.expect("Chunk validators must be defined")
.ordered_chunk_validators();

self.send_state_witness_parts(epoch_id, chunk_header, witness_bytes, &signer)?;
if !contract_accesses.is_empty() {
self.send_contract_accesses_to_chunk_validators(
key.clone(),
contract_accesses,
&chunk_validators,
&signer,
);
}

self.send_chunk_contract_deploys_parts(
state_witness.chunk_production_key(),
contract_deploys,
let witness_bytes = compress_witness(&state_witness)?;
self.send_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
&chunk_validators,
&signer,
)?;

if !contract_deploys.is_empty() {
self.send_chunk_contract_deploys_parts(key, contract_deploys)?;
}

Ok(())
}

Expand All @@ -213,17 +241,9 @@ impl PartialWitnessActor {
epoch_id: EpochId,
chunk_header: ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
) -> Result<Vec<(AccountId, PartialEncodedStateWitness)>, Error> {
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
Expand Down Expand Up @@ -298,6 +318,7 @@ impl PartialWitnessActor {
epoch_id: EpochId,
chunk_header: ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
) -> Result<(), Error> {
// Capture these values first, as the sources are consumed before calling record_witness_sent.
Expand All @@ -309,8 +330,13 @@ impl PartialWitnessActor {
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let validator_witness_tuple =
self.generate_state_witness_parts(epoch_id, chunk_header, witness_bytes, signer)?;
let validator_witness_tuple = self.generate_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
chunk_validators,
signer,
)?;
encode_timer.observe_duration();

// Record the witness in order to match the incoming acks for measuring round-trip times.
Expand Down Expand Up @@ -546,6 +572,37 @@ impl PartialWitnessActor {
Ok(())
}

/// Sends the contract accesses to the same chunk validators
/// (except for the chunk producers that track the same shard),
/// which will receive the state witness for the new chunk.
fn send_contract_accesses_to_chunk_validators(
&self,
key: ChunkProductionKey,
contract_accesses: HashSet<CodeHash>,
chunk_validators: &[AccountId],
my_signer: &ValidatorSigner,
) {
let chunk_producers: HashSet<AccountId> = self
.epoch_manager
.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)
.expect("Chunk producers must be defined")
.into_iter()
.collect();

// Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code.
let target_chunk_validators = chunk_validators
.iter()
.filter(|validator| !chunk_producers.contains(*validator))
.cloned()
.collect();
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkContractAccesses(
target_chunk_validators,
ChunkContractAccesses::new(key, contract_accesses, my_signer),
),
));
}

/// Retrieves the code for the given contract hashes and distributes them to validator in parts.
///
/// This implements the first step of distributing contract code to validators where the contract codes
Expand All @@ -557,9 +614,6 @@ impl PartialWitnessActor {
key: ChunkProductionKey,
contract_codes: Vec<ContractCode>,
) -> Result<(), Error> {
if contract_codes.is_empty() {
return Ok(());
}
let contracts = contract_codes.into_iter().map(|contract| contract.into()).collect();
let compressed_deploys = ChunkContractDeploys::compress_contracts(&contracts)?;
let validator_parts = self.generate_contract_deploys_parts(&key, compressed_deploys)?;
Expand Down
69 changes: 8 additions & 61 deletions chain/client/src/stateless_validation/state_witness_producer.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use near_async::messaging::{CanSend, IntoSender};
use near_chain::{BlockHeader, Chain, ChainStoreAccess};
use near_chain_primitives::Error;
use near_network::types::{NetworkRequests, PeerManagerMessageRequest};
use near_o11y::log_assert_fail;
use near_primitives::challenge::PartialState;
use near_primitives::checked_feature;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, ShardChunkHeader};
use near_primitives::stateless_validation::contract_distribution::{
ChunkContractAccesses, CodeHash, ContractUpdates,
};
use near_primitives::stateless_validation::contract_distribution::ContractUpdates;
use near_primitives::stateless_validation::state_witness::{
ChunkStateTransition, ChunkStateWitness,
};
use near_primitives::stateless_validation::stored_chunk_state_transition_data::{
StoredChunkStateTransitionData, StoredChunkStateTransitionDataV1,
StoredChunkStateTransitionDataV2, StoredChunkStateTransitionDataV3,
};
use near_primitives::stateless_validation::ChunkProductionKey;
use near_primitives::types::{AccountId, EpochId, ShardId};
use near_primitives::validator_signer::ValidatorSigner;
use near_primitives::version::ProtocolFeature;
Expand Down Expand Up @@ -101,30 +97,17 @@ impl Client {
);
}

let ContractUpdates { contract_accesses, contract_deploys } =
if ProtocolFeature::ExcludeContractCodeFromStateWitness.enabled(protocol_version) {
contract_updates
} else {
Default::default()
};

// We send the hashes of the contract code accessed (if applicable) before the state witness in order to
// allow validators to check and request missing contract code, while waiting for witness parts.

// TODO(#11099): Consider moving this also to partial witness actor by passing ContractUpdates in DistributeStateWitnessRequest.
if !contract_accesses.is_empty() {
self.send_contract_accesses_to_chunk_validators(
state_witness.chunk_production_key(),
contract_accesses,
my_signer.as_ref(),
);
}
// Pass the contract changes to PartialWitnessActor only if we exclude contract code from state witness.
let contract_updates = ProtocolFeature::ExcludeContractCodeFromStateWitness
.enabled(protocol_version)
.then_some(contract_updates)
.unwrap_or_default();

self.partial_witness_adapter.send(DistributeStateWitnessRequest {
epoch_id: *epoch_id,
chunk_header,
state_witness,
contract_deploys,
contract_updates,
});
Ok(())
}
Expand Down Expand Up @@ -423,40 +406,4 @@ impl Client {
}
Ok(source_receipt_proofs)
}

/// Sends the contract accesses to the same chunk validators
/// (except for the chunk producers that track the same shard),
/// which will receive the state witness for the new chunk.
fn send_contract_accesses_to_chunk_validators(
&self,
key: ChunkProductionKey,
contract_accesses: HashSet<CodeHash>,
my_signer: &ValidatorSigner,
) {
let chunk_validators: HashSet<AccountId> = self
.epoch_manager
.get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)
.expect("Chunk validators must be defined")
.assignments()
.iter()
.map(|(id, _)| id.clone())
.collect();

let chunk_producers: HashSet<AccountId> = self
.epoch_manager
.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)
.expect("Chunk producers must be defined")
.into_iter()
.collect();

// Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code.
let target_chunk_validators =
chunk_validators.difference(&chunk_producers).cloned().collect();
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkContractAccesses(
target_chunk_validators,
ChunkContractAccesses::new(key, contract_accesses, my_signer),
),
));
}
}
2 changes: 1 addition & 1 deletion chain/network/src/state_witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct ContractCodeRequestMessage(pub ContractCodeRequest);
#[rtype(result = "()")]
pub struct ContractCodeResponseMessage(pub ContractCodeResponse);

// TODO(#11099) Rename this to generalize beyond partial witness.
/// Multi-sender for forwarding messages received from network to PartialWitnessActor.
#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
#[multi_send_message_derive(Debug)]
#[multi_send_input_derive(Debug, Clone, PartialEq, Eq)]
Expand Down

0 comments on commit 38d7e05

Please sign in to comment.