Native price fetching finishing touches (#3030)
# Description
This PR addresses the last known issues with native price fetching: lock
contention and spikes of expired prices that result in expired orders.

# Changes
## Reduce Lock Contention
#2987 made it possible to fetch missing prices while building the auction. However, the
implementation caused too much lock contention: previously we locked the cache once and
returned all relevant prices, whereas now we lock it at least once per fetched price.
To fix that we first call the optimized function, which takes only a single lock, and only
afterwards fall back to the generic implementation that locks once per token, for the tokens
that are still missing (see the sketch below). Since we started initializing the native price
cache by loading prices from the DB, we should rarely have many prices expire together, so
this should be fine.
Already confirmed to be working with a hotfixed prod release.
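
For illustration only, a minimal standalone sketch of that two-phase lookup. The types and
method names here are made up for the example; the real logic lives in the
`native_price_cache.rs` diff below:

```rust
use std::{collections::HashMap, sync::Mutex};

type Token = u64;
type Price = f64;

struct Cache {
    prices: Mutex<HashMap<Token, Price>>,
}

impl Cache {
    /// Optimized path: takes the lock once and returns all cached prices.
    fn get_cached_prices(&self, tokens: &[Token]) -> HashMap<Token, Price> {
        let cache = self.prices.lock().unwrap();
        tokens
            .iter()
            .filter_map(|t| cache.get(t).map(|price| (*t, *price)))
            .collect()
    }

    /// Generic path: estimates a single price and takes the lock again to store it.
    fn fetch_and_cache(&self, token: Token) -> Price {
        let price = 1.0; // stand-in for the actual price estimation request
        self.prices.lock().unwrap().insert(token, price);
        price
    }

    /// Two-phase lookup: one lock for everything already cached, then the
    /// per-token path only for the (hopefully few) missing prices.
    fn prices(&self, tokens: &[Token]) -> HashMap<Token, Price> {
        let mut prices = self.get_cached_prices(tokens);
        for token in tokens {
            if !prices.contains_key(token) {
                prices.insert(*token, self.fetch_and_cache(*token));
            }
        }
        prices
    }
}

fn main() {
    let cache = Cache {
        prices: Mutex::new(HashMap::from([(1, 100.0)])),
    };
    // Token 1 is served from the single bulk lock; token 2 takes the per-token path.
    println!("{:?}", cache.prices(&[1, 2]));
}
```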


## Reduce Spikes of Expired Prices
So far all prices we initialized the cache with expire together. This
can lead to a few spikes of prices expiring together where the update
logic can't refresh them in time which leads to expired orders.
To address that we generate random `updated_at` timestamps between 50%
and 90% expired. That way the background task should never get
overwhelmed by updating too many prices at once.
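
A standalone sketch of that staggering, mirroring the change in `native_price_cache.rs`
below (the 10 minute max age is just an assumed example value): an entry rolled at 70%
expired gets `updated_at = now - 420s`, so it comes up for refresh in ~180s rather than
together with every other initialized entry.

```rust
use rand::Rng;
use std::time::{Duration, Instant};

fn main() {
    // Assumed max age of 10 minutes; the real value comes from configuration.
    let max_age = Duration::from_secs(600);
    let mut rng = rand::thread_rng();

    // Mark the entry as somewhere between 50% and 90% expired so that
    // initialized entries come up for refresh spread out over time.
    let percent_expired: u64 = rng.gen_range(50..=90);
    let age = max_age.as_secs() * percent_expired / 100;
    let updated_at = Instant::now() - Duration::from_secs(age);

    println!(
        "entry aged {age}s of {}s, refresh due in ~{}s, updated_at = {updated_at:?}",
        max_age.as_secs(),
        max_age.as_secs() - age
    );
}
```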

## How to test
Tested in prod
MartinquaXD authored Oct 3, 2024
1 parent c34f150 commit 7d68eff
Showing 3 changed files with 40 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions crates/shared/Cargo.toml
@@ -44,6 +44,7 @@ order-validation = { path = "../order-validation" }
primitive-types = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
+rand = { workspace = true }
rate-limit = { path = "../rate-limit" }
reqwest = { workspace = true, features = ["cookies", "gzip", "json"] }
rust_decimal = { version = "1.35.0", features = ["maths"] }
51 changes: 38 additions & 13 deletions crates/shared/src/price_estimation/native_price_cache.rs
@@ -10,6 +10,7 @@ use {
indexmap::IndexSet,
primitive_types::H160,
prometheus::{IntCounter, IntCounterVec, IntGauge},
+rand::Rng,
std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex, MutexGuard, Weak},
@@ -226,14 +227,18 @@ impl UpdateTask {

impl CachingNativePriceEstimator {
pub async fn initialize_cache(&self, prices: HashMap<H160, BigDecimal>) {
-let now = Instant::now();
-// Update the cache to half max age time, so it gets fetched sooner, since we
-// don't know exactly how old the price was
-let updated_at = now - (self.0.max_age / 2);
+let mut rng = rand::thread_rng();
+let now = std::time::Instant::now();

let cache = prices
.into_iter()
.filter_map(|(token, price)| {
+// Generate random `updated_at` timestamp
+// to avoid spikes of expired prices.
+let percent_expired = rng.gen_range(50..=90);
+let age = self.0.max_age.as_secs() * percent_expired / 100;
+let updated_at = now - Duration::from_secs(age);
+
Some((
token,
CachedResult {
@@ -317,26 +322,31 @@ impl CachingNativePriceEstimator {
tokens: &'a [H160],
timeout: Duration,
) -> HashMap<H160, NativePriceEstimateResult> {
+let mut prices = self.get_cached_prices(tokens);
if timeout.is_zero() {
-return self.get_cached_prices(tokens);
+return prices;
}

-let mut collected_prices = HashMap::new();
+let uncached_tokens: Vec<_> = tokens
+.iter()
+.filter(|t| !prices.contains_key(t))
+.copied()
+.collect();
let price_stream = self
.0
-.estimate_prices_and_update_cache(tokens, self.0.max_age);
+.estimate_prices_and_update_cache(&uncached_tokens, self.0.max_age);

let _ = time::timeout(timeout, async {
let mut price_stream = price_stream;

while let Some((token, result)) = price_stream.next().await {
-collected_prices.insert(token, result);
+prices.insert(token, result);
}
})
.await;

-// Return whatever was collected up to that point, regardless of the timeout
-collected_prices
+prices
}
}

@@ -394,19 +404,34 @@ mod tests {
let mut inner = MockNativePriceEstimating::new();
inner.expect_estimate_native_price().never();

-let prices = HashMap::from([(token(0), BigDecimal::try_from(1e18).unwrap())]);
+const MAX_AGE_SECS: u64 = 600;
+let min_age = Duration::from_secs(MAX_AGE_SECS * 49 / 100);
+let max_age = Duration::from_secs(MAX_AGE_SECS * 91 / 100);
+
+let prices =
+HashMap::from_iter((0..10).map(|t| (token(t), BigDecimal::try_from(1e18).unwrap())));
let estimator = CachingNativePriceEstimator::new(
Box::new(inner),
Duration::from_millis(30),
Duration::from_secs(MAX_AGE_SECS),
Default::default(),
None,
Default::default(),
1,
);
estimator.initialize_cache(prices).await;

-for _ in 0..10 {
-let result = estimator.estimate_native_price(token(0)).await;
+{
+// Check that `updated_at` timestamps are initialized with
+// reasonable values.
+let cache = estimator.0.cache.lock().unwrap();
+for value in cache.values() {
+let elapsed = value.updated_at.elapsed();
+assert!(elapsed >= min_age && elapsed <= max_age);
+}
+}
+
+for i in 0..10 {
+let result = estimator.estimate_native_price(token(i)).await;
assert_eq!(result.as_ref().unwrap().to_i64().unwrap(), 1);
}
}
