Skip to content

Commit

Permalink
Fix read-after-write inconsistency
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed May 23, 2024
1 parent e35338e commit 7ab8afe
Showing 1 changed file with 68 additions and 19 deletions.
87 changes: 68 additions & 19 deletions src/gas_pool/gas_pool_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::metrics::GasPoolCoreMetrics;
use crate::storage::Storage;
use crate::sui_client::SuiClient;
use crate::tx_signer::TxSigner;
use crate::types::ReservationID;
use crate::types::{GasCoin, ReservationID};
use crate::{retry_forever, retry_with_max_attempts};
use anyhow::bail;
use std::sync::Arc;
Expand Down Expand Up @@ -107,13 +107,54 @@ impl GasPool {
.await?;
debug!(?reservation_id, "Reservation is ready for execution");

// To avoid read-after-write inconsistency, we apply a trick here to calculate the
// new balance of the gas coin after the transaction.
// We first query the total balance prior to transaction execution, then execute the
// transaction, and finally derive the new gas coin balance using the gas usage from effects.
let total_gas_coin_balance = self.get_total_gas_coin_balance(payment.clone()).await;
let response = self.execute_transaction_impl(tx_data, user_sig).await;
let updated_coins = match &response {
Ok(effects) => {
let new_gas_coin = effects.gas_object().reference.to_object_ref();
let new_balance =
total_gas_coin_balance as i64 - effects.gas_cost_summary().net_gas_usage();
debug!(
?reservation_id,
"Total gas coin balance prior to execution: {}, new balance: {}",
total_gas_coin_balance,
new_balance
);
#[cfg(test)]
{
assert_eq!(
self.get_total_gas_coin_balance(payment).await,
new_balance as u64
);
}
vec![GasCoin {
object_ref: new_gas_coin,
balance: new_balance as u64,
}]
}
Err(_) => {
debug!(
?reservation_id,
"Querying latest gas state since transaction failed"
);
self.sui_client
.get_latest_gas_objects(payment)
.await
.into_values()
.flatten()
.collect()
}
};
let smashed_coin_count = payment_count - updated_coins.len();
// Regardless of whether the transaction succeeded, we need to release the coins.
// Otherwise we loose track of them. This is because `ready_for_execution` already takes
// Otherwise, we lose track of them. This is because `ready_for_execution` already takes
// the coins out of the pool and will not be covered by the auto-release mechanism.
let release_count = self.release_gas_coins(payment).await;
if payment_count > release_count {
let smashed_coin_count = payment_count - release_count;
self.release_gas_coins(updated_coins).await;
if smashed_coin_count > 0 {
info!(
?reservation_id,
"Smashed {:?} coins after transaction execution", smashed_coin_count
Expand All @@ -123,10 +164,7 @@ impl GasPool {
.with_label_values(&[&sponsor.to_string()])
.inc_by(smashed_coin_count as u64);
}
info!(
?reservation_id,
"Released {:?} coins after transaction execution", release_count
);
info!(?reservation_id, "Transaction execution finished");

response
}
Expand Down Expand Up @@ -162,6 +200,15 @@ impl GasPool {
Ok(effects)
}

async fn get_total_gas_coin_balance(&self, gas_coins: Vec<ObjectID>) -> u64 {
let latest = self.sui_client.get_latest_gas_objects(gas_coins).await;
latest
.into_values()
.flatten()
.map(|coin| coin.balance)
.sum()
}

fn check_transaction_validity(tx_data: &TransactionData) -> anyhow::Result<()> {
let mut all_args = vec![];
for command in tx_data.kind().iter_commands() {
Expand Down Expand Up @@ -195,22 +242,16 @@ impl GasPool {
Ok(())
}

/// Returns number of coins added back to the pool.
async fn release_gas_coins(&self, gas_coins: Vec<ObjectID>) -> usize {
/// Release gas coins back to the gas pool, by adding them to the storage.
async fn release_gas_coins(&self, gas_coins: Vec<GasCoin>) {
debug!("Trying to release gas coins: {:?}", gas_coins);

let latest = self.sui_client.get_latest_gas_objects(gas_coins).await;
debug!("Latest coin state: {:?}", latest);
let updated_coins: Vec<_> = latest.into_values().flatten().collect();

retry_forever!(async {
self.gas_pool_store
.add_new_coins(updated_coins.clone())
.add_new_coins(gas_coins.clone())
.await
.tap_err(|err| error!("Failed to call update_gas_coins on storage: {:?}", err))
})
.unwrap();
updated_coins.len()
}

/// Performs an end-to-end flow of reserving gas, signing a transaction, and releasing the gas coins.
Expand Down Expand Up @@ -246,7 +287,15 @@ impl GasPool {
});
if !unlocked_coins.is_empty() {
debug!("Coins that are expired: {:?}", unlocked_coins);
let count = self.release_gas_coins(unlocked_coins).await;
let latest_coins: Vec<_> = self
.sui_client
.get_latest_gas_objects(unlocked_coins.clone())
.await
.into_values()
.flatten()
.collect();
let count = latest_coins.len();
self.release_gas_coins(latest_coins).await;
info!("Released {:?} coins after expiration", count);
}
tokio::select! {
Expand Down

0 comments on commit 7ab8afe

Please sign in to comment.