From df7f3d15936903f55d1c313d25a55e835b58e631 Mon Sep 17 00:00:00 2001 From: Rigidity Date: Sat, 23 Nov 2024 17:05:55 -0500 Subject: [PATCH 1/2] Improve syncevent --- Cargo.lock | 20 ++++++++-------- .../src/queues/transaction_queue.rs | 17 +++++++++++-- .../src/sync_manager/sync_event.rs | 17 +++++++++---- .../src/sync_manager/wallet_sync.rs | 24 ++++++++++++++----- crates/sage-wallet/src/test.rs | 21 ++++++++++++---- crates/sage-wallet/src/wallet/cats.rs | 8 +++---- crates/sage-wallet/src/wallet/did_assign.rs | 10 ++++---- crates/sage-wallet/src/wallet/dids.rs | 6 ++--- crates/sage-wallet/src/wallet/nfts.rs | 10 ++++---- crates/sage-wallet/src/wallet/offer.rs | 24 +++++++++---------- .../src/wallet/p2_coin_management.rs | 6 ++--- crates/sage-wallet/src/wallet/p2_send.rs | 6 ++--- rust-toolchain.toml | 2 +- src-tauri/src/app_state.rs | 6 +++-- 14 files changed, 112 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b30aeb2..52d95d11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3258,9 +3258,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "litemap" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" [[package]] name = "lock_api" @@ -7562,9 +7562,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", @@ -7574,9 +7574,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", @@ -7607,18 +7607,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", diff --git a/crates/sage-wallet/src/queues/transaction_queue.rs b/crates/sage-wallet/src/queues/transaction_queue.rs index 8d19a1f9..88e3ec72 100644 --- a/crates/sage-wallet/src/queues/transaction_queue.rs +++ b/crates/sage-wallet/src/queues/transaction_queue.rs @@ -118,6 +118,13 @@ impl TransactionQueue { } if resolved { + self.sync_sender + .send(SyncEvent::TransactionEnded { + transaction_id, + success: true, + }) + .await + .ok(); continue; } @@ -141,11 +148,17 @@ impl TransactionQueue { let mut tx = self.db.tx().await?; safely_remove_transaction(&mut tx, transaction_id).await?; tx.commit().await?; + + self.sync_sender + .send(SyncEvent::TransactionEnded { + transaction_id, + success: false, + }) + .await + .ok(); } } - self.sync_sender.send(SyncEvent::Transaction).await.ok(); - Ok(()) } } diff --git a/crates/sage-wallet/src/sync_manager/sync_event.rs b/crates/sage-wallet/src/sync_manager/sync_event.rs index cf287a28..899f180f 100644 --- a/crates/sage-wallet/src/sync_manager/sync_event.rs +++ b/crates/sage-wallet/src/sync_manager/sync_event.rs @@ -1,13 +1,22 @@ use std::net::IpAddr; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +use chia::protocol::{Bytes32, CoinState}; + +#[derive(Debug, Clone, PartialEq, Eq)] pub enum SyncEvent { Start(IpAddr), Stop, Subscribed, - Derivation, - CoinState, - Transaction, + DerivationIndex { + next_index: u32, + }, + CoinsUpdated { + coin_states: Vec, + }, + TransactionEnded { + transaction_id: Bytes32, + success: bool, + }, PuzzleBatchSynced, CatInfo, DidInfo, diff --git a/crates/sage-wallet/src/sync_manager/wallet_sync.rs b/crates/sage-wallet/src/sync_manager/wallet_sync.rs index 7b12b1ba..aaa34403 100644 --- a/crates/sage-wallet/src/sync_manager/wallet_sync.rs +++ b/crates/sage-wallet/src/sync_manager/wallet_sync.rs @@ -94,7 +94,12 @@ pub async fn sync_wallet( } tx.commit().await?; - sync_sender.send(SyncEvent::Derivation).await.ok(); + sync_sender + .send(SyncEvent::DerivationIndex { + next_index: start_index, + }) + .await + .ok(); for batch in p2_puzzle_hashes.chunks(500) { derive_more |= sync_puzzle_hashes( @@ -216,7 +221,7 @@ pub async fn incremental_sync( ) -> Result<(), WalletError> { let mut tx = wallet.db.tx().await?; - for coin_state in coin_states { + for &coin_state in &coin_states { upsert_coin(&mut tx, coin_state, None).await?; if coin_state.spent_height.is_some() { @@ -224,14 +229,20 @@ pub async fn incremental_sync( } } + sync_sender + .send(SyncEvent::CoinsUpdated { coin_states }) + .await + .ok(); + let mut derived = false; + let mut next_index = tx.derivation_index(false).await?; + if derive_automatically { let max_index = tx .max_used_derivation_index(false) .await? .map_or(0, |index| index + 1); - let mut next_index = tx.derivation_index(false).await?; while next_index < max_index + 500 { wallet @@ -245,10 +256,11 @@ pub async fn incremental_sync( tx.commit().await?; - sync_sender.send(SyncEvent::CoinState).await.ok(); - if derived { - sync_sender.send(SyncEvent::Derivation).await.ok(); + sync_sender + .send(SyncEvent::DerivationIndex { next_index }) + .await + .ok(); } Ok(()) diff --git a/crates/sage-wallet/src/test.rs b/crates/sage-wallet/src/test.rs index 66f16602..97f37254 100644 --- a/crates/sage-wallet/src/test.rs +++ b/crates/sage-wallet/src/test.rs @@ -131,7 +131,8 @@ impl TestWallet { index: key_index, }; - test.consume_until(SyncEvent::Subscribed).await; + test.consume_until(|event| matches!(event, SyncEvent::Subscribed)) + .await; assert_eq!(test.wallet.db.balance().await?, balance as u128); Ok(test) @@ -170,18 +171,28 @@ impl TestWallet { Ok(()) } - pub async fn consume_until(&mut self, event: SyncEvent) { + pub async fn consume_until(&mut self, f: impl Fn(SyncEvent) -> bool) { loop { let next = timeout(Duration::from_secs(10), self.events.recv()) .await - .unwrap_or_else(|_| panic!("timed out listening for {event:?}")) - .unwrap_or_else(|| panic!("missing {event:?}")); + .unwrap_or_else(|_| panic!("timed out listening for event")) + .unwrap_or_else(|| panic!("missing next event")); debug!("Consuming event: {next:?}"); - if event == next { + if f(next) { return; } } } + + pub async fn wait_for_coins(&mut self) { + self.consume_until(|event| matches!(event, SyncEvent::CoinsUpdated { .. })) + .await; + } + + pub async fn wait_for_puzzles(&mut self) { + self.consume_until(|event| matches!(event, SyncEvent::PuzzleBatchSynced)) + .await; + } } diff --git a/crates/sage-wallet/src/wallet/cats.rs b/crates/sage-wallet/src/wallet/cats.rs index 70ea62df..8009ee05 100644 --- a/crates/sage-wallet/src/wallet/cats.rs +++ b/crates/sage-wallet/src/wallet/cats.rs @@ -140,7 +140,7 @@ impl Wallet { mod tests { use test_log::test; - use crate::{SyncEvent, TestWallet}; + use crate::TestWallet; #[test(tokio::test)] async fn test_send_cat() -> anyhow::Result<()> { @@ -150,7 +150,7 @@ mod tests { assert_eq!(coin_spends.len(), 2); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 500); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 1); @@ -164,7 +164,7 @@ mod tests { assert_eq!(coin_spends.len(), 1); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.cat_balance(asset_id).await?, 1000); assert_eq!(test.wallet.db.spendable_cat_coins(asset_id).await?.len(), 2); @@ -176,7 +176,7 @@ mod tests { assert_eq!(coin_spends.len(), 3); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 0); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 0); diff --git a/crates/sage-wallet/src/wallet/did_assign.rs b/crates/sage-wallet/src/wallet/did_assign.rs index 633f70a1..456a59de 100644 --- a/crates/sage-wallet/src/wallet/did_assign.rs +++ b/crates/sage-wallet/src/wallet/did_assign.rs @@ -132,7 +132,7 @@ mod tests { use chia::puzzles::nft::NftMetadata; use test_log::test; - use crate::{SyncEvent, TestWallet, WalletNftMint}; + use crate::{TestWallet, WalletNftMint}; use super::*; @@ -142,7 +142,7 @@ mod tests { let (coin_spends, did) = test.wallet.create_did(0, false, true).await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; let (coin_spends, mut nfts, _did) = test .wallet @@ -159,7 +159,7 @@ mod tests { ) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::PuzzleBatchSynced).await; + test.wait_for_puzzles().await; let nft = nfts.remove(0); @@ -168,7 +168,7 @@ mod tests { .assign_nfts(vec![nft.info.launcher_id], None, 0, false, true) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; let coin_spends = test .wallet @@ -181,7 +181,7 @@ mod tests { ) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; Ok(()) } diff --git a/crates/sage-wallet/src/wallet/dids.rs b/crates/sage-wallet/src/wallet/dids.rs index 08007a1f..af0284a3 100644 --- a/crates/sage-wallet/src/wallet/dids.rs +++ b/crates/sage-wallet/src/wallet/dids.rs @@ -124,7 +124,7 @@ impl Wallet { #[cfg(test)] mod tests { - use crate::{SyncEvent, TestWallet}; + use crate::TestWallet; use test_log::test; @@ -134,7 +134,7 @@ mod tests { let (coin_spends, did) = test.wallet.create_did(0, false, true).await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; for _ in 0..2 { let coin_spends = test @@ -142,7 +142,7 @@ mod tests { .transfer_dids(vec![did.info.launcher_id], test.puzzle_hash, 0, false, true) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; } assert_ne!( diff --git a/crates/sage-wallet/src/wallet/nfts.rs b/crates/sage-wallet/src/wallet/nfts.rs index 914584a7..2d6c78ce 100644 --- a/crates/sage-wallet/src/wallet/nfts.rs +++ b/crates/sage-wallet/src/wallet/nfts.rs @@ -232,7 +232,7 @@ impl Wallet { mod tests { use test_log::test; - use crate::{SyncEvent, TestWallet}; + use crate::TestWallet; use super::*; @@ -242,7 +242,7 @@ mod tests { let (coin_spends, did) = test.wallet.create_did(0, false, true).await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; let (coin_spends, mut nfts, _did) = test .wallet @@ -259,7 +259,7 @@ mod tests { ) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::PuzzleBatchSynced).await; + test.wait_for_puzzles().await; let puzzle_hash = test.wallet.p2_puzzle_hash(false, true).await?; @@ -275,7 +275,7 @@ mod tests { .add_nft_uri(nft.info.launcher_id, 0, item, false, true) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; } for _ in 0..2 { @@ -284,7 +284,7 @@ mod tests { .transfer_nfts(vec![nft.info.launcher_id], puzzle_hash, 0, false, true) .await?; test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; } Ok(()) diff --git a/crates/sage-wallet/src/wallet/offer.rs b/crates/sage-wallet/src/wallet/offer.rs index 653964ee..5f557b28 100644 --- a/crates/sage-wallet/src/wallet/offer.rs +++ b/crates/sage-wallet/src/wallet/offer.rs @@ -25,7 +25,7 @@ mod tests { use indexmap::{indexmap, IndexMap}; use test_log::test; - use crate::{MakerSide, RequestedNft, SyncEvent, TakerSide, TestWallet, WalletNftMint}; + use crate::{MakerSide, RequestedNft, TakerSide, TestWallet, WalletNftMint}; #[test(tokio::test)] async fn test_offer_xch_for_cat() -> anyhow::Result<()> { @@ -35,7 +35,7 @@ mod tests { // Issue CAT let (coin_spends, asset_id) = bob.wallet.issue_cat(1000, 0, None, false, true).await?; bob.transact(coin_spends).await?; - bob.consume_until(SyncEvent::CoinState).await; + bob.wait_for_coins().await; // Create offer let offer = alice @@ -70,8 +70,8 @@ mod tests { bob.push_bundle(spend_bundle).await?; // We need to wait for both wallets to sync in this case - bob.consume_until(SyncEvent::CoinState).await; - alice.consume_until(SyncEvent::PuzzleBatchSynced).await; + bob.wait_for_coins().await; + alice.wait_for_puzzles().await; // Check balances assert_eq!(alice.wallet.db.cat_balance(asset_id).await?, 1000); @@ -87,7 +87,7 @@ mod tests { let (coin_spends, did) = bob.wallet.create_did(0, false, true).await?; bob.transact(coin_spends).await?; - bob.consume_until(SyncEvent::CoinState).await; + bob.wait_for_coins().await; let (coin_spends, mut nfts, _did) = bob .wallet @@ -104,7 +104,7 @@ mod tests { ) .await?; bob.transact(coin_spends).await?; - bob.consume_until(SyncEvent::PuzzleBatchSynced).await; + bob.wait_for_puzzles().await; let nft = nfts.remove(0); @@ -152,8 +152,8 @@ mod tests { bob.push_bundle(spend_bundle).await?; // We need to wait for both wallets to sync in this case - bob.consume_until(SyncEvent::CoinState).await; - alice.consume_until(SyncEvent::PuzzleBatchSynced).await; + bob.wait_for_coins().await; + alice.wait_for_puzzles().await; // Check balances assert_ne!( @@ -172,7 +172,7 @@ mod tests { let (coin_spends, did) = alice.wallet.create_did(0, false, true).await?; alice.transact(coin_spends).await?; - alice.consume_until(SyncEvent::CoinState).await; + alice.wait_for_coins().await; let (coin_spends, mut nfts, _did) = alice .wallet @@ -189,7 +189,7 @@ mod tests { ) .await?; alice.transact(coin_spends).await?; - alice.consume_until(SyncEvent::PuzzleBatchSynced).await; + alice.wait_for_puzzles().await; let nft = nfts.remove(0); @@ -226,8 +226,8 @@ mod tests { bob.push_bundle(spend_bundle).await?; // We need to wait for both wallets to sync in this case - alice.consume_until(SyncEvent::CoinState).await; - bob.consume_until(SyncEvent::CoinState).await; + alice.wait_for_coins().await; + bob.wait_for_coins().await; // Check balances assert_eq!(alice.wallet.db.balance().await?, 1000); diff --git a/crates/sage-wallet/src/wallet/p2_coin_management.rs b/crates/sage-wallet/src/wallet/p2_coin_management.rs index 9c999fd0..f13c9447 100644 --- a/crates/sage-wallet/src/wallet/p2_coin_management.rs +++ b/crates/sage-wallet/src/wallet/p2_coin_management.rs @@ -189,7 +189,7 @@ impl Wallet { mod tests { use test_log::test; - use crate::{SyncEvent, TestWallet}; + use crate::TestWallet; #[test(tokio::test)] async fn test_xch_coin_management() -> anyhow::Result<()> { @@ -200,7 +200,7 @@ mod tests { assert_eq!(coin_spends.len(), 1); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 1000); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 3); @@ -210,7 +210,7 @@ mod tests { assert_eq!(coin_spends.len(), 3); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 1000); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 1); diff --git a/crates/sage-wallet/src/wallet/p2_send.rs b/crates/sage-wallet/src/wallet/p2_send.rs index c85c4282..c659f726 100644 --- a/crates/sage-wallet/src/wallet/p2_send.rs +++ b/crates/sage-wallet/src/wallet/p2_send.rs @@ -48,7 +48,7 @@ impl Wallet { mod tests { use test_log::test; - use crate::{SyncEvent, TestWallet}; + use crate::TestWallet; #[test(tokio::test)] async fn test_send_xch() -> anyhow::Result<()> { @@ -62,7 +62,7 @@ mod tests { assert_eq!(coin_spends.len(), 1); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 1000); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 1); @@ -82,7 +82,7 @@ mod tests { assert_eq!(coin_spends.len(), 1); test.transact(coin_spends).await?; - test.consume_until(SyncEvent::CoinState).await; + test.wait_for_coins().await; assert_eq!(test.wallet.db.balance().await?, 750); assert_eq!(test.wallet.db.spendable_coins().await?.len(), 2); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 85cb9e7b..f19c7df4 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.80.0" +channel = "1.81.0" components = ["rustfmt", "clippy"] diff --git a/src-tauri/src/app_state.rs b/src-tauri/src/app_state.rs index 4bcba4c7..e66e5731 100644 --- a/src-tauri/src/app_state.rs +++ b/src-tauri/src/app_state.rs @@ -59,9 +59,11 @@ impl AppStateInner { SyncEvent::Start(ip) => ApiEvent::Start { ip: ip.to_string() }, SyncEvent::Stop => ApiEvent::Stop, SyncEvent::Subscribed => ApiEvent::Subscribed, - SyncEvent::Derivation => ApiEvent::Derivation, + SyncEvent::DerivationIndex { .. } => ApiEvent::Derivation, // TODO: New event? - SyncEvent::CoinState | SyncEvent::Transaction => ApiEvent::CoinState, + SyncEvent::CoinsUpdated { .. } | SyncEvent::TransactionEnded { .. } => { + ApiEvent::CoinState + } SyncEvent::PuzzleBatchSynced => ApiEvent::PuzzleBatchSynced, SyncEvent::CatInfo => ApiEvent::CatInfo, SyncEvent::DidInfo => ApiEvent::DidInfo, From 9661b7294ec42a6bd09ebda6dbd8ef8065933a6f Mon Sep 17 00:00:00 2001 From: Rigidity Date: Sat, 23 Nov 2024 17:46:26 -0500 Subject: [PATCH 2/2] Finish offer queue --- crates/sage-database/src/offers.rs | 62 ++++++-- crates/sage-wallet/src/queues.rs | 2 + crates/sage-wallet/src/queues/offer_queue.rs | 142 ++++++++++++++++++ crates/sage-wallet/src/sync_manager.rs | 43 +++++- .../sage-wallet/src/sync_manager/options.rs | 2 + .../src/sync_manager/sync_event.rs | 5 + crates/sage-wallet/src/test.rs | 1 + src-tauri/src/app_state.rs | 6 +- src/pages/Offers.tsx | 32 ++-- 9 files changed, 261 insertions(+), 34 deletions(-) create mode 100644 crates/sage-wallet/src/queues/offer_queue.rs diff --git a/crates/sage-database/src/offers.rs b/crates/sage-database/src/offers.rs index dc093884..36be3ef0 100644 --- a/crates/sage-database/src/offers.rs +++ b/crates/sage-database/src/offers.rs @@ -4,8 +4,8 @@ use chia::protocol::Bytes32; use sqlx::SqliteExecutor; use crate::{ - into_row, Database, DatabaseTx, OfferCatRow, OfferCatSql, OfferNftRow, OfferNftSql, OfferRow, - OfferSql, OfferXchRow, OfferXchSql, Result, + into_row, to_bytes32, Database, DatabaseTx, OfferCatRow, OfferCatSql, OfferNftRow, OfferNftSql, + OfferRow, OfferSql, OfferStatus, OfferXchRow, OfferXchSql, Result, }; impl Database { @@ -36,6 +36,14 @@ impl Database { pub async fn get_offer(&self, offer_id: Bytes32) -> Result> { get_offer(&self.pool, offer_id).await } + + pub async fn update_offer_status(&self, offer_id: Bytes32, status: OfferStatus) -> Result<()> { + update_offer_status(&self.pool, offer_id, status).await + } + + pub async fn offer_coin_ids(&self, offer_id: Bytes32) -> Result> { + offer_coin_ids(&self.pool, offer_id).await + } } impl DatabaseTx<'_> { @@ -238,7 +246,7 @@ async fn get_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result, offer_id: Bytes32) -> Result<()> { +async fn delete_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<()> { let offer_id = offer_id.as_ref(); sqlx::query!("DELETE FROM `offers` WHERE `offer_id` = ?", offer_id) @@ -248,10 +256,7 @@ pub async fn delete_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> R Ok(()) } -pub async fn offer_xch( - conn: impl SqliteExecutor<'_>, - offer_id: Bytes32, -) -> Result> { +async fn offer_xch(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result> { let offer_id = offer_id.as_ref(); sqlx::query_as!( @@ -266,10 +271,7 @@ pub async fn offer_xch( .collect() } -pub async fn offer_nfts( - conn: impl SqliteExecutor<'_>, - offer_id: Bytes32, -) -> Result> { +async fn offer_nfts(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result> { let offer_id = offer_id.as_ref(); sqlx::query_as!( @@ -284,10 +286,7 @@ pub async fn offer_nfts( .collect() } -pub async fn offer_cats( - conn: impl SqliteExecutor<'_>, - offer_id: Bytes32, -) -> Result> { +async fn offer_cats(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result> { let offer_id = offer_id.as_ref(); sqlx::query_as!( @@ -301,3 +300,36 @@ pub async fn offer_cats( .map(into_row) .collect() } + +async fn update_offer_status( + conn: impl SqliteExecutor<'_>, + offer_id: Bytes32, + status: OfferStatus, +) -> Result<()> { + let offer_id = offer_id.as_ref(); + let status = status as u8; + + sqlx::query!( + "UPDATE `offers` SET `status` = ? WHERE `offer_id` = ?", + status, + offer_id + ) + .execute(conn) + .await?; + + Ok(()) +} + +async fn offer_coin_ids(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result> { + let offer_id = offer_id.as_ref(); + + sqlx::query!( + "SELECT `coin_id` FROM `offered_coins` WHERE `offer_id` = ?", + offer_id + ) + .fetch_all(conn) + .await? + .into_iter() + .map(|row| to_bytes32(&row.coin_id)) + .collect() +} diff --git a/crates/sage-wallet/src/queues.rs b/crates/sage-wallet/src/queues.rs index febb5d72..cea69244 100644 --- a/crates/sage-wallet/src/queues.rs +++ b/crates/sage-wallet/src/queues.rs @@ -1,9 +1,11 @@ mod cat_queue; mod nft_uri_queue; +mod offer_queue; mod puzzle_queue; mod transaction_queue; pub use cat_queue::*; pub use nft_uri_queue::*; +pub use offer_queue::*; pub use puzzle_queue::*; pub use transaction_queue::*; diff --git a/crates/sage-wallet/src/queues/offer_queue.rs b/crates/sage-wallet/src/queues/offer_queue.rs new file mode 100644 index 00000000..3a0e9759 --- /dev/null +++ b/crates/sage-wallet/src/queues/offer_queue.rs @@ -0,0 +1,142 @@ +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use chia::protocol::Bytes32; +use sage_database::{Database, OfferStatus}; +use tokio::{ + sync::{mpsc, Mutex}, + time::{sleep, timeout}, +}; +use tracing::warn; + +use crate::{PeerState, SyncEvent, WalletError}; + +#[derive(Debug)] +pub struct OfferQueue { + db: Database, + genesis_challenge: Bytes32, + state: Arc>, + sync_sender: mpsc::Sender, +} + +impl OfferQueue { + pub fn new( + db: Database, + genesis_challenge: Bytes32, + state: Arc>, + sync_sender: mpsc::Sender, + ) -> Self { + Self { + db, + genesis_challenge, + state, + sync_sender, + } + } + + pub async fn start(mut self, delay: Duration) -> Result<(), WalletError> { + loop { + self.process_batch().await?; + sleep(delay).await; + } + } + + async fn process_batch(&mut self) -> Result<(), WalletError> { + let peak_height = self.state.lock().await.peak().map_or(0, |peak| peak.0); + + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + + let offers = self.db.active_offers().await?; + + for offer in offers { + if offer + .expiration_height + .is_some_and(|height| height <= peak_height) + || offer.expiration_timestamp.is_some_and(|ts| ts <= timestamp) + { + self.db + .update_offer_status(offer.offer_id, OfferStatus::Expired) + .await?; + + self.sync_sender + .send(SyncEvent::OfferUpdated { + offer_id: offer.offer_id, + status: OfferStatus::Expired, + }) + .await + .ok(); + + continue; + } + + loop { + let Some(peer) = self.state.lock().await.acquire_peer() else { + return Ok(()); + }; + + let coin_ids = self.db.offer_coin_ids(offer.offer_id).await?; + + let coin_states = match timeout( + Duration::from_secs(5), + peer.fetch_coins(coin_ids.clone(), self.genesis_challenge), + ) + .await + { + Ok(Ok(coin_states)) => coin_states, + Err(_timeout) => { + warn!("Coin lookup timed out for {}", peer.socket_addr()); + self.state.lock().await.ban( + peer.socket_addr().ip(), + Duration::from_secs(300), + "coin lookup timeout", + ); + continue; + } + Ok(Err(err)) => { + warn!("Coin lookup failed for {}: {}", peer.socket_addr(), err); + self.state.lock().await.ban( + peer.socket_addr().ip(), + Duration::from_secs(300), + "coin lookup failed", + ); + continue; + } + }; + + if coin_states.iter().all(|cs| cs.spent_height.is_some()) + && coin_ids + .into_iter() + .all(|coin_id| coin_states.iter().any(|cs| cs.coin.coin_id() == coin_id)) + { + self.db + .update_offer_status(offer.offer_id, OfferStatus::Completed) + .await?; + + self.sync_sender + .send(SyncEvent::OfferUpdated { + offer_id: offer.offer_id, + status: OfferStatus::Completed, + }) + .await + .ok(); + } else if coin_states.iter().any(|cs| cs.spent_height.is_some()) { + self.db + .update_offer_status(offer.offer_id, OfferStatus::Cancelled) + .await?; + + self.sync_sender + .send(SyncEvent::OfferUpdated { + offer_id: offer.offer_id, + status: OfferStatus::Cancelled, + }) + .await + .ok(); + } + } + } + + Ok(()) + } +} diff --git a/crates/sage-wallet/src/sync_manager.rs b/crates/sage-wallet/src/sync_manager.rs index 39a06855..fa0b8034 100644 --- a/crates/sage-wallet/src/sync_manager.rs +++ b/crates/sage-wallet/src/sync_manager.rs @@ -20,7 +20,9 @@ use tokio::{ use tracing::{debug, info, warn}; use wallet_sync::{incremental_sync, sync_wallet}; -use crate::{CatQueue, NftUriQueue, PuzzleQueue, TransactionQueue, Wallet, WalletError}; +use crate::{ + CatQueue, NftUriQueue, OfferQueue, PuzzleQueue, TransactionQueue, Wallet, WalletError, +}; mod options; mod peer_discovery; @@ -49,6 +51,7 @@ pub struct SyncManager { cat_queue_task: Option>>, nft_uri_queue_task: Option>>, transaction_queue_task: Option>>, + offer_queue_task: Option>>, pending_coin_subscriptions: Vec, } @@ -86,6 +89,9 @@ impl Drop for SyncManager { if let Some(task) = &mut self.transaction_queue_task { task.abort(); } + if let Some(task) = &mut self.offer_queue_task { + task.abort(); + } } } @@ -116,6 +122,7 @@ impl SyncManager { cat_queue_task: None, nft_uri_queue_task: None, transaction_queue_task: None, + offer_queue_task: None, pending_coin_subscriptions: Vec::new(), }; @@ -223,6 +230,9 @@ impl SyncManager { if let Some(task) = &mut self.transaction_queue_task.take() { task.abort(); } + if let Some(task) = &mut self.offer_queue_task.take() { + task.abort(); + } } async fn handle_message(&self, ip: IpAddr, message: Message) -> Result<(), WalletError> { @@ -399,11 +409,25 @@ impl SyncManager { ); self.transaction_queue_task = Some(task); } + + if self.offer_queue_task.is_none() { + let task = tokio::spawn( + OfferQueue::new( + wallet.db.clone(), + wallet.genesis_challenge, + self.state.clone(), + self.event_sender.clone(), + ) + .start(self.options.timeouts.offer_delay), + ); + self.offer_queue_task = Some(task); + } } else { self.puzzle_lookup_task = None; self.cat_queue_task = None; self.nft_uri_queue_task = None; self.transaction_queue_task = None; + self.offer_queue_task = None; } } @@ -489,5 +513,22 @@ impl SyncManager { None => {} } } + + if let Some(task) = &mut self.offer_queue_task { + match poll_once(task).await { + Some(Err(error)) => { + warn!("Offer queue failed with panic: {error}"); + self.offer_queue_task = None; + } + Some(Ok(Err(error))) => { + warn!("Offer queue failed with error: {error}"); + self.offer_queue_task = None; + } + Some(Ok(Ok(()))) => { + self.offer_queue_task = None; + } + None => {} + } + } } } diff --git a/crates/sage-wallet/src/sync_manager/options.rs b/crates/sage-wallet/src/sync_manager/options.rs index 5d8eab51..78fbfdba 100644 --- a/crates/sage-wallet/src/sync_manager/options.rs +++ b/crates/sage-wallet/src/sync_manager/options.rs @@ -17,6 +17,7 @@ pub struct Timeouts { pub nft_uri_delay: Duration, pub puzzle_delay: Duration, pub transaction_delay: Duration, + pub offer_delay: Duration, pub connection: Duration, pub initial_peak: Duration, pub remove_subscription: Duration, @@ -32,6 +33,7 @@ impl Default for Timeouts { nft_uri_delay: Duration::from_secs(1), puzzle_delay: Duration::from_secs(1), transaction_delay: Duration::from_secs(1), + offer_delay: Duration::from_secs(1), connection: Duration::from_secs(3), initial_peak: Duration::from_secs(2), remove_subscription: Duration::from_secs(3), diff --git a/crates/sage-wallet/src/sync_manager/sync_event.rs b/crates/sage-wallet/src/sync_manager/sync_event.rs index 899f180f..56d77ee1 100644 --- a/crates/sage-wallet/src/sync_manager/sync_event.rs +++ b/crates/sage-wallet/src/sync_manager/sync_event.rs @@ -1,6 +1,7 @@ use std::net::IpAddr; use chia::protocol::{Bytes32, CoinState}; +use sage_database::OfferStatus; #[derive(Debug, Clone, PartialEq, Eq)] pub enum SyncEvent { @@ -17,6 +18,10 @@ pub enum SyncEvent { transaction_id: Bytes32, success: bool, }, + OfferUpdated { + offer_id: Bytes32, + status: OfferStatus, + }, PuzzleBatchSynced, CatInfo, DidInfo, diff --git a/crates/sage-wallet/src/test.rs b/crates/sage-wallet/src/test.rs index 97f37254..798c772e 100644 --- a/crates/sage-wallet/src/test.rs +++ b/crates/sage-wallet/src/test.rs @@ -98,6 +98,7 @@ impl TestWallet { cat_delay: Duration::from_millis(100), puzzle_delay: Duration::from_millis(100), transaction_delay: Duration::from_millis(100), + offer_delay: Duration::from_millis(100), ..Default::default() }, testing: true, diff --git a/src-tauri/src/app_state.rs b/src-tauri/src/app_state.rs index e66e5731..b7389299 100644 --- a/src-tauri/src/app_state.rs +++ b/src-tauri/src/app_state.rs @@ -61,9 +61,9 @@ impl AppStateInner { SyncEvent::Subscribed => ApiEvent::Subscribed, SyncEvent::DerivationIndex { .. } => ApiEvent::Derivation, // TODO: New event? - SyncEvent::CoinsUpdated { .. } | SyncEvent::TransactionEnded { .. } => { - ApiEvent::CoinState - } + SyncEvent::CoinsUpdated { .. } + | SyncEvent::TransactionEnded { .. } + | SyncEvent::OfferUpdated { .. } => ApiEvent::CoinState, SyncEvent::PuzzleBatchSynced => ApiEvent::PuzzleBatchSynced, SyncEvent::CatInfo => ApiEvent::CatInfo, SyncEvent::DidInfo => ApiEvent::DidInfo, diff --git a/src/pages/Offers.tsx b/src/pages/Offers.tsx index b3b905f5..831da02f 100644 --- a/src/pages/Offers.tsx +++ b/src/pages/Offers.tsx @@ -1,4 +1,4 @@ -import { commands, OfferAssets, OfferRecord } from '@/bindings'; +import { commands, events, OfferAssets, OfferRecord } from '@/bindings'; import Container from '@/components/Container'; import Header from '@/components/Header'; import { Button } from '@/components/ui/button'; @@ -44,7 +44,7 @@ export function Offers() { [navigate], ); - useEffect(() => { + const updateOffers = () => { commands.getOffers({}).then((result) => { if (result.status === 'error') { console.error(result.error); @@ -53,6 +53,20 @@ export function Offers() { setOffers(result.data.offers); }); + }; + + useEffect(() => { + updateOffers(); + + const unlisten = events.syncEvent.listen((data) => { + if (data.payload.type === 'coin_state') { + updateOffers(); + } + }); + + return () => { + unlisten.then((u) => u()); + }; }, []); useEffect(() => { @@ -120,19 +134,7 @@ export function Offers() {
{offers.map((record, i) => ( - { - commands.getOffers({}).then((result) => { - if (result.status === 'error') { - console.error(result.error); - return; - } - setOffers(result.data.offers); - }); - }} - key={i} - /> + ))}