From 96ce319f4dde1c11c6276c4ca29f915ae98fd36a Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Tue, 3 Sep 2024 09:45:05 +0200 Subject: [PATCH 1/3] Handle fatal errors when indexing cow amms (#2934) # Description Currently the `autopilot` maintenance loop can get stuck when the configured cow amm helper contract does not support a found contract with this error: ``` 2024-09-03T06:13:17.420Z WARN autopilot::maintenance: failed to run background task successfully err=method 'tokens(address):(address[])' failure: contract call reverted with message: None Caused by: contract call reverted with message: None ``` So far we have been retrying this error over and over although the call to detect which tokens the pool is trading will never work if an amm is misconfigured or generally not supported by the helper contract. # Changes Differentiate between retryable and fatal errors while indexing the amms and retry or skip this pool respectively. # Testplan Set up sepolia configuration locally and checked that we don't get stuck on the unsupported pools: ``` 2024-09-03T06:24:55.880Z INFO cow_amm::cache: helper contract does not support amm cow_amm=0xe4abfda4e8c02fcafc34981dafaeb426aa4186e6 err=MethodError { signature: "tokens(address):(address[])", inner: Revert(None) } 2024-09-03T06:24:55.921Z INFO cow_amm::cache: indexed new cow amm cow_amm=0xac140f325afd20a733e12580aeb22ff9bf46982f 2024-09-03T06:24:55.956Z INFO cow_amm::cache: indexed new cow amm cow_amm=0xa54442606548bf1b627662a465a40b31b7e8e711 ``` --- crates/cow-amm/src/amm.rs | 7 ++++-- crates/cow-amm/src/cache.rs | 40 +++++++++++++++++++++++----------- crates/shared/src/arguments.rs | 2 +- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/crates/cow-amm/src/amm.rs b/crates/cow-amm/src/amm.rs index f161bd3a9f..9bb6385d51 100644 --- a/crates/cow-amm/src/amm.rs +++ b/crates/cow-amm/src/amm.rs @@ -2,7 +2,7 @@ use { anyhow::Result, app_data::AppDataHash, contracts::CowAmmLegacyHelper, - ethcontract::{Address, Bytes, U256}, + ethcontract::{errors::MethodError, Address, Bytes, U256}, model::{ interaction::InteractionData, order::{BuyTokenDestination, OrderData, OrderKind, SellTokenSource}, @@ -18,7 +18,10 @@ pub struct Amm { } impl Amm { - pub(crate) async fn new(address: Address, helper: &CowAmmLegacyHelper) -> Result { + pub(crate) async fn new( + address: Address, + helper: &CowAmmLegacyHelper, + ) -> Result { let tradeable_tokens = helper.tokens(address).call().await?; Ok(Self { diff --git a/crates/cow-amm/src/cache.rs b/crates/cow-amm/src/cache.rs index a1cd213e4b..c0d80c8b45 100644 --- a/crates/cow-amm/src/cache.rs +++ b/crates/cow-amm/src/cache.rs @@ -1,7 +1,7 @@ use { crate::Amm, contracts::{cow_amm_legacy_helper::Event as CowAmmEvent, CowAmmLegacyHelper}, - ethcontract::Address, + ethcontract::{errors::ExecutionError, Address}, ethrpc::block_stream::RangeInclusive, shared::event_handling::EventStoring, std::{collections::BTreeMap, sync::Arc}, @@ -76,22 +76,36 @@ impl EventStoring for Storage { &mut self, events: Vec>, ) -> anyhow::Result<()> { - let cache = &mut *self.0.cache.write().await; - + let mut processed_events = Vec::with_capacity(events.len()); for event in events { - let meta = event - .meta - .as_ref() - .ok_or_else(|| anyhow::anyhow!("Event missing meta"))?; + let Some(meta) = event.meta else { + tracing::warn!(?event, "event does not contain required meta data"); + continue; + }; + let CowAmmEvent::CowammpoolCreated(cow_amm) = event.data; let cow_amm = cow_amm.amm; - - cache - .entry(meta.block_number) - .or_default() - .push(Arc::new(Amm::new(cow_amm, &self.0.helper).await?)); - tracing::info!(?cow_amm, "indexed new cow amm"); + match Amm::new(cow_amm, &self.0.helper).await { + Ok(amm) => processed_events.push((meta.block_number, Arc::new(amm))), + Err(err) if matches!(&err.inner, ExecutionError::Web3(_)) => { + // Abort completely to later try the entire block range again. + // That keeps the cache in a consistent state and avoids indexing + // the same event multiple times which would result in duplicate amms. + tracing::debug!(?cow_amm, ?err, "retryable error"); + return Err(err.into()); + } + Err(err) => { + tracing::info!(?cow_amm, ?err, "helper contract does not support amm"); + continue; + } + }; } + let cache = &mut *self.0.cache.write().await; + for (block, amm) in processed_events { + tracing::info!(cow_amm = ?amm.address(), "indexed new cow amm"); + cache.entry(block).or_default().push(amm); + } + Ok(()) } diff --git a/crates/shared/src/arguments.rs b/crates/shared/src/arguments.rs index 27e97f01dd..4b07190101 100644 --- a/crates/shared/src/arguments.rs +++ b/crates/shared/src/arguments.rs @@ -99,7 +99,7 @@ pub struct OrderQuotingArguments { logging_args_with_default_filter!( LoggingArguments, - "warn,autopilot=debug,driver=debug,orderbook=debug,solver=debug,shared=debug" + "warn,autopilot=debug,driver=debug,orderbook=debug,solver=debug,shared=debug,cow_amm=debug" ); #[derive(clap::Parser)] From 5d18eb9dc71b2543d78a3a82b4bc5201d6890cf4 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Tue, 3 Sep 2024 09:51:14 +0200 Subject: [PATCH 2/3] Move refund indexing back into background task (#2935) # Description Indexing refunds is not strictly necessary to have all relevant state for building the next auction. That's why it's okay to move that logic off of the critical path. This should resolve the start up issues we've been seeing on arbitrum staging where we have to reindex lots of blocks because the blocktime is so fast and we don't do refunds regularly. However, we should still find a better way to store the blocks we already indexed to avoid these issues in general. # Changes Index refund events in a separate background task again. --- crates/autopilot/src/maintenance.rs | 19 +------------------ crates/autopilot/src/run.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/crates/autopilot/src/maintenance.rs b/crates/autopilot/src/maintenance.rs index fc2c2ec58d..c29db67205 100644 --- a/crates/autopilot/src/maintenance.rs +++ b/crates/autopilot/src/maintenance.rs @@ -3,7 +3,6 @@ use { arguments::RunLoopMode, boundary::events::settlement::{GPv2SettlementContract, Indexer}, database::{ - ethflow_events::event_retriever::EthFlowRefundRetriever, onchain_order_events::{ ethflow_events::{EthFlowData, EthFlowDataForDb}, event_retriever::CoWSwapOnchainOrdersContract, @@ -36,8 +35,6 @@ pub struct Maintenance { settlement_indexer: EventUpdater, /// Indexes ethflow orders (orders selling native ETH). ethflow_indexer: Option, - /// Indexes refunds issued for unsettled ethflow orders. - refund_indexer: Option>, /// Used for periodic cleanup tasks to not have the DB overflow with old /// data. db_cleanup: Postgres, @@ -58,7 +55,6 @@ impl Maintenance { settlement_indexer, db_cleanup, cow_amm_indexer: Default::default(), - refund_indexer: None, ethflow_indexer: None, last_processed: Default::default(), } @@ -98,7 +94,6 @@ impl Maintenance { tokio::try_join!( self.settlement_indexer.run_maintenance(), self.db_cleanup.run_maintenance(), - self.index_refunds(), self.index_ethflow_orders(), futures::future::try_join_all( self.cow_amm_indexer @@ -113,26 +108,14 @@ impl Maintenance { /// Registers all maintenance tasks that are necessary to correctly support /// ethflow orders. - pub fn with_ethflow( - &mut self, - ethflow_indexer: EthflowIndexer, - refund_indexer: EventUpdater, - ) { + pub fn with_ethflow(&mut self, ethflow_indexer: EthflowIndexer) { self.ethflow_indexer = Some(ethflow_indexer); - self.refund_indexer = Some(refund_indexer); } pub fn with_cow_amms(&mut self, registry: &cow_amm::Registry) { self.cow_amm_indexer = registry.maintenance_tasks().clone(); } - async fn index_refunds(&self) -> Result<()> { - if let Some(indexer) = &self.refund_indexer { - return indexer.run_maintenance().await; - } - Ok(()) - } - async fn index_ethflow_orders(&self) -> Result<()> { if let Some(indexer) = &self.ethflow_indexer { return indexer.run_maintenance().await; diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 92f518156e..d3880ffd51 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -41,6 +41,7 @@ use { baseline_solver::BaseTokens, code_fetching::CachedCodeFetcher, http_client::HttpClientFactory, + maintenance::ServiceMaintenance, metrics::LivenessChecking, order_quoting::{self, OrderQuoter}, price_estimation::factory::{self, PriceEstimatorFactory}, @@ -510,7 +511,13 @@ pub async fn run(args: Arguments) { .await .expect("Should be able to initialize event updater. Database read issues?"); - maintenance.with_ethflow(onchain_order_indexer, refund_event_handler); + maintenance.with_ethflow(onchain_order_indexer); + // refunds are not critical for correctness and can therefore be indexed + // sporadically in a background task + let service_maintainer = ServiceMaintenance::new(vec![Arc::new(refund_event_handler)]); + tokio::task::spawn( + service_maintainer.run_maintenance_on_new_block(eth.current_block().clone()), + ); } let run = RunLoop { From c0a77f047ea588eee085eacf71544749bc03e9de Mon Sep 17 00:00:00 2001 From: Mateo Date: Fri, 6 Sep 2024 09:39:03 +0200 Subject: [PATCH 3/3] Speed up signature check for orders with no pre-interactions --- crates/autopilot/src/infra/persistence/mod.rs | 3 +- crates/e2e/tests/e2e/eth_safe.rs | 19 +++++----- .../src/signature_validator/simulation.rs | 37 ++++++++++++++++++- 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index 0e2d826f17..80cbb1dbd2 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -51,9 +51,8 @@ impl Persistence { self.postgres .replace_current_auction(&auction) .await - .map(|auction_id| { + .inspect(|&auction_id| { self.archive_auction(auction_id, auction); - auction_id }) .map_err(DatabaseError) } diff --git a/crates/e2e/tests/e2e/eth_safe.rs b/crates/e2e/tests/e2e/eth_safe.rs index 516cb2e9f7..43d0e40dd1 100644 --- a/crates/e2e/tests/e2e/eth_safe.rs +++ b/crates/e2e/tests/e2e/eth_safe.rs @@ -6,11 +6,9 @@ use { ethcontract::U256, model::{ order::{OrderCreation, OrderKind, BUY_ETH_ADDRESS}, - signature::EcdsaSigningScheme, + signature::{hashed_eip712_message, Signature}, }, - secp256k1::SecretKey, shared::ethrpc::Web3, - web3::signing::SecretKeyRef, }; #[tokio::test] @@ -31,6 +29,9 @@ async fn test(web3: Web3) { .await; token.mint(trader.address(), to_wei(4)).await; + safe.exec_call(token.approve(onchain.contracts().allowance, to_wei(4))) + .await; + token.mint(safe.address(), to_wei(4)).await; tx!( trader.account(), token.approve(onchain.contracts().allowance, to_wei(4)) @@ -49,7 +50,8 @@ async fn test(web3: Web3) { .await .unwrap(); assert_eq!(balance, 0.into()); - let order = OrderCreation { + let mut order = OrderCreation { + from: Some(safe.address()), sell_token: token.address(), sell_amount: to_wei(4), buy_token: BUY_ETH_ADDRESS, @@ -59,12 +61,11 @@ async fn test(web3: Web3) { kind: OrderKind::Sell, receiver: Some(safe.address()), ..Default::default() - } - .sign( - EcdsaSigningScheme::Eip712, + }; + order.signature = Signature::Eip1271(safe.sign_message(&hashed_eip712_message( &onchain.contracts().domain_separator, - SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()), - ); + &order.data().hash_struct(), + ))); services.create_order(&order).await.unwrap(); tracing::info!("Waiting for trade."); diff --git a/crates/shared/src/signature_validator/simulation.rs b/crates/shared/src/signature_validator/simulation.rs index 34d7dc9db9..086ff0a113 100644 --- a/crates/shared/src/signature_validator/simulation.rs +++ b/crates/shared/src/signature_validator/simulation.rs @@ -7,6 +7,7 @@ use { super::{SignatureCheck, SignatureValidating, SignatureValidationError}, crate::ethcontract_error::EthcontractErrorType, anyhow::Result, + contracts::ERC1271SignatureValidator, ethcontract::Bytes, ethrpc::Web3, futures::future, @@ -18,16 +19,46 @@ pub struct Validator { signatures: contracts::support::Signatures, settlement: H160, vault_relayer: H160, + web3: Web3, } impl Validator { + /// The result returned from `isValidSignature` if the signature is correct + const IS_VALID_SIGNATURE_MAGIC_BYTES: &'static str = "1626ba7e"; + pub fn new(web3: &Web3, settlement: H160, vault_relayer: H160) -> Self { let web3 = ethrpc::instrumented::instrument_with_label(web3, "signatureValidation".into()); Self { signatures: contracts::support::Signatures::at(&web3, settlement), settlement, vault_relayer, + web3: web3.clone(), + } + } + + /// Simulate isValidSignature for the cases in which the order does not have + /// pre-interactions + async fn simulate_without_pre_interactions( + &self, + check: &SignatureCheck, + ) -> Result<(), SignatureValidationError> { + // Since there are no interactions (no dynamic conditions / complex pre-state + // change), the order's validity can be directly determined by whether + // the signature matches the expected hash of the order data, checked + // with isValidSignature method called on the owner's contract + let contract = ERC1271SignatureValidator::at(&self.web3, check.signer); + let magic_bytes = contract + .methods() + .is_valid_signature(Bytes(check.hash), Bytes(check.signature.clone())) + .call() + .await + .map(|value| hex::encode(value.0))?; + + if magic_bytes != Self::IS_VALID_SIGNATURE_MAGIC_BYTES { + return Err(SignatureValidationError::Invalid); } + + Ok(()) } async fn simulate( @@ -73,7 +104,11 @@ impl SignatureValidating for Validator { checks: Vec, ) -> Vec> { future::join_all(checks.into_iter().map(|check| async move { - self.simulate(&check).await?; + if check.interactions.is_empty() { + self.simulate_without_pre_interactions(&check).await?; + } else { + self.simulate(&check).await?; + } Ok(()) })) .await