Skip to content

Commit

Permalink
fix: Prevent repeated calls to failed exchanges (#264)
Browse files Browse the repository at this point in the history
- Adds a filter to remove failed exchanges in subsequent out calls
within the same `get_exchange_rate` call
- Update testing
  • Loading branch information
nategurian authored Apr 10, 2024
1 parent 987d384 commit 458adc3
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 163 deletions.
226 changes: 133 additions & 93 deletions src/xrc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,88 +33,105 @@ const STABLECOIN_BASES: &[&str] = &[DAI, USDC];
const MIN_NUM_RATES_FOR_PRIVILEGED_CANISTERS: usize =
if cfg!(feature = "ipv4-support") { 3 } else { 2 };

#[derive(Clone, Debug)]
struct QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate,
failed_exchanges: Vec<Exchange>,
}

#[async_trait]
trait CallExchanges {
async fn get_cryptocurrency_usdt_rate(
&self,
exchanges: &[&Exchange],
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError>;
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>;

async fn get_stablecoin_rates(
&self,
exchanges: &[&Exchange],
symbols: &[&str],
timestamp: u64,
) -> Vec<Result<QueriedExchangeRate, CallExchangeError>>;
) -> Vec<Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>>;
}

struct CallExchangesImpl;

#[async_trait]
impl CallExchanges for CallExchangesImpl {
async fn get_cryptocurrency_usdt_rate(
&self,
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError> {
let futures = EXCHANGES.iter().filter_map(|exchange| {
if !exchange.is_available() {
return None;
}

Some(call_exchange(
exchange,
CallExchangeArgs {
timestamp,
quote_asset: usdt_asset(),
base_asset: asset.clone(),
},
))
});
let results = join_all(futures).await;

let mut rates = vec![];
let mut errors = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Err(err) => {
ic_cdk::println!(
"{} Timestamp: {}, Asset: {:?}, Error: {}",
LOG_PREFIX,
timestamp,
asset,
err,
);
errors.push(err);
}
}
}

if rates.is_empty() {
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRate::new(
asset.clone(),
usdt_asset(),
timestamp,
&rates,
rates.len() + errors.len(),
rates.len(),
None,
))
&self,
exchanges: &[&Exchange],
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError> {
let futures = exchanges.iter().map(|exchange| {
call_exchange(
exchange,
CallExchangeArgs {
timestamp,
quote_asset: usdt_asset(),
base_asset: asset.clone(),
},
)
});
let results = join_all(futures).await;

let mut rates = vec![];
let mut failed_exchanges = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Err(err) => {
ic_cdk::println!(
"{} Timestamp: {}, Asset: {:?}, Error: {}",
LOG_PREFIX,
timestamp,
asset,
err,
);

if let CallExchangeError::Http { exchange, error: _} = err {
if let Some(exchange) = exchanges.iter().find(|e| e.to_string() == exchange) {
failed_exchanges.push((*exchange).clone());
}
else {
ic_cdk::println!("{} Exchange not found for failed exchanges: {} @ {}", LOG_PREFIX, exchange, timestamp);
}
}
}
}
}

if rates.is_empty() {
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate::new(
asset.clone(),
usdt_asset(),
timestamp,
&rates,
exchanges.len(),
rates.len(),
None,
),
failed_exchanges,
})
}

async fn get_stablecoin_rates(
&self,
exchanges: &[&Exchange],
symbols: &[&str],
timestamp: u64,
) -> Vec<Result<QueriedExchangeRate, CallExchangeError>> {
) -> Vec<Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>> {
join_all(
symbols
.iter()
.map(|symbol| get_stablecoin_rate(symbol, timestamp)),
.map(|symbol| get_stablecoin_rate(exchanges, symbol, timestamp)),
)
.await
}
Expand All @@ -136,6 +153,11 @@ pub fn usd_asset() -> Asset {
}
}

/// Helper function to get a list of available exchanges.
fn get_available_exchanges() -> Vec<&'static Exchange> {
EXCHANGES.iter().filter(|e| e.is_available()).collect::<Vec<_>>()
}

