consensus(refactor): use sub_dag index to keep application/consensus in sync
daltoncoder committed May 29, 2024
1 parent e777dc7 commit a957416
Showing 10 changed files with 131 additions and 68 deletions.
4 changes: 2 additions & 2 deletions core/application/src/env.rs
@@ -192,8 +192,8 @@ impl Env<UpdatePerm> {
*/
response.txn_receipts.push(receipt);
}
- // Set the last executed block hash
- app.set_last_block(block.digest);
+ // Set the last executed block hash and sub dag index
+ app.set_last_block(block.digest, block.sub_dag_index);

// Return the response
response
13 changes: 12 additions & 1 deletion core/application/src/state.rs
@@ -1067,9 +1067,11 @@ impl<B: Backend> State<B> {
}

// This function should only be called in the `run` method on `Env`.
- pub fn set_last_block(&self, block_hash: [u8; 32]) {
+ pub fn set_last_block(&self, block_hash: [u8; 32], sub_dag_index: u64) {
self.metadata
.set(Metadata::LastBlockHash, Value::Hash(block_hash));
+ self.metadata
+ .set(Metadata::SubDagIndex, Value::SubDagIndex(sub_dag_index));
}

fn add_service(
@@ -1870,6 +1872,15 @@ impl<B: Backend> State<B> {
}
}

+ /// Gets the subdag index; returns 0 if it is not set in the state table.
+ pub fn get_sub_dag_index(&self) -> u64 {
+ if let Some(Value::SubDagIndex(value)) = self.metadata.get(&Metadata::SubDagIndex) {
+ value
+ } else {
+ 0
+ }
+ }

fn _get_epoch(&self) -> u64 {
if let Some(Value::Epoch(epoch)) = self.metadata.get(&Metadata::Epoch) {
epoch
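For readers skimming the diff: the new metadata entry follows the same get-or-default pattern as the existing ones. A minimal standalone sketch of the round trip, using a plain HashMap in place of the real state backend (the enum shapes mirror the diff; everything else is hypothetical):

```rust
use std::collections::HashMap;

#[derive(Hash, PartialEq, Eq)]
enum Metadata {
    LastBlockHash,
    SubDagIndex,
}

enum Value {
    Hash([u8; 32]),
    SubDagIndex(u64),
}

fn main() {
    let mut metadata: HashMap<Metadata, Value> = HashMap::new();

    // set_last_block now records both values in one call.
    metadata.insert(Metadata::LastBlockHash, Value::Hash([0u8; 32]));
    metadata.insert(Metadata::SubDagIndex, Value::SubDagIndex(7));

    // get_sub_dag_index falls back to 0 when the entry was never written,
    // e.g. on a fresh node that has not executed any sub dag yet.
    let idx = match metadata.get(&Metadata::SubDagIndex) {
        Some(Value::SubDagIndex(v)) => *v,
        _ => 0,
    };
    assert_eq!(idx, 7);
}
```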
1 change: 1 addition & 0 deletions core/application/src/tests.rs
@@ -1045,6 +1045,7 @@ async fn run_transaction(
.run(Block {
transactions: requests,
digest: [0; 32],
+ sub_dag_index: 0,
})
.await
.map_err(|r| anyhow!(format!("{r:?}")))?;
10 changes: 7 additions & 3 deletions core/consensus/src/edge_node/transaction_store.rs
@@ -178,14 +178,18 @@ impl<T: BroadcastEventInterface<PubSubMsg>> TransactionStore<T> {
while let Some(parcel) = self.get_parcel(&last_digest) {
parcel_chain.push(last_digest);

- txn_chain.push_front((parcel.inner.transactions.clone(), last_digest));
+ txn_chain.push_front((
+ parcel.inner.transactions.clone(),
+ parcel.inner.sub_dag_index,
+ last_digest,
+ ));

if parcel.inner.last_executed == head {
let mut epoch_changed = false;

// We connected the chain; now execute all the transactions
- for (batch, digest) in txn_chain {
- if execution.submit_batch(batch, digest).await {
+ for (batch, sub_dag_index, digest) in txn_chain {
+ if execution.submit_batch(batch, digest, sub_dag_index).await {
epoch_changed = true;
}
}
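The tuple now threads each parcel's sub_dag_index through to submit_batch. Since the chain walk discovers parcels newest-first, push_front keeps execution in oldest-first order; a small sketch of that ordering under simplified types (names and values are illustrative, not from the repo):

```rust
use std::collections::VecDeque;

fn main() {
    // (transactions, sub_dag_index, digest), as in the diff above.
    let mut txn_chain: VecDeque<(Vec<Vec<u8>>, u64, [u8; 32])> = VecDeque::new();

    // Walking backwards from the newest parcel (sub dag 3) to the oldest (1).
    for sub_dag_index in (1..=3u64).rev() {
        txn_chain.push_front((Vec::new(), sub_dag_index, [0u8; 32]));
    }

    // Draining front-to-back executes the oldest sub dag first.
    let order: Vec<u64> = txn_chain.iter().map(|(_, idx, _)| *idx).collect();
    assert_eq!(order, vec![1, 2, 3]);
}
```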
133 changes: 76 additions & 57 deletions core/consensus/src/execution.rs
@@ -9,7 +9,7 @@ use lightning_interfaces::ExecutionEngineSocket;
use lightning_utils::application::QueryRunnerExt;
use narwhal_crypto::DefaultHashFunction;
use narwhal_executor::ExecutionState;
- use narwhal_types::{BatchAPI, BatchDigest, ConsensusOutput, Transaction};
+ use narwhal_types::{Batch, BatchDigest, ConsensusOutput, Transaction};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Notify};
use tracing::{error, info};
@@ -21,6 +21,7 @@ pub struct AuthenticStampedParcel {
pub transactions: Vec<Transaction>,
pub last_executed: Digest,
pub epoch: Epoch,
+ pub sub_dag_index: u64,
}

impl ToDigest for AuthenticStampedParcel {
@@ -88,19 +89,32 @@ impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
}

// Returns true if the epoch changed
- pub(crate) async fn submit_batch(&self, payload: Vec<Transaction>, digest: Digest) -> bool {
+ pub(crate) async fn submit_batch(
+ &self,
+ payload: Vec<Transaction>,
+ digest: Digest,
+ sub_dag_index: u64,
+ ) -> bool {
let transactions = payload
.into_iter()
- .filter_map(|txn| TransactionRequest::try_from(txn.as_ref()).ok())
+ .filter_map(|txn| {
+ // Filter out transactions that won't serialize or have already been executed
+ if let Ok(txn) = TransactionRequest::try_from(txn.as_ref()) {
+ if !self.query_runner.has_executed_digest(txn.hash()) {
+ Some(txn)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ })
.collect::<Vec<_>>();

- if transactions.is_empty() {
- return false;
- }

let block = Block {
- transactions,
digest,
+ sub_dag_index,
+ transactions,
};

let archive_block = block.clone();
@@ -159,62 +173,67 @@ impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
#[async_trait]
impl<Q: SyncQueryRunnerInterface, NE: Emitter> ExecutionState for Execution<Q, NE> {
async fn handle_consensus_output(&self, consensus_output: ConsensusOutput) {
- for (cert, batches) in consensus_output.batches {
- let current_epoch = self.query_runner.get_current_epoch();
- if cert.epoch() != current_epoch {
- // If the certificate epoch does not match the current epoch in the application
- // state do not execute this transaction, This could only happen in
- // certain race conditions at the end of an epoch and we need this to ensure all
- // nodes execute the same transactions
- continue;
- }

- if !batches.is_empty() {
- let mut batch_payload =
- Vec::with_capacity(batches.iter().fold(0, |acc, batch| acc + batch.size()));

- for batch in batches {
- for tx_bytes in batch.transactions() {
- if let Ok(tx) = TransactionRequest::try_from(tx_bytes.as_ref()) {
- if !self.query_runner.has_executed_digest(tx.hash()) {
- batch_payload.push(tx_bytes.to_owned());
- }
- }
- }
- }
+ let current_epoch = self.query_runner.get_current_epoch();

- if batch_payload.is_empty() {
- continue
+ let sub_dag_index = consensus_output.sub_dag.sub_dag_index;
+ println!("subdag index: {sub_dag_index}");
+ let batch_payload: Vec<Vec<u8>> = consensus_output
+ .batches
+ .into_iter()
+ .filter_map(|(cert, batch)| {
+ // Skip over the ones that have a different epoch. Shouldn't ever happen
+ if cert.epoch() != current_epoch {
+ error!("we received a consensus cert from an epoch we are not on");
+ None
+ } else {
+ // Map the batch to just the transactions
+ Some(
+ batch
+ .into_iter()
+ .flat_map(|batch| match batch {
+ // workaround, since batch.transactions() would require a clone
+ Batch::V1(btch) => btch.transactions,
+ Batch::V2(btch) => btch.transactions,
+ })
+ .collect::<Vec<Vec<u8>>>(),
+ )
+ }
+ })
+ .flatten()
+ .collect();

- // We have batches in the payload send them over broadcast along with an attestion
- // of them
- let last_executed = self.query_runner.get_last_block();
- let parcel = AuthenticStampedParcel {
- transactions: batch_payload.clone(),
- last_executed,
- epoch: current_epoch,
- };

- let epoch_changed = self.submit_batch(batch_payload, parcel.to_digest()).await;

- if let Err(e) = self.tx_narwhal_batches.send((parcel, epoch_changed)).await {
- // This shouldn't ever happen. But if it does there is no critical tasks
- // happening on the other end of this that would require a
- // panic
- error!("Narwhal failed to send batch payload to edge consensus: {e:?}");
- }
+ if batch_payload.is_empty() {
+ return;
+ }
+ // We have batches in the payload; send them over broadcast along with an attestation
+ // of them
+ let last_executed = self.query_runner.get_last_block();
+ let parcel = AuthenticStampedParcel {
+ transactions: batch_payload.clone(),
+ last_executed,
+ epoch: current_epoch,
+ sub_dag_index,
+ };

- // Submit the batches to application layer and if the epoch changed reset last
- // executed
- if epoch_changed {
- self.reconfigure_notify.notify_waiters();
- }
- }
+ let epoch_changed = self
+ .submit_batch(batch_payload, parcel.to_digest(), sub_dag_index)
+ .await;

+ if let Err(e) = self.tx_narwhal_batches.send((parcel, epoch_changed)).await {
+ // This shouldn't ever happen, but if it does, there are no critical tasks
+ // happening on the other end that would require a panic
+ error!("Narwhal failed to send batch payload to edge consensus: {e:?}");
+ }

+ // Submit the batches to the application layer, and if the epoch changed, reset
+ // last executed
+ if epoch_changed {
+ self.reconfigure_notify.notify_waiters();
+ }
}

async fn last_executed_sub_dag_index(&self) -> u64 {
- 0
+ self.query_runner.get_sub_dag_index()
}
}
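The net effect of the execution.rs changes: the application state now remembers the last sub dag it executed, and last_executed_sub_dag_index reports that instead of a hard-coded 0, so consensus can skip already-executed sub-dags after a restart. An illustrative mock of that handshake (this is not the narwhal API, just the contract the diff relies on):

```rust
struct MockExecutionState {
    // Persisted by the application state via Metadata::SubDagIndex.
    persisted_sub_dag_index: u64,
}

impl MockExecutionState {
    // Stands in for ExecutionState::last_executed_sub_dag_index.
    fn last_executed_sub_dag_index(&self) -> u64 {
        self.persisted_sub_dag_index
    }

    // Consensus should only deliver sub-dags newer than the persisted index.
    fn should_execute(&self, incoming: u64) -> bool {
        incoming > self.last_executed_sub_dag_index()
    }
}

fn main() {
    let state = MockExecutionState { persisted_sub_dag_index: 41 };
    assert!(!state.should_execute(40)); // already executed, skipped on replay
    assert!(!state.should_execute(41)); // the last one we executed
    assert!(state.should_execute(42)); // the next sub dag to execute
}
```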
1 change: 1 addition & 0 deletions core/consensus/src/tests.rs
@@ -40,6 +40,7 @@ fn generate_random_parcel(
transactions,
last_executed: last_executed.unwrap_or([0; 32]),
epoch: 1,
+ sub_dag_index: 0,
}
}

2 changes: 2 additions & 0 deletions core/test-utils/src/consensus.rs
@@ -228,6 +228,7 @@ async fn group_worker(
Block {
transactions: vec![req.unwrap()],
digest: [0; 32],
+ sub_dag_index: 0
}
},
Some(req) = req_rx.recv() => {
@@ -262,6 +263,7 @@ async fn group_worker(
Block {
transactions: vec![],
digest: [0; 32],
+ sub_dag_index: 0
}
},
else => {
2 changes: 2 additions & 0 deletions core/types/src/state.rs
@@ -94,6 +94,7 @@ pub enum Metadata {
LastEpochHash,
LastBlockHash,
GenesisCommittee,
+ SubDagIndex,
}

/// The Value enum is a data type used to represent values in a key-value pair for a metadata table
@@ -108,6 +109,7 @@ pub enum Value {
NextNodeIndex(u32),
Hash([u8; 32]),
GenesisCommittee(Vec<NodeIndex>),
+ SubDagIndex(u64),
}

impl Value {
24 changes: 19 additions & 5 deletions core/types/src/transaction.rs
@@ -89,9 +89,12 @@ impl From<EthersTransactionWrapper> for EthersTransaction {
/// block.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Block {
- pub transactions: Vec<TransactionRequest>,
- // Digest of the narwhal certificate that included this
+ /// Digest of the narwhal certificate that included this block
pub digest: [u8; 32],
+ /// The narwhal subdag index that included these transactions
+ pub sub_dag_index: u64,
+ /// List of transactions to be executed in this block
+ pub transactions: Vec<TransactionRequest>,
}

/// An update transaction, sent from users to the consensus to migrate the application
@@ -234,9 +237,14 @@

fn try_from(value: &Block) -> Result<Self, Self::Error> {
let mut bytes = Vec::new();
+ // First 32 bytes are the digest
bytes.extend_from_slice(&value.digest);
+ // Next 8 bytes are the subdag index
+ bytes.extend_from_slice(&value.sub_dag_index.to_le_bytes());
+ // Next 8 bytes are the number of transactions that follow
let num_txns = value.transactions.len() as u64;
bytes.extend_from_slice(&num_txns.to_le_bytes());
+ // The rest of the bytes are the transactions
for tx in &value.transactions {
// TODO(matthias): would be good to serialize to borrowed bytes here instead
let tx_bytes: Vec<u8> = tx.try_into()?;
@@ -253,9 +261,14 @@ impl TryFrom<Vec<u8>> for Block {

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let digest: [u8; 32] = value.get(0..32).context("Out of bounds")?.try_into()?;
- let num_txns_bytes: [u8; 8] = value.get(32..40).context("Out of bounds")?.try_into()?;

+ let sub_dag_bytes = value.get(32..40).context("Out of bounds")?.try_into()?;
+ let sub_dag_index = u64::from_le_bytes(sub_dag_bytes);

+ let num_txns_bytes: [u8; 8] = value.get(40..48).context("Out of bounds")?.try_into()?;
let num_txns = u64::from_le_bytes(num_txns_bytes);
- let mut pointer = 40;

+ let mut pointer = 48;
let mut transactions = Vec::with_capacity(num_txns as usize);
for _ in 0..num_txns {
let tx_len_bytes: [u8; 8] = value
@@ -273,8 +286,9 @@ impl TryFrom<Vec<u8>> for Block {
pointer += tx_len;
}
Ok(Block {
- transactions,
digest,
+ sub_dag_index,
+ transactions,
})
}
}
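Putting the serializer and deserializer changes together, the new wire layout is: a 32-byte digest, an 8-byte little-endian subdag index, an 8-byte little-endian transaction count, then each transaction as a length-prefixed blob (the per-transaction 8-byte prefix is inferred from the deserializer's tx_len reads). A standalone sketch of an encoder for that layout; encode_block is hypothetical and the tx byte blobs stand in for serialized TransactionRequests:

```rust
fn encode_block(digest: [u8; 32], sub_dag_index: u64, txs: &[Vec<u8>]) -> Vec<u8> {
    let mut bytes = Vec::new();
    bytes.extend_from_slice(&digest); // bytes 0..32: digest
    bytes.extend_from_slice(&sub_dag_index.to_le_bytes()); // bytes 32..40: subdag index
    bytes.extend_from_slice(&(txs.len() as u64).to_le_bytes()); // bytes 40..48: tx count
    for tx in txs {
        bytes.extend_from_slice(&(tx.len() as u64).to_le_bytes()); // 8-byte length prefix
        bytes.extend_from_slice(tx);
    }
    bytes
}

fn main() {
    let encoded = encode_block([7u8; 32], 42, &[b"tx-1".to_vec(), b"tx-2".to_vec()]);
    // 32 (digest) + 8 (subdag index) + 8 (count) + 2 * (8 + 4) bytes of payload.
    assert_eq!(encoded.len(), 32 + 8 + 8 + 2 * (8 + 4));
    let idx = u64::from_le_bytes(encoded[32..40].try_into().unwrap());
    assert_eq!(idx, 42);
}
```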
9 changes: 9 additions & 0 deletions core/utils/src/application.rs
@@ -115,6 +115,15 @@ pub trait QueryRunnerExt: SyncQueryRunnerInterface {
}
}

+ /// Returns the current sub dag index; defaults to 0 when unset.
+ fn get_sub_dag_index(&self) -> u64 {
+ if let Some(Value::SubDagIndex(value)) = self.get_metadata(&Metadata::SubDagIndex) {
+ value
+ } else {
+ 0
+ }
+ }

/// Returns a full copy of the entire node-registry,
/// Paging Params - filtering nodes that are still a valid node and have enough stake; Takes
/// from starting index and specified amount.
