diff --git a/src/gas_pool/gas_pool_core.rs b/src/gas_pool/gas_pool_core.rs index 4c260de..e1b5c8d 100644 --- a/src/gas_pool/gas_pool_core.rs +++ b/src/gas_pool/gas_pool_core.rs @@ -5,7 +5,7 @@ use crate::metrics::GasPoolCoreMetrics; use crate::storage::Storage; use crate::sui_client::SuiClient; use crate::tx_signer::TxSigner; -use crate::types::ReservationID; +use crate::types::{GasCoin, ReservationID}; use crate::{retry_forever, retry_with_max_attempts}; use anyhow::bail; use std::sync::Arc; @@ -107,13 +107,54 @@ impl GasPool { .await?; debug!(?reservation_id, "Reservation is ready for execution"); + // To avoid read-after-write inconsistency, we apply a trick here to calculate the + // new balance of the gas coin after the transaction. + // We first query the total balance prior to transaction execution, then execute the + // transaction, and finally derive the new gas coin balance using the gas usage from effects. + let total_gas_coin_balance = self.get_total_gas_coin_balance(payment.clone()).await; let response = self.execute_transaction_impl(tx_data, user_sig).await; + let updated_coins = match &response { + Ok(effects) => { + let new_gas_coin = effects.gas_object().reference.to_object_ref(); + let new_balance = + total_gas_coin_balance as i64 - effects.gas_cost_summary().net_gas_usage(); + debug!( + ?reservation_id, + "Total gas coin balance prior to execution: {}, new balance: {}", + total_gas_coin_balance, + new_balance + ); + #[cfg(test)] + { + assert_eq!( + self.get_total_gas_coin_balance(payment).await, + new_balance as u64 + ); + } + vec![GasCoin { + object_ref: new_gas_coin, + balance: new_balance as u64, + }] + } + Err(_) => { + debug!( + ?reservation_id, + "Querying latest gas state since transaction failed" + ); + self.sui_client + .get_latest_gas_objects(payment) + .await + .into_values() + .flatten() + .collect() + } + }; + let smashed_coin_count = payment_count - updated_coins.len(); // Regardless of whether the transaction succeeded, we need to release the coins. - // Otherwise we loose track of them. This is because `ready_for_execution` already takes + // Otherwise, we lose track of them. This is because `ready_for_execution` already takes // the coins out of the pool and will not be covered by the auto-release mechanism. - let release_count = self.release_gas_coins(payment).await; - if payment_count > release_count { - let smashed_coin_count = payment_count - release_count; + self.release_gas_coins(updated_coins).await; + if smashed_coin_count > 0 { info!( ?reservation_id, "Smashed {:?} coins after transaction execution", smashed_coin_count @@ -123,10 +164,7 @@ impl GasPool { .with_label_values(&[&sponsor.to_string()]) .inc_by(smashed_coin_count as u64); } - info!( - ?reservation_id, - "Released {:?} coins after transaction execution", release_count - ); + info!(?reservation_id, "Transaction execution finished"); response } @@ -162,6 +200,15 @@ impl GasPool { Ok(effects) } + async fn get_total_gas_coin_balance(&self, gas_coins: Vec) -> u64 { + let latest = self.sui_client.get_latest_gas_objects(gas_coins).await; + latest + .into_values() + .flatten() + .map(|coin| coin.balance) + .sum() + } + fn check_transaction_validity(tx_data: &TransactionData) -> anyhow::Result<()> { let mut all_args = vec![]; for command in tx_data.kind().iter_commands() { @@ -195,22 +242,16 @@ impl GasPool { Ok(()) } - /// Returns number of coins added back to the pool. - async fn release_gas_coins(&self, gas_coins: Vec) -> usize { + /// Release gas coins back to the gas pool, by adding them to the storage. + async fn release_gas_coins(&self, gas_coins: Vec) { debug!("Trying to release gas coins: {:?}", gas_coins); - - let latest = self.sui_client.get_latest_gas_objects(gas_coins).await; - debug!("Latest coin state: {:?}", latest); - let updated_coins: Vec<_> = latest.into_values().flatten().collect(); - retry_forever!(async { self.gas_pool_store - .add_new_coins(updated_coins.clone()) + .add_new_coins(gas_coins.clone()) .await .tap_err(|err| error!("Failed to call update_gas_coins on storage: {:?}", err)) }) .unwrap(); - updated_coins.len() } /// Performs an end-to-end flow of reserving gas, signing a transaction, and releasing the gas coins. @@ -246,7 +287,15 @@ impl GasPool { }); if !unlocked_coins.is_empty() { debug!("Coins that are expired: {:?}", unlocked_coins); - let count = self.release_gas_coins(unlocked_coins).await; + let latest_coins: Vec<_> = self + .sui_client + .get_latest_gas_objects(unlocked_coins.clone()) + .await + .into_values() + .flatten() + .collect(); + let count = latest_coins.len(); + self.release_gas_coins(latest_coins).await; info!("Released {:?} coins after expiration", count); } tokio::select! {