/// This function retrieves the requested rate from the exchanges. The median rate of all collected
/// rates is used as the exchange rate and a set of metadata is returned giving information on
/// how the rate was retrieved.
Expand Down Expand Up @@ -489,10 +511,11 @@ fn charge_cycles(
async fn handle_cryptocurrency_pair(
env: &impl Environment,
call_exchanges_impl: &impl CallExchanges,
request: &GetExchangeRateRequest,
request: &GetExchangeRateRequest
) -> Result<QueriedExchangeRate, ExchangeRateError> {
let requested_timestamp = get_normalized_timestamp(env, request);

let mut failed_exchanges = vec![];
let mut exchanges = get_available_exchanges();
let caller = env.caller();
let (maybe_base_rate, maybe_quote_rate) = with_cache_mut(|cache| {
(
Expand Down Expand Up @@ -544,37 +567,41 @@ async fn handle_cryptocurrency_pair(
let base_rate = match maybe_base_rate {
Some(base_rate) => base_rate,
None => {
let base_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.base_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoBaseAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&base_rate);
cache.insert(&response.queried_exchange_rate);
});
base_rate
failed_exchanges.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};
exchanges.retain(|exchange| !failed_exchanges.contains(exchange));

let quote_rate = match maybe_quote_rate {
Some(quote_rate) => quote_rate,
None => {
let quote_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.quote_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoQuoteAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&quote_rate);
cache.insert(&response.queried_exchange_rate);
});
quote_rate
failed_exchanges.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};

(base_rate / quote_rate).validate()
}),
)
Expand All @@ -588,6 +615,8 @@ async fn handle_crypto_base_fiat_quote_pair(
) -> Result<QueriedExchangeRate, ExchangeRateError> {
let requested_timestamp = get_normalized_timestamp(env, request);
let caller = env.caller();
let mut failed_exchanges_list = vec![];
let mut exchanges = get_available_exchanges();
let maybe_crypto_base_rate = with_cache_mut(|cache| {
get_rate_from_cache(
cache,
Expand All @@ -596,7 +625,6 @@ async fn handle_crypto_base_fiat_quote_pair(
requested_timestamp.value,
)
});

let mut num_rates_needed: usize = 0;
if maybe_crypto_base_rate.is_none() {
num_rates_needed = num_rates_needed.saturating_add(1);
Expand Down Expand Up @@ -657,17 +685,21 @@ async fn handle_crypto_base_fiat_quote_pair(
// Retrieve the missing stablecoin results. For each rate retrieved, cache it and add it to the
// stablecoin rates vector.
let stablecoin_results = call_exchanges_impl
.get_stablecoin_rates(&missed_stablecoin_symbols, requested_timestamp.value)
.get_stablecoin_rates(&exchanges, &missed_stablecoin_symbols, requested_timestamp.value)
.await;

stablecoin_results
.iter()
.into_iter()
.zip(missed_stablecoin_symbols)
.for_each(|(result, symbol)| match result {
Ok(rate) => {
stablecoin_rates.push(rate.clone());
Ok(QueriedExchangeRateWithFailedExchanges {
failed_exchanges,
queried_exchange_rate,
}) => {
failed_exchanges_list.extend(failed_exchanges);
stablecoin_rates.push(queried_exchange_rate.clone());
with_cache_mut(|cache| {
cache.insert(rate);
cache.insert(&queried_exchange_rate);
});
}
Err(error) => {
Expand All @@ -681,27 +713,29 @@ async fn handle_crypto_base_fiat_quote_pair(
}
});

exchanges.retain(|exchange| !failed_exchanges_list.contains(exchange));
let crypto_base_rate = match maybe_crypto_base_rate {
Some(base_rate) => base_rate,
None => {
let base_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.base_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoBaseAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&base_rate);
cache.insert(&response.queried_exchange_rate);
});
base_rate
failed_exchanges_list.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};

let stablecoin_rate = stablecoin::get_stablecoin_rate(&stablecoin_rates, &usd_asset())
.map_err(ExchangeRateError::from)?;
let crypto_usd_base_rate = crypto_base_rate * stablecoin_rate;

(crypto_usd_base_rate / forex_rate).validate()
}),
)
Expand Down Expand Up @@ -736,15 +770,12 @@ fn handle_fiat_pair(
}

async fn get_stablecoin_rate(
exchanges: &[&Exchange],
symbol: &str,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError> {
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError> {
let mut futures = vec![];
EXCHANGES.iter().for_each(|exchange| {
if !cfg!(feature = "ipv4-support") && !exchange.supports_ipv6() {
return;
}

exchanges.iter().for_each(|exchange| {
let maybe_pair = exchange
.supported_stablecoin_pairs()
.iter()
Expand All @@ -769,8 +800,7 @@ async fn get_stablecoin_rate(
let results = join_all(futures).await;

let mut rates = vec![];
let mut errors = vec![];

let mut failed_exchanges = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Expand All @@ -782,7 +812,14 @@ async fn get_stablecoin_rate(
timestamp,
error
);
errors.push(error);
if let CallExchangeError::Http { exchange, error: _} = error {
if let Some(exchange) = exchanges.iter().find(|e| e.to_string() == exchange) {
failed_exchanges.push((*exchange).clone());
}
else {
ic_cdk::println!("{} Exchange not found for failed exchanges: {} @ {}", LOG_PREFIX, exchange, timestamp);
}
}
}
}
}
Expand All @@ -791,18 +828,21 @@ async fn get_stablecoin_rate(
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRate::new(
Asset {
Ok(QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate::new(
Asset {
symbol: symbol.to_string(),
class: AssetClass::Cryptocurrency,
},
usdt_asset(),
timestamp,
&rates,
rates.len() + errors.len(),
rates.len(),
None,
))
},
usdt_asset(),
timestamp,
&rates,
exchanges.len(),
rates.len(),
None,
),
failed_exchanges,
})
}

async fn call_exchange_for_stablecoin(
Expand Down
Loading

0 comments on commit 458adc3

Please sign in to comment.