diff --git a/Cargo.lock b/Cargo.lock index b73be93223..ae73c17a6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10140,6 +10140,7 @@ dependencies = [ "rooch-integration-test-runner", "rooch-key", "rooch-oracle", + "rooch-pipeline-processor", "rooch-rpc-api", "rooch-rpc-client", "rooch-rpc-server", diff --git a/crates/rooch-pipeline-processor/src/actor/processor.rs b/crates/rooch-pipeline-processor/src/actor/processor.rs index b9d893a498..a48dfc7c87 100644 --- a/crates/rooch-pipeline-processor/src/actor/processor.rs +++ b/crates/rooch-pipeline-processor/src/actor/processor.rs @@ -114,6 +114,7 @@ impl PipelineProcessorActor { .get_transaction_by_hash(tx_hash) .await? .ok_or_else(|| anyhow::anyhow!("The tx with hash {} should exists", tx_hash))?; + let tx_order = ledger_tx.sequence_info.tx_order; match &ledger_tx.data { LedgerTxData::L1Block(block) => { debug!("process_sequenced_tx_on_startup l1_block_tx: {:?}", block); @@ -124,14 +125,11 @@ impl PipelineProcessorActor { bitcoin::block::BlockHash::from_slice(&block_hash_vec)?; let btc_block = bitcoin_client_proxy.get_block(block_hash).await?; let block_body = BitcoinBlock::from(btc_block); - let moveos_tx = self - .executor - .validate_l1_block(L1BlockWithBody::new( - block.clone(), - block_body.encode(), - )) - .await?; - self.execute_tx(ledger_tx.clone(), moveos_tx).await?; + self.execute_l1_block(L1BlockWithBody::new( + block.clone(), + block_body.encode(), + )) + .await?; } None => { return Err(anyhow::anyhow!( @@ -142,13 +140,23 @@ impl PipelineProcessorActor { } LedgerTxData::L1Tx(l1_tx) => { debug!("process_sequenced_tx_on_startup l1_tx: {:?}", l1_tx); - let moveos_tx = self.executor.validate_l1_tx(l1_tx.clone()).await?; - self.execute_tx(ledger_tx.clone(), moveos_tx).await?; + self.execute_l1_tx(l1_tx.clone()).await?; } LedgerTxData::L2Tx(l2_tx) => { debug!("process_sequenced_tx_on_startup l2_tx: {:?}", l2_tx); - let moveos_tx = self.executor.validate_l2_tx(l2_tx.clone()).await?; - self.execute_tx(ledger_tx.clone(), moveos_tx).await?; + + match self.execute_l2_tx(l2_tx.clone()).await { + Ok(_v) => {} + Err(err) => { + if is_vm_panic_error(&err) { + tracing::error!( + "Execute L2 Tx failed while VM panic occurred in process_sequenced_tx_on_startup. error: {:?}; tx_order: {}, tx_hash {:?}", + err, tx_order, tx_hash + ); + return Err(err); + } + } + }; } } } @@ -418,7 +426,7 @@ impl Handler for PipelineProcessorActor { } } -fn is_vm_panic_error(error: &Error) -> bool { +pub fn is_vm_panic_error(error: &Error) -> bool { if let Some(vm_error) = error.downcast_ref::() { match vm_error { VMPanicError::VerifierPanicError(_) | VMPanicError::SystemCallPanicError(_) => true, diff --git a/crates/rooch/Cargo.toml b/crates/rooch/Cargo.toml index fa344e2cd5..7345e0ef05 100644 --- a/crates/rooch/Cargo.toml +++ b/crates/rooch/Cargo.toml @@ -98,6 +98,7 @@ rooch-integration-test-runner = { workspace = true } rooch-indexer = { workspace = true } rooch-event = { workspace = true } rooch-db = { workspace = true } +rooch-pipeline-processor = { workspace = true } rooch-common = { workspace = true } rooch-store = { workspace = true } rooch-faucet = { workspace = true } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index 25e0902561..a2db9dcbb7 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -25,6 +25,7 @@ use rooch_event::actor::EventActor; use rooch_executor::actor::executor::ExecutorActor; use rooch_executor::actor::reader_executor::ReaderExecutorActor; use rooch_executor::proxy::ExecutorProxy; +use rooch_pipeline_processor::actor::processor::is_vm_panic_error; use rooch_types::bitcoin::types::Block as BitcoinBlock; use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; @@ -438,10 +439,43 @@ impl ExecInner { ledger_tx, l1_block_with_body, } = msg; + let is_l2_tx = ledger_tx.data.is_l2_tx(); let moveos_tx = self .validate_ledger_transaction(ledger_tx, l1_block_with_body) .await?; - self.execute_moveos_tx(tx_order, moveos_tx).await + if let Err(err) = self.execute_moveos_tx(tx_order, moveos_tx).await { + self.handle_execution_error(err, is_l2_tx, tx_order)?; + } + + Ok(()) + } + + fn handle_execution_error( + &self, + error: anyhow::Error, + is_l2_tx: bool, + tx_order: u64, + ) -> anyhow::Result<()> { + if is_l2_tx { + return if is_vm_panic_error(&error) { + tracing::error!( + "Execute L2 Tx failed while VM panic occurred, error: {:?}; tx_order: {}", + error, + tx_order + ); + Err(error) + } else { + tracing::warn!( + "L2 Tx execution failed with a non-VM panic error. Ignoring and returning Ok; tx_order: {}, error: {:?}", + tx_order, + error + ); + Ok(()) // Gracefully handle non-VM panic L2Tx errors. + }; + } + + // Default error handling for non-L2Tx transactions and other cases. + Err(error) } async fn validate_ledger_transaction( @@ -449,15 +483,28 @@ impl ExecInner { ledger_tx: LedgerTransaction, l1block_with_body: Option, ) -> anyhow::Result { - let mut moveos_tx = match &ledger_tx.data { + let moveos_tx_result = match &ledger_tx.data { LedgerTxData::L1Block(_block) => { self.executor .validate_l1_block(l1block_with_body.unwrap()) - .await? + .await } - LedgerTxData::L1Tx(l1_tx) => self.executor.validate_l1_tx(l1_tx.clone()).await?, - LedgerTxData::L2Tx(l2_tx) => self.executor.validate_l2_tx(l2_tx.clone()).await?, + LedgerTxData::L1Tx(l1_tx) => self.executor.validate_l1_tx(l1_tx.clone()).await, + LedgerTxData::L2Tx(l2_tx) => self.executor.validate_l2_tx(l2_tx.clone()).await, }; + + let mut moveos_tx = match moveos_tx_result { + Ok(tx) => tx, + Err(err) => { + tracing::error!( + "Error validating transaction: tx_order: {}, error: {:?}", + ledger_tx.sequence_info.tx_order, + err + ); + return Err(err); + } + }; + moveos_tx.ctx.add(ledger_tx.sequence_info.clone())?; Ok(moveos_tx) }