Skip to content

Commit

Permalink
Merge pull request #111 from xch-dev/offer-queue
Browse files Browse the repository at this point in the history
Handle offer status updates
  • Loading branch information
Rigidity authored Nov 23, 2024
2 parents 9c004af + 9661b72 commit 163d682
Show file tree
Hide file tree
Showing 20 changed files with 370 additions and 96 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 47 additions & 15 deletions crates/sage-database/src/offers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use chia::protocol::Bytes32;
use sqlx::SqliteExecutor;

use crate::{
into_row, Database, DatabaseTx, OfferCatRow, OfferCatSql, OfferNftRow, OfferNftSql, OfferRow,
OfferSql, OfferXchRow, OfferXchSql, Result,
into_row, to_bytes32, Database, DatabaseTx, OfferCatRow, OfferCatSql, OfferNftRow, OfferNftSql,
OfferRow, OfferSql, OfferStatus, OfferXchRow, OfferXchSql, Result,
};

impl Database {
Expand Down Expand Up @@ -36,6 +36,14 @@ impl Database {
pub async fn get_offer(&self, offer_id: Bytes32) -> Result<Option<OfferRow>> {
get_offer(&self.pool, offer_id).await
}

pub async fn update_offer_status(&self, offer_id: Bytes32, status: OfferStatus) -> Result<()> {
update_offer_status(&self.pool, offer_id, status).await
}

pub async fn offer_coin_ids(&self, offer_id: Bytes32) -> Result<Vec<Bytes32>> {
offer_coin_ids(&self.pool, offer_id).await
}
}

impl DatabaseTx<'_> {
Expand Down Expand Up @@ -238,7 +246,7 @@ async fn get_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<O
.transpose()
}

pub async fn delete_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<()> {
async fn delete_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<()> {
let offer_id = offer_id.as_ref();

sqlx::query!("DELETE FROM `offers` WHERE `offer_id` = ?", offer_id)
Expand All @@ -248,10 +256,7 @@ pub async fn delete_offer(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> R
Ok(())
}

pub async fn offer_xch(
conn: impl SqliteExecutor<'_>,
offer_id: Bytes32,
) -> Result<Vec<OfferXchRow>> {
async fn offer_xch(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<Vec<OfferXchRow>> {
let offer_id = offer_id.as_ref();

sqlx::query_as!(
Expand All @@ -266,10 +271,7 @@ pub async fn offer_xch(
.collect()
}

pub async fn offer_nfts(
conn: impl SqliteExecutor<'_>,
offer_id: Bytes32,
) -> Result<Vec<OfferNftRow>> {
async fn offer_nfts(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<Vec<OfferNftRow>> {
let offer_id = offer_id.as_ref();

sqlx::query_as!(
Expand All @@ -284,10 +286,7 @@ pub async fn offer_nfts(
.collect()
}

pub async fn offer_cats(
conn: impl SqliteExecutor<'_>,
offer_id: Bytes32,
) -> Result<Vec<OfferCatRow>> {
async fn offer_cats(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<Vec<OfferCatRow>> {
let offer_id = offer_id.as_ref();

sqlx::query_as!(
Expand All @@ -301,3 +300,36 @@ pub async fn offer_cats(
.map(into_row)
.collect()
}

async fn update_offer_status(
conn: impl SqliteExecutor<'_>,
offer_id: Bytes32,
status: OfferStatus,
) -> Result<()> {
let offer_id = offer_id.as_ref();
let status = status as u8;

sqlx::query!(
"UPDATE `offers` SET `status` = ? WHERE `offer_id` = ?",
status,
offer_id
)
.execute(conn)
.await?;

Ok(())
}

async fn offer_coin_ids(conn: impl SqliteExecutor<'_>, offer_id: Bytes32) -> Result<Vec<Bytes32>> {
let offer_id = offer_id.as_ref();

sqlx::query!(
"SELECT `coin_id` FROM `offered_coins` WHERE `offer_id` = ?",
offer_id
)
.fetch_all(conn)
.await?
.into_iter()
.map(|row| to_bytes32(&row.coin_id))
.collect()
}
2 changes: 2 additions & 0 deletions crates/sage-wallet/src/queues.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod cat_queue;
mod nft_uri_queue;
mod offer_queue;
mod puzzle_queue;
mod transaction_queue;

pub use cat_queue::*;
pub use nft_uri_queue::*;
pub use offer_queue::*;
pub use puzzle_queue::*;
pub use transaction_queue::*;
142 changes: 142 additions & 0 deletions crates/sage-wallet/src/queues/offer_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use chia::protocol::Bytes32;
use sage_database::{Database, OfferStatus};
use tokio::{
sync::{mpsc, Mutex},
time::{sleep, timeout},
};
use tracing::warn;

use crate::{PeerState, SyncEvent, WalletError};

#[derive(Debug)]
pub struct OfferQueue {
db: Database,
genesis_challenge: Bytes32,
state: Arc<Mutex<PeerState>>,
sync_sender: mpsc::Sender<SyncEvent>,
}

impl OfferQueue {
pub fn new(
db: Database,
genesis_challenge: Bytes32,
state: Arc<Mutex<PeerState>>,
sync_sender: mpsc::Sender<SyncEvent>,
) -> Self {
Self {
db,
genesis_challenge,
state,
sync_sender,
}
}

pub async fn start(mut self, delay: Duration) -> Result<(), WalletError> {
loop {
self.process_batch().await?;
sleep(delay).await;
}
}

async fn process_batch(&mut self) -> Result<(), WalletError> {
let peak_height = self.state.lock().await.peak().map_or(0, |peak| peak.0);

let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();

let offers = self.db.active_offers().await?;

for offer in offers {
if offer
.expiration_height
.is_some_and(|height| height <= peak_height)
|| offer.expiration_timestamp.is_some_and(|ts| ts <= timestamp)
{
self.db
.update_offer_status(offer.offer_id, OfferStatus::Expired)
.await?;

self.sync_sender
.send(SyncEvent::OfferUpdated {
offer_id: offer.offer_id,
status: OfferStatus::Expired,
})
.await
.ok();

continue;
}

loop {
let Some(peer) = self.state.lock().await.acquire_peer() else {
return Ok(());
};

let coin_ids = self.db.offer_coin_ids(offer.offer_id).await?;

let coin_states = match timeout(
Duration::from_secs(5),
peer.fetch_coins(coin_ids.clone(), self.genesis_challenge),
)
.await
{
Ok(Ok(coin_states)) => coin_states,
Err(_timeout) => {
warn!("Coin lookup timed out for {}", peer.socket_addr());
self.state.lock().await.ban(
peer.socket_addr().ip(),
Duration::from_secs(300),
"coin lookup timeout",
);
continue;
}
Ok(Err(err)) => {
warn!("Coin lookup failed for {}: {}", peer.socket_addr(), err);
self.state.lock().await.ban(
peer.socket_addr().ip(),
Duration::from_secs(300),
"coin lookup failed",
);
continue;
}
};

if coin_states.iter().all(|cs| cs.spent_height.is_some())
&& coin_ids
.into_iter()
.all(|coin_id| coin_states.iter().any(|cs| cs.coin.coin_id() == coin_id))
{
self.db
.update_offer_status(offer.offer_id, OfferStatus::Completed)
.await?;

self.sync_sender
.send(SyncEvent::OfferUpdated {
offer_id: offer.offer_id,
status: OfferStatus::Completed,
})
.await
.ok();
} else if coin_states.iter().any(|cs| cs.spent_height.is_some()) {
self.db
.update_offer_status(offer.offer_id, OfferStatus::Cancelled)
.await?;

self.sync_sender
.send(SyncEvent::OfferUpdated {
offer_id: offer.offer_id,
status: OfferStatus::Cancelled,
})
.await
.ok();
}
}
}

Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/sage-wallet/src/queues/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ impl TransactionQueue {
}

if resolved {
self.sync_sender
.send(SyncEvent::TransactionEnded {
transaction_id,
success: true,
})
.await
.ok();
continue;
}

Expand All @@ -141,11 +148,17 @@ impl TransactionQueue {
let mut tx = self.db.tx().await?;
safely_remove_transaction(&mut tx, transaction_id).await?;
tx.commit().await?;

self.sync_sender
.send(SyncEvent::TransactionEnded {
transaction_id,
success: false,
})
.await
.ok();
}
}

self.sync_sender.send(SyncEvent::Transaction).await.ok();

Ok(())
}
}
Expand Down
Loading

0 comments on commit 163d682

Please sign in to comment.