Skip to content

Commit

Permalink
fix(mempool): add transaction to state store before execution (#625)
Browse files Browse the repository at this point in the history
Description
---
fix(mempool): add transaction to state store before execution
feat(sqlite state store): add executed_time_ms to transactions

Motivation and Context
---
Add the transaction to the state store before execution to allow missing
transactions to check if a transaction exists but is still executing.

How Has This Been Tested?
---
Existing cucumber tests, manually

What process can a PR reviewer use to test or verify this change?
---
Check that missing transaction does not request transactions that are
busy executing

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Jul 24, 2023
1 parent 6b1036c commit fd44289
Show file tree
Hide file tree
Showing 23 changed files with 398 additions and 217 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
Expand Down Expand Up @@ -55,6 +55,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
state_store: MemoryStateStore,
consensus_context: ConsensusContext,
) -> Result<ExecutedTransaction, Self::Error> {
let timer = Instant::now();
// Include ownership token for the signers of this in the auth scope
let owner_token = get_auth_token(transaction.signer_public_key());
let auth_params = AuthParams {
Expand Down Expand Up @@ -82,7 +83,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
},
};

Ok(ExecutedTransaction::new(transaction, result))
Ok(ExecutedTransaction::new(transaction, result, timer.elapsed()))
}
}

Expand Down
6 changes: 3 additions & 3 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_app_utilities::template_manager::interface::TemplateManagerHandle;
use tari_dan_common_types::{optional::Optional, ShardId};
use tari_dan_storage::{
consensus_models::{ExecutedTransaction, QuorumDecision, SubstateRecord},
consensus_models::{ExecutedTransaction, QuorumDecision, SubstateRecord, TransactionRecord},
Ordering,
StateStore,
};
Expand Down Expand Up @@ -241,10 +241,10 @@ impl JsonRpcHandlers {
pub async fn get_recent_transactions(&self, value: JsonRpcExtractor) -> JrpcResult {
let answer_id = value.get_answer_id();
let mut tx = self.state_store.create_read_tx().unwrap();
match ExecutedTransaction::get_paginated(&mut tx, 1000, 0, Some(Ordering::Descending)) {
match TransactionRecord::get_paginated(&mut tx, 1000, 0, Some(Ordering::Descending)) {
Ok(recent_transactions) => {
let res = GetRecentTransactionsResponse {
transactions: recent_transactions.into_iter().map(|t| t.into_transaction()).collect(),
transactions: recent_transactions.into_iter().map(|t| t.transaction).collect(),
};
Ok(JsonRpcResponse::success(answer_id, res))
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::time::{Duration, Instant};

use log::*;
use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, TransactionProcessorError};
use tari_dan_engine::{
Expand All @@ -21,7 +19,7 @@ use crate::{

const LOG_TARGET: &str = "tari::dan::mempool::executor";

pub(super) type ExecutionResult = (TransactionId, Duration, Result<ExecutedTransaction, MempoolError>);
pub(super) type ExecutionResult = (TransactionId, Result<ExecutedTransaction, MempoolError>);

pub async fn execute_transaction<TSubstateResolver, TExecutor>(
transaction: Transaction,
Expand All @@ -35,13 +33,12 @@ where
{
let mut state_db = new_state_db();

let timer = Instant::now();
match substate_resolver.resolve(&transaction, &mut state_db).await {
Ok(()) => {
let res = task::spawn_blocking(move || {
let id = *transaction.id();
let result = executor.execute(transaction, state_db, consensus_context);
(id, timer.elapsed(), result.map_err(MempoolError::from))
(id, result.map_err(MempoolError::from))
})
.await;

Expand All @@ -52,7 +49,7 @@ where
Err(err @ SubstateResolverError::InputSubstateDowned { .. }) |
Err(err @ SubstateResolverError::InputSubstateDoesNotExist { .. }) => {
warn!(target: LOG_TARGET, "One or more invalid input shards for transaction {}: {}", transaction.id(), err);
Ok((*transaction.id(), Duration::default(), Err(err.into())))
Ok((*transaction.id(), Err(err.into())))
},
// Some other issue - network, db, etc
Err(err) => Err(err.into()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, Transact
use tari_dan_common_types::{Epoch, ShardId};
use tari_dan_engine::runtime::ConsensusContext;
use tari_dan_p2p::{DanMessage, OutboundService};
use tari_dan_storage::{consensus_models::ExecutedTransaction, StateStore};
use tari_dan_storage::{
consensus_models::{ExecutedTransaction, TransactionRecord},
StateStore,
StateStoreWriteTransaction,
};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_state_store_sqlite::SqliteStateStore;
use tari_transaction::{Transaction, TransactionId};
Expand Down Expand Up @@ -160,22 +164,24 @@ where
return Ok(());
}

let transaction_processed = self.state_store.with_read_tx(|tx| {
let exists = ExecutedTransaction::exists(tx, transaction.id())?;
Ok::<_, MempoolError>(exists)
})?;
let transaction_exists = self
.state_store
.with_read_tx(|tx| TransactionRecord::exists(tx, transaction.id()))?;

if transaction_processed {
if transaction_exists {
info!(
target: LOG_TARGET,
"🎱 Transaction {} already processed. Ignoring",
"🎱 Transaction {} already exists. Ignoring",
transaction.id()
);
return Ok(());
}

self.validator.validate(&transaction).await?;

self.state_store
.with_write_tx(|tx| tx.transactions_insert(&transaction))?;

if transaction.num_involved_shards() == 0 {
warn!(target: LOG_TARGET, "⚠ No involved shards for payload");
}
Expand Down Expand Up @@ -227,7 +233,7 @@ where
result: Result<ExecutionResult, MempoolError>,
) -> Result<(), MempoolError> {
// This is due to a bug or possibly db failure only
let (transaction_id, time_taken, exec_result) = result?;
let (transaction_id, exec_result) = result?;

self.transactions.remove(&transaction_id);
let executed = match exec_result {
Expand All @@ -237,7 +243,7 @@ where
"✅ Transaction {} executed successfully ({}) in {:?}",
executed.transaction().id(),
executed.result().finalize.result,
time_taken
executed.execution_time()
);
// We refuse to process the transaction if any input_refs are downed
self.check_input_refs(&executed)?;
Expand All @@ -253,6 +259,7 @@ where
transaction_id,
e.to_string()
);

return Ok(());
},
};
Expand Down
3 changes: 3 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tari_dan_storage::{
};
use tari_epoch_manager::EpochManagerError;
use tari_mmr::BalancedBinaryMerkleProofError;
use tari_transaction::TransactionId;

#[derive(Debug, thiserror::Error)]
pub enum HotStuffError {
Expand Down Expand Up @@ -40,6 +41,8 @@ pub enum HotStuffError {
InvalidVoteSignature { signer_public_key: PublicKey },
#[error("Transaction pool error: {0}")]
TransactionPoolError(#[from] TransactionPoolError),
#[error("Transaction {transaction_id} does not exist")]
TransactionDoesNotExist { transaction_id: TransactionId },
#[error("Received vote for unknown block {block_id} from {sent_by}")]
ReceivedVoteForUnknownBlock { block_id: BlockId, sent_by: String },
}
Expand Down
3 changes: 2 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_receive_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tari_dan_storage::{
SubstateRecord,
TransactionPool,
TransactionPoolStage,
TransactionRecord,
},
StateStore,
StateStoreReadTransaction,
Expand Down Expand Up @@ -130,7 +131,7 @@ where TConsensusSpec: ConsensusSpec
let mut missing_tx_ids = Vec::new();
self.store.with_read_tx(|tx| {
for tx_id in block.all_transaction_ids() {
if !ExecutedTransaction::exists(tx, tx_id)? {
if !TransactionRecord::exists(tx, tx_id)? {
missing_tx_ids.push(*tx_id);
}
}
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ where
executed.transaction().id()
);
self.state_store.with_write_tx(|tx| {
executed.insert(tx)?;
executed.upsert(tx)?;

let decision = if executed.result().finalize.is_accept() {
Decision::Commit
Expand Down
134 changes: 44 additions & 90 deletions dan_layer/consensus_tests/src/support/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::iter;
use std::{iter, time::Duration};

use tari_common_types::types::PrivateKey;
use tari_dan_storage::consensus_models::{Decision, ExecutedTransaction};
Expand All @@ -17,40 +17,43 @@ use crate::support::helpers::random_shard_in_bucket;

pub fn build_transaction_from(tx: Transaction, decision: Decision, fee: u64) -> ExecutedTransaction {
let tx_id = *tx.id();
ExecutedTransaction::new(tx, ExecuteResult {
finalize: FinalizeResult::new(
tx_id.into_array().into(),
vec![],
vec![],
if decision.is_commit() {
TransactionResult::Accept(SubstateDiff::new())
} else {
TransactionResult::Reject(RejectReason::ExecutionFailure("Test failure".to_string()))
},
FeeCostBreakdown {
total_fees_charged: fee.try_into().unwrap(),
breakdown: vec![],
},
),
transaction_failure: None,
fee_receipt: Some(FeeReceipt {
total_fee_payment: fee.try_into().unwrap(),
fee_resource: ResourceContainer::Confidential {
address: "resource_0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
commitments: Default::default(),
revealed_amount: fee.try_into().unwrap(),
},
cost_breakdown: vec![],
}),
})
ExecutedTransaction::new(
tx,
ExecuteResult {
finalize: FinalizeResult::new(
tx_id.into_array().into(),
vec![],
vec![],
if decision.is_commit() {
TransactionResult::Accept(SubstateDiff::new())
} else {
TransactionResult::Reject(RejectReason::ExecutionFailure("Test failure".to_string()))
},
FeeCostBreakdown {
total_fees_charged: fee.try_into().unwrap(),
breakdown: vec![],
},
),
transaction_failure: None,
fee_receipt: Some(FeeReceipt {
total_fee_payment: fee.try_into().unwrap(),
fee_resource: ResourceContainer::Confidential {
address: "resource_0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
commitments: Default::default(),
revealed_amount: fee.try_into().unwrap(),
},
cost_breakdown: vec![],
}),
},
Duration::from_secs(0),
)
}

pub fn build_transaction(decision: Decision, fee: u64, num_shards: usize, num_committees: u32) -> ExecutedTransaction {
let k = PrivateKey::default();

let mut tx = tari_transaction::Transaction::builder().sign(&k).build();
let mut tx = Transaction::builder().sign(&k).build();
for bucket in 0..num_committees {
// We fill these outputs so that the test VNs dont have to have any UP substates
// Equal potion of shards to each committee
Expand All @@ -60,66 +63,17 @@ pub fn build_transaction(decision: Decision, fee: u64, num_shards: usize, num_co
);
}

let tx_id = *tx.id();
ExecutedTransaction::new(tx, ExecuteResult {
finalize: FinalizeResult::new(
tx_id.into_array().into(),
vec![],
vec![],
if decision.is_commit() {
TransactionResult::Accept(SubstateDiff::new())
} else {
TransactionResult::Reject(RejectReason::ExecutionFailure("Test failure".to_string()))
},
FeeCostBreakdown {
total_fees_charged: fee.try_into().unwrap(),
breakdown: vec![],
},
),
transaction_failure: None,
fee_receipt: Some(FeeReceipt {
total_fee_payment: fee.try_into().unwrap(),
fee_resource: ResourceContainer::Confidential {
address: "resource_0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
commitments: Default::default(),
revealed_amount: fee.try_into().unwrap(),
},
cost_breakdown: vec![],
}),
})
build_transaction_from(tx, decision, fee)
}

pub fn change_decision(tx: ExecutedTransaction, new_decision: Decision) -> ExecutedTransaction {
let total_fees_charged = tx.result().fee_receipt.as_ref().unwrap().total_fee_payment;
let tx_id = *tx.transaction().id();
ExecutedTransaction::new(tx.into_transaction(), ExecuteResult {
finalize: FinalizeResult::new(
tx_id.into_array().into(),
vec![],
vec![],
if new_decision.is_commit() {
TransactionResult::Accept(SubstateDiff::new())
} else {
TransactionResult::Reject(RejectReason::ExecutionFailure("Test failure".to_string()))
},
FeeCostBreakdown {
total_fees_charged,
breakdown: vec![],
},
),
transaction_failure: None,
fee_receipt: Some(FeeReceipt {
total_fee_payment: total_fees_charged,
fee_resource: ResourceContainer::Confidential {
address: "resource_0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
commitments: Default::default(),
revealed_amount: total_fees_charged,
},
cost_breakdown: vec![],
}),
})
let total_fees_charged = tx
.result()
.fee_receipt
.as_ref()
.unwrap()
.total_fee_payment
.as_u64_checked()
.unwrap();
build_transaction_from(tx.into_transaction(), new_decision, total_fees_charged)
}
2 changes: 1 addition & 1 deletion dan_layer/indexer_lib/src/substate_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where

error!(
target: LOG_TARGET,
"Could not get substate {} from any of the validator nodes", shard,
"Could not get substate for shard {} from any of the validator nodes", shard,
);

if let Some(e) = last_error {
Expand Down
Loading

0 comments on commit fd44289

Please sign in to comment.