Skip to content

Commit

Permalink
Handle ethflow_refunds empty table case (#2931)
Browse files Browse the repository at this point in the history
# Description
#2906 moved refund indexer
maintenance into the solvable orders cache update loop. When there are
no ethflow refunds in the DB or the last refund is very old, the refund
indexer tries to fetch all the blocks, which might execute for more than
10 minutes, resulting in autopilot restarts and the whole protocol being
stuck.

# Changes

Change the behavior of calculating the starting index for refund
indexer. If a recent eth-flow refund exists within the last day, it
prioritizes this. Otherwise, it falls back to the most recent block from
broadcasted orders. Then select the highest value among the value
calculated in the previous step, settlement contract block and
configured `ethflow_indexing_start`.

## How to test
Staging.
<img width="598" alt="image"
src="https://github.com/user-attachments/assets/fed96a1b-2523-4ca5-9fb4-767b8c87e536">

(cherry picked from commit 68815a3)
  • Loading branch information
squadgazzz committed Sep 2, 2024
1 parent d3e8690 commit d67a79f
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 5 deletions.
105 changes: 104 additions & 1 deletion crates/autopilot/src/database/onchain_order_events/ethflow_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
super::{OnchainOrderCustomData, OnchainOrderParsing},
crate::database::events::meta_to_event_index,
anyhow::{anyhow, Context, Result},
chrono::Duration,
contracts::cowswap_onchain_orders::{
event_data::OrderPlacement as ContractOrderPlacement,
Event as ContractEvent,
Expand All @@ -16,10 +17,11 @@ use {
},
ethcontract::Event as EthContractEvent,
ethrpc::{
block_stream::{block_number_to_block_number_hash, BlockNumberHash},
block_stream::{block_by_number, block_number_to_block_number_hash, BlockNumberHash},
Web3,
},
hex_literal::hex,
model::time::now_in_epoch_seconds,
shared::contracts::settlement_deployment_block_number_hash,
sqlx::types::BigDecimal,
std::{collections::HashMap, convert::TryInto},
Expand Down Expand Up @@ -167,6 +169,107 @@ pub async fn determine_ethflow_indexing_start(
})
}

/// Determines the starting block number and hash for indexing eth-flow refund
/// events.
///
/// This function computes the most appropriate starting block by evaluating
/// several potential sources:
/// 1. If `skip_event_sync_start` is provided, it uses this value directly and
/// returns early.
/// 2. Otherwise, it evaluates optional start blocks provided by
/// `ethflow_indexing_start`, the last known block processed by the database,
/// and a block determined by the chain's settlement deployment.
/// 3. The function selects the block with the highest number among these
/// sources.
///
/// # Panics
/// Note that this function is expected to be used at the start of the services
/// and will panic if it cannot retrieve the information it needs.
pub async fn determine_ethflow_refund_indexing_start(
skip_event_sync_start: &Option<BlockNumberHash>,
ethflow_indexing_start: Option<u64>,
web3: &Web3,
chain_id: u64,
db: crate::database::Postgres,
) -> BlockNumberHash {
if let Some(block_number_hash) = skip_event_sync_start {
return *block_number_hash;
}

let start_block = match ethflow_indexing_start {
Some(start_block) => Some(
block_number_to_block_number_hash(web3, start_block.into())
.await
.expect("Should be able to find block at specified indexing start"),
),
None => None,
};
let last_db_ethflow_block = last_db_ethflow_block(web3, db).await;
let settlement_block = settlement_deployment_block_number_hash(web3, chain_id)
.await
.unwrap_or_else(|err| {
panic!("Should be able to find settlement deployment block. Error: {err}")
});

vec![start_block, last_db_ethflow_block, Some(settlement_block)]
.into_iter()
.flatten()
.max_by_key(|(block_number, _)| *block_number)
.expect("Should be able to find a valid start block")
}

/// This function attempts to find the latest block that has processed eth-flow
/// orders or broadcasted orders. If a recent eth-flow refund exists within the
/// last day, it prioritizes this. Otherwise, it falls back to the most recent
/// block from broadcasted orders.
///
/// # Panics
/// Note that this function is expected to be used at the start of the services
/// and will panic if it cannot retrieve the information it needs.
async fn last_db_ethflow_block(
web3: &Web3,
db: crate::database::Postgres,
) -> Option<BlockNumberHash> {
let mut ex = db
.pool
.acquire()
.await
.expect("Should be able to acquire connection");
let last_refund_block_number = database::ethflow_orders::last_indexed_block(&mut ex)
.await
.expect("Should be able to find last indexed block for ethflow orders")
.unwrap_or_default() as u64;

if last_refund_block_number > 0 {
let last_refund_block = block_by_number(web3, last_refund_block_number.into())
.await
.expect("Should be able to find last refund block");

if last_refund_block.timestamp.as_u64()
> (now_in_epoch_seconds() as u64) - (Duration::days(1).num_seconds() as u64)
{
return Some((
last_refund_block_number,
last_refund_block.hash.expect("Should have hash"),
));
}
}

let last_order_block_number = database::onchain_broadcasted_orders::last_block(&mut ex)
.await
.expect("Should be able to find last onchain broadcasted order block")
as u64;

if last_order_block_number > 0 {
return Some(
block_number_to_block_number_hash(web3, last_order_block_number.into())
.await
.expect("Should be able to find last order block"),
);
}
None
}

#[cfg(test)]
mod test {
use {
Expand Down
21 changes: 17 additions & 4 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use {
database::{
ethflow_events::event_retriever::EthFlowRefundRetriever,
onchain_order_events::{
ethflow_events::{determine_ethflow_indexing_start, EthFlowOnchainOrderParser},
ethflow_events::{
determine_ethflow_indexing_start,
determine_ethflow_refund_indexing_start,
EthFlowOnchainOrderParser,
},
event_retriever::CoWSwapOnchainOrdersContract,
OnchainOrderParser,
},
Expand Down Expand Up @@ -457,11 +461,12 @@ pub async fn run(args: Arguments) {
maintenance.with_cow_amms(&cow_amm_registry);

if let Some(ethflow_contract) = args.ethflow_contract {
let start_block = determine_ethflow_indexing_start(
let ethflow_refund_start_block = determine_ethflow_refund_indexing_start(
&skip_event_sync_start,
args.ethflow_indexing_start,
&web3,
chain_id,
db.clone(),
)
.await;

Expand All @@ -471,7 +476,7 @@ pub async fn run(args: Arguments) {
EthFlowRefundRetriever::new(web3.clone(), ethflow_contract),
db.clone(),
block_retriever.clone(),
start_block,
ethflow_refund_start_block,
)
.await
.unwrap();
Expand All @@ -486,13 +491,21 @@ pub async fn run(args: Arguments) {
eth.contracts().settlement().address(),
);

let ethflow_start_block = determine_ethflow_indexing_start(
&skip_event_sync_start,
args.ethflow_indexing_start,
&web3,
chain_id,
)
.await;

let onchain_order_indexer = EventUpdater::new_skip_blocks_before(
// The events from the ethflow contract are read with the more generic contract
// interface called CoWSwapOnchainOrders.
CoWSwapOnchainOrdersContract::new(web3.clone(), ethflow_contract),
onchain_order_event_parser,
block_retriever,
start_block,
ethflow_start_block,
)
.await
.expect("Should be able to initialize event updater. Database read issues?");
Expand Down
8 changes: 8 additions & 0 deletions crates/ethrpc/src/block_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ pub async fn block_number_to_block_number_hash(
})
}

pub async fn block_by_number(web3: &Web3, block_number: BlockNumber) -> Option<Block<H256>> {
web3.eth()
.block(BlockId::Number(block_number))
.await
.ok()
.flatten()
}

#[derive(prometheus_metric_storage::MetricStorage)]
pub struct Metrics {
/// How much a new block number differs from the current block number.
Expand Down

0 comments on commit d67a79f

Please sign in to comment.