Skip to content

Commit

Permalink
Maybe a bit of a simplification for the incremental cache (#2950)
Browse files Browse the repository at this point in the history
# Description
My attempt to hopefully simplify one function of
#2923 slightly.

# Changes
Mostly tried to condense the code a bit and make timestamp handling
easier to understand.
I believe the only logical change is that the timestamp is now initialized
with a non-zero value on the first iteration. However, if I understand
correctly, since we now use the incremental cache every time the cache is
`Some`, these changes should cancel each other out, making the whole thing
a bit easier to grasp IMO.

## How to test
e2e tests still pass (they ran fine locally, but one test fails in CI;
I will investigate whether that is caused by this PR or by a generally flaky test).
@squadgazzz could you give this a sanity check that it's actually not
breaking any logic?
  • Loading branch information
MartinquaXD authored Sep 6, 2024
1 parent 2678178 commit 990ce6c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 71 deletions.
1 change: 1 addition & 0 deletions crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ pub struct SolvableOrders {
pub orders: HashMap<domain::OrderUid, model::order::Order>,
pub quotes: HashMap<domain::OrderUid, domain::Quote>,
pub latest_settlement_block: u64,
pub fetched_from_db: chrono::DateTime<chrono::Utc>,
}
2 changes: 2 additions & 0 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl Postgres {
.with_label_values(&["solvable_orders"])
.start_timer();

let start = chrono::offset::Utc::now();
let mut ex = self.pool.begin().await?;
// Set the transaction isolation level to REPEATABLE READ
// so the both SELECT queries below are executed in the same database snapshot
Expand All @@ -92,6 +93,7 @@ impl Postgres {
orders,
quotes,
latest_settlement_block,
fetched_from_db: start,
})
}

Expand Down
8 changes: 6 additions & 2 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ impl Persistence {
pub async fn all_solvable_orders(
&self,
min_valid_to: u32,
) -> Result<boundary::SolvableOrders, DatabaseError> {
) -> anyhow::Result<boundary::SolvableOrders> {
self.postgres
.all_solvable_orders(min_valid_to)
.await
.map_err(DatabaseError)
.context("failed to fetch all solvable orders")
}

/// Saves the given auction to storage for debugging purposes.
Expand Down Expand Up @@ -411,6 +411,7 @@ impl Persistence {
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
let after_block = i64::try_from(after_block).context("block number value exceeds i64")?;
let started_at = chrono::offset::Utc::now();
let mut tx = self.postgres.pool.begin().await.context("begin")?;
// Set the transaction isolation level to REPEATABLE READ
// so all the SELECT queries below are executed in the same database snapshot
Expand Down Expand Up @@ -492,6 +493,7 @@ impl Persistence {
updated_quotes,
latest_settlement_block,
min_valid_to,
started_at,
)
}

Expand All @@ -501,6 +503,7 @@ impl Persistence {
mut next_quotes: HashMap<domain::OrderUid, domain::Quote>,
latest_settlement_block: u64,
min_valid_to: u32,
started_at: chrono::DateTime<chrono::Utc>,
) -> anyhow::Result<boundary::SolvableOrders> {
// Blindly insert all new orders into the cache.
for (uid, order) in next_orders {
Expand Down Expand Up @@ -545,6 +548,7 @@ impl Persistence {
orders: current_orders,
quotes: next_quotes,
latest_settlement_block,
fetched_from_db: started_at,
})
}

Expand Down
93 changes: 24 additions & 69 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use {
},
anyhow::{Context, Result},
bigdecimal::BigDecimal,
chrono::{DateTime, Utc},
database::order_events::OrderEventLabel,
ethrpc::block_stream::CurrentBlockWatcher,
futures::future::join_all,
futures::{future::join_all, FutureExt},
indexmap::IndexSet,
itertools::{Either, Itertools},
model::{
Expand Down Expand Up @@ -108,7 +107,6 @@ type Balances = HashMap<Query, U256>;
struct Inner {
auction: domain::Auction,
solvable_orders: boundary::SolvableOrders,
last_order_creation_timestamp: DateTime<Utc>,
}

impl SolvableOrdersCache {
Expand Down Expand Up @@ -174,7 +172,7 @@ impl SolvableOrdersCache {
pub async fn update(&self, block: u64) -> Result<()> {
let start = Instant::now();

let (db_solvable_orders, latest_creation_timestamp) = self.get_solvable_orders().await?;
let db_solvable_orders = self.get_solvable_orders().await?;

let orders = db_solvable_orders
.orders
Expand Down Expand Up @@ -306,7 +304,6 @@ impl SolvableOrdersCache {
*self.cache.lock().await = Some(Inner {
auction,
solvable_orders: db_solvable_orders,
last_order_creation_timestamp: latest_creation_timestamp,
});

tracing::debug!(%block, "updated current auction cache");
Expand Down Expand Up @@ -340,72 +337,30 @@ impl SolvableOrdersCache {
.collect()
}

/// Returns current solvable orders along with the latest order creation
/// timestamp.
async fn get_solvable_orders(&self) -> Result<(SolvableOrders, DateTime<Utc>)> {
const INITIAL_ORDER_CREATION_TIMESTAMP: DateTime<Utc> = DateTime::<Utc>::MIN_UTC;

// A new auction should be created regardless of whether new solvable orders are
// found. The incremental solvable orders cache updater should only be
// enabled after the initial full SQL query
// (`persistence::all_solvable_orders`) returned some orders. Until then,
// `MIN_UTC` is used to indicate that no orders have been found yet by
// (`persistence::all_solvable_orders`). This prevents situations where
// starting the service with a large existing DB would cause
// the incremental query to load all unfiltered orders into memory, potentially
// leading to OOM issues because incremental query doesn't filter out
// expired/invalid orders in the SQL query and basically can return the whole
// table when filters with default values are used.
let (db_solvable_orders, previous_creation_timestamp) = {
let cache_data = {
let lock = self.cache.lock().await;
match &*lock {
Some(cache)
if cache.last_order_creation_timestamp
> INITIAL_ORDER_CREATION_TIMESTAMP =>
{
Some((
cache.solvable_orders.orders.clone(),
cache.last_order_creation_timestamp,
cache.solvable_orders.latest_settlement_block,
))
}
_ => None,
}
};

let min_valid_to = now_in_epoch_seconds()
+ u32::try_from(self.min_order_validity_period.as_secs())
.context("min_order_validity_period is not u32")?;
match cache_data {
Some((current_orders, last_order_creation_timestamp, latest_settlement_block)) => (
self.persistence
.solvable_orders_after(
current_orders,
last_order_creation_timestamp,
latest_settlement_block,
min_valid_to,
)
.await?,
last_order_creation_timestamp,
),
None => (
self.persistence.all_solvable_orders(min_valid_to).await?,
INITIAL_ORDER_CREATION_TIMESTAMP,
),
}
/// Returns the currently solvable orders.
///
/// Uses the cheap incremental DB query whenever the cache has already been
/// initialized, and falls back to the full query otherwise. The incremental
/// query is not optimized for very long time ranges, so it must only run
/// after an initial full fetch populated the cache.
async fn get_solvable_orders(&self) -> Result<SolvableOrders> {
    let min_valid_to = now_in_epoch_seconds()
        + u32::try_from(self.min_order_validity_period.as_secs())
            .context("min_order_validity_period is not u32")?;

    // Only build the future while holding the lock but execute it outside
    // of the lock so concurrent readers of the cache are not blocked for
    // the whole duration of the DB query.
    let lock = self.cache.lock().await;
    let fetch_orders = match &*lock {
        // Only use the incremental query after the cache already got
        // initialized because it's not optimized for very long durations.
        Some(cache) => self
            .persistence
            .solvable_orders_after(
                cache.solvable_orders.orders.clone(),
                cache.solvable_orders.fetched_from_db,
                cache.solvable_orders.latest_settlement_block,
                min_valid_to,
            )
            .boxed(),
        None => self.persistence.all_solvable_orders(min_valid_to).boxed(),
    };
    // Explicitly release the guard before awaiting; otherwise the cache
    // mutex would stay locked until this function returns, contradicting
    // the "execute outside of lock" intent above.
    drop(lock);

    fetch_orders.await
}

/// Executed orders filtering in parallel.
Expand Down

0 comments on commit 990ce6c

Please sign in to comment.