Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus)!: read-only resource + other fixes #1134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl ExecutionOutput {
inputs
.iter()
.map(|(substate_req, substate)| {
let requested_specific_version = substate_req.version().is_some();
let lock_flag = if diff.down_iter().any(|(id, _)| id == substate_req.substate_id()) {
// Update all inputs that were DOWNed to be write locked
SubstateLockType::Write
Expand All @@ -64,6 +65,7 @@ impl ExecutionOutput {
VersionedSubstateIdLockIntent::new(
VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()),
lock_flag,
requested_specific_version,
)
})
.collect()
Expand All @@ -76,6 +78,7 @@ impl ExecutionOutput {
VersionedSubstateIdLockIntent::new(
VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()),
SubstateLockType::Read,
true,
)
})
.collect()
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_swarm_daemon/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use fern::FormatCallback;
pub fn init_logger() -> Result<(), log::SetLoggerError> {
fn should_skip(target: &str) -> bool {
const SKIP: [&str; 3] = ["hyper::", "h2::", "tower::"];
SKIP.iter().any(|s| target.starts_with(s))
target.is_empty() || SKIP.iter().any(|s| target.starts_with(s))
}

let colors = fern::colors::ColoredLevelConfig::new().info(fern::colors::Color::Green);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const LOG_TARGET: &str = "tari::validator_node::mempool::service";

#[derive(Debug)]
pub struct MempoolService<TValidator> {
num_preshards: NumPreshards,
transactions: HashSet<TransactionId>,
mempool_requests: mpsc::Receiver<MempoolRequest>,
epoch_manager: EpochManagerHandle<PeerAddress>,
Expand All @@ -82,7 +81,6 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
#[cfg(feature = "metrics")] metrics: PrometheusMempoolMetrics,
) -> Self {
Self {
num_preshards,
gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), gossip),
transactions: Default::default(),
mempool_requests,
Expand Down Expand Up @@ -158,9 +156,7 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
}
info!(
target: LOG_TARGET,
"🎱 Received NEW transaction from local: {} {:?}",
transaction.id(),
transaction
"🎱 Received NEW transaction from local: {transaction}",
);

self.handle_new_transaction(transaction, vec![], None).await?;
Expand Down Expand Up @@ -200,34 +196,34 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
);

let current_epoch = self.consensus_handle.current_view().get_epoch();
let maybe_sender_shard_group = self
let maybe_sender_committee_info = self
.epoch_manager
.get_committee_info_by_validator_address(current_epoch, &from)
.await
.optional()?
.map(|c| c.shard_group());
.optional()?;

// Only input shards propagate transactions to output shards. Check that this is true.
if !unverified_output_shards.is_empty() {
let Some(sender_shard) = maybe_sender_shard_group else {
let Some(sender_committee_info) = maybe_sender_committee_info else {
debug!(target: LOG_TARGET, "Sender {from} isn't registered but tried to send a new transaction with
output shards");
return Ok(());
};

let is_input_shard = transaction
.all_inputs_iter()
.filter_map(|s| s.to_shard(self.num_preshards))
.any(|s| sender_shard.contains(&s));
let is_input_shard = transaction.is_involved_inputs(&sender_committee_info);
if !is_input_shard {
warn!(target: LOG_TARGET, "Sender {from} sent a message with output shards but was not an input
shard. Ignoring message.");
return Ok(());
}
}

self.handle_new_transaction(transaction, unverified_output_shards, maybe_sender_shard_group)
.await?;
self.handle_new_transaction(
transaction,
unverified_output_shards,
maybe_sender_committee_info.map(|c| c.shard_group()),
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -276,8 +272,7 @@ where TValidator: Validator<Transaction, Context = (), Error = TransactionValida
let tx_substate_address = SubstateAddress::for_transaction_receipt(transaction.id().into_receipt_address());

let local_committee_shard = self.epoch_manager.get_local_committee_info(current_epoch).await?;
let transaction_inputs = transaction.all_inputs_iter().filter_map(|i| i.to_substate_address());
let is_input_shard = local_committee_shard.includes_any_address(transaction_inputs);
let is_input_shard = transaction.is_involved_inputs(&local_committee_shard);
let is_output_shard = local_committee_shard.includes_any_address(
// Known output shards
// This is to allow for the txreceipt output
Expand Down
3 changes: 1 addition & 2 deletions bindings/dist/types/FeeBreakdown.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { FeeSource } from "./FeeSource";
export interface FeeBreakdown {
source: FeeSource;
amount: number;
breakdown: Record<FeeSource, bigint>;
}
2 changes: 1 addition & 1 deletion bindings/dist/types/FeeCostBreakdown.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import type { Amount } from "./Amount";
import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeCostBreakdown {
total_fees_charged: Amount;
breakdown: Array<FeeBreakdown>;
breakdown: FeeBreakdown;
}
2 changes: 1 addition & 1 deletion bindings/dist/types/FeeReceipt.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeReceipt {
total_fee_payment: Amount;
total_fees_paid: Amount;
cost_breakdown: Array<FeeBreakdown>;
cost_breakdown: FeeBreakdown;
}
1 change: 1 addition & 0 deletions bindings/dist/types/VersionedSubstateIdLockIntent.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
export interface VersionedSubstateIdLockIntent {
versioned_substate_id: VersionedSubstateId;
lock_type: SubstateLockType;
require_version: boolean;
}
3 changes: 1 addition & 2 deletions bindings/src/types/FeeBreakdown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@
import type { FeeSource } from "./FeeSource";

export interface FeeBreakdown {
source: FeeSource;
amount: number;
breakdown: Record<FeeSource, bigint>;
}
2 changes: 1 addition & 1 deletion bindings/src/types/FeeCostBreakdown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";

export interface FeeCostBreakdown {
total_fees_charged: Amount;
breakdown: Array<FeeBreakdown>;
breakdown: FeeBreakdown;
}
2 changes: 1 addition & 1 deletion bindings/src/types/FeeReceipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
export interface FeeReceipt {
total_fee_payment: Amount;
total_fees_paid: Amount;
cost_breakdown: Array<FeeBreakdown>;
cost_breakdown: FeeBreakdown;
}
1 change: 1 addition & 0 deletions bindings/src/types/VersionedSubstateIdLockIntent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
export interface VersionedSubstateIdLockIntent {
versioned_substate_id: VersionedSubstateId;
lock_type: SubstateLockType;
require_version: boolean;
}
7 changes: 4 additions & 3 deletions dan_layer/consensus/src/hotstuff/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn calculate_dummy_blocks<TAddr: NodeAddressable, TLeaderStrategy: LeaderStr
let mut dummies = Vec::new();
with_dummy_blocks(
candidate_block.network(),
justify_block.epoch(),
candidate_block.epoch(),
justify_block.shard_group(),
candidate_block.justify(),
*justify_block.merkle_root(),
Expand Down Expand Up @@ -141,7 +141,7 @@ fn with_dummy_blocks<TAddr, TLeaderStrategy, F>(
let mut parent_block = high_qc.as_leaf_block();
let mut current_height = high_qc.block_height() + NodeHeight(1);
if current_height > new_height {
warn!(
error!(
target: LOG_TARGET,
"BUG: 🍼 no dummy blocks to calculate. current height {} is greater than new height {}",
current_height,
Expand All @@ -152,7 +152,8 @@ fn with_dummy_blocks<TAddr, TLeaderStrategy, F>(

debug!(
target: LOG_TARGET,
"🍼 calculating dummy blocks from {} to {}",
"🍼 calculating dummy blocks in epoch {} from {} to {}",
epoch,
current_height,
new_height,
);
Expand Down
11 changes: 11 additions & 0 deletions dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ use tari_dan_storage::{
TransactionAtom,
TransactionPoolRecord,
TransactionPoolStage,
TransactionRecord,
},
StateStoreReadTransaction,
};
use tari_engine_types::commit_result::RejectReason;
use tari_transaction::TransactionId;

use crate::hotstuff::{block_change_set::ProposedBlockChangeSet, error::HotStuffError, ProposalValidationError};
Expand Down Expand Up @@ -112,6 +114,15 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
"⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}",
tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf
);

// Add an abort execution since we previously decided to commit
let mut transaction = TransactionRecord::get(tx, tx_rec.transaction_id())?;
transaction.set_abort_reason(RejectReason::ForeignShardGroupDecidedToAbort(format!(
"Foreign shard group {} decided to abort the transaction",
foreign_committee_info.shard_group()
)));
let exec = transaction.into_execution().expect("ABORT set above");
proposed_block_change_set.add_transaction_execution(exec)?;
}

// We need to add the justify QC to the evidence because the all prepare block could not include it
Expand Down
72 changes: 39 additions & 33 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tari_dan_storage::{
use tari_engine_types::{commit_result::RejectReason, substate::Substate};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::task;

use crate::{
hotstuff::{
Expand Down Expand Up @@ -78,6 +79,7 @@ type NextBlock = (
HashMap<TransactionId, TransactionExecution>,
);

#[derive(Debug, Clone)]
pub struct OnPropose<TConsensusSpec: ConsensusSpec> {
config: HotstuffConfig,
store: TConsensusSpec::StateStore,
Expand Down Expand Up @@ -119,7 +121,7 @@ where TConsensusSpec: ConsensusSpec
&mut self,
epoch: Epoch,
local_committee: &Committee<TConsensusSpec::Addr>,
local_committee_info: &CommitteeInfo,
local_committee_info: CommitteeInfo,
leaf_block: LeafBlock,
is_newview_propose: bool,
propose_epoch_end: bool,
Expand Down Expand Up @@ -168,41 +170,45 @@ where TConsensusSpec: ConsensusSpec
let base_layer_block_hash = current_base_layer_block_hash;
let base_layer_block_height = current_base_layer_block_height;

let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| {
let high_qc = HighQc::get(&**tx, epoch)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;
let on_propose = self.clone();
let (next_block, foreign_proposals) = task::spawn_blocking(move || {
on_propose.store.with_write_tx(|tx| {
let high_qc = HighQc::get(&**tx, epoch)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;

let (next_block, foreign_proposals, executed_transactions) = self.build_next_block(
tx,
epoch,
&leaf_block,
high_qc_cert,
validator.public_key,
local_committee_info,
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
base_layer_block_height,
base_layer_block_hash,
propose_epoch_end,
)?;
let (next_block, foreign_proposals, executed_transactions) = on_propose.build_next_block(
tx,
epoch,
&leaf_block,
high_qc_cert,
validator.public_key,
&local_committee_info,
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if
// this is a good idea.
is_newview_propose,
base_layer_block_height,
base_layer_block_hash,
propose_epoch_end,
)?;

// Add executions for this block
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}
// Add executions for this block
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}

next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>((next_block, foreign_proposals))
})?;
next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>((next_block, foreign_proposals))
})
})
.await??;

info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,8 @@ where TConsensusSpec: ConsensusSpec
let execution = self.execute_transaction(tx, block.id(), block.epoch(), tx_rec.transaction_id())?;
let mut execution = execution.into_transaction_execution();

// TODO: check the diff is valid against the provided input evidence (correct locks etc).

// TODO: can we modify the locks at this point? For multi-shard input transactions, we locked all inputs
// as Write due to lack of information. We now know what locks are necessary, and this
// block has the correct evidence (TODO: verify the atom) so this should be fine.
Expand Down
Loading
Loading