Skip to content

Commit dfe43e1

Browse files
committed
f Go async
1 parent ca4e0e5 commit dfe43e1

File tree

2 files changed

+63
-65
lines changed

2 files changed

+63
-65
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
1616
lightning-rapid-gossip-sync = { version = "0.0.110" }
1717

1818
#bdk = "0.20.0"
19-
bdk = { git = "https://github.com/tnull/bdk", branch="feat/use-external-esplora-client", features = ["use-esplora-ureq", "key-value-db"]}
19+
bdk = { git = "https://github.com/tnull/bdk", branch="feat/use-external-esplora-client", features = ["use-esplora-reqwest", "key-value-db"]}
2020
bitcoin = "0.28.1"
2121

2222
rand = "0.8.5"

src/access.rs

+62-64
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use bdk::{SignOptions, SyncOptions};
1717
use bitcoin::{BlockHash, Script, Transaction, Txid};
1818

1919
use std::collections::HashSet;
20-
use std::sync::{Arc, Mutex, MutexGuard};
20+
use std::sync::{Arc, Mutex};
2121

2222
/// The minimum feerate we are allowed to send, as specify by LDK.
2323
const MIN_FEERATE: u32 = 253;
@@ -32,7 +32,7 @@ where
3232
watched_transactions: Mutex<Vec<Txid>>,
3333
queued_outputs: Mutex<Vec<WatchedOutput>>,
3434
watched_outputs: Mutex<Vec<WatchedOutput>>,
35-
last_sync_height: Mutex<Option<u32>>,
35+
last_sync_height: tokio::sync::Mutex<Option<u32>>,
3636
logger: Arc<FilesystemLogger>,
3737
}
3838

@@ -48,7 +48,7 @@ where
4848
let queued_transactions = Mutex::new(Vec::new());
4949
let watched_outputs = Mutex::new(Vec::new());
5050
let queued_outputs = Mutex::new(Vec::new());
51-
let last_sync_height = Mutex::new(None);
51+
let last_sync_height = tokio::sync::Mutex::new(None);
5252
Self {
5353
blockchain,
5454
wallet,
@@ -61,7 +61,7 @@ where
6161
}
6262
}
6363

64-
pub(crate) fn sync_wallet(&self) -> Result<(), Error> {
64+
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
6565
let sync_options = SyncOptions { progress: None };
6666

6767
self.wallet
@@ -73,73 +73,70 @@ where
7373
Ok(())
7474
}
7575

76-
pub(crate) fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
76+
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
7777
let client = &*self.blockchain;
7878

79-
let cur_height = client.get_height()?;
79+
let cur_height = client.get_height().await?;
8080

81-
let mut locked_last_sync_height = self.last_sync_height.lock().unwrap();
81+
let mut locked_last_sync_height = self.last_sync_height.lock().await;
8282
if cur_height >= locked_last_sync_height.unwrap_or(0) {
83-
self.sync_best_block_updated(
84-
confirmables.clone(),
85-
cur_height,
86-
&mut locked_last_sync_height,
87-
)?;
88-
self.sync_transactions_confirmed(confirmables.clone())?;
89-
self.sync_transaction_unconfirmed(confirmables.clone())?;
83+
self.sync_best_block_updated(&confirmables, cur_height, &mut locked_last_sync_height)
84+
.await?;
85+
self.sync_transactions_confirmed(&confirmables).await?;
86+
self.sync_transaction_unconfirmed(&confirmables).await?;
9087
}
9188
// TODO: check whether new outputs have been registered by now and process them
9289
Ok(())
9390
}
9491

95-
fn sync_best_block_updated(
96-
&self, confirmables: Vec<&(dyn Confirm + Sync)>, cur_height: u32,
97-
locked_last_sync_height: &mut MutexGuard<Option<u32>>,
92+
async fn sync_best_block_updated(
93+
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
94+
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
9895
) -> Result<(), Error> {
9996
let client = &*self.blockchain;
10097

10198
// Inform the interface of the new block.
102-
let cur_block_header = client.get_header(cur_height)?;
103-
for c in &confirmables {
99+
let cur_block_header = client.get_header(cur_height).await?;
100+
for c in confirmables {
104101
c.best_block_updated(&cur_block_header, cur_height);
105102
}
106103

107104
**locked_last_sync_height = Some(cur_height);
108105
Ok(())
109106
}
110107

111-
fn sync_transactions_confirmed(
112-
&self, confirmables: Vec<&(dyn Confirm + Sync)>,
108+
async fn sync_transactions_confirmed(
109+
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
113110
) -> Result<(), Error> {
114111
let client = &*self.blockchain;
115112

116113
// First, check the confirmation status of registered transactions as well as the
117114
// status of dependent transactions of registered outputs.
118-
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
119-
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
120-
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
121-
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();
122115

123116
let mut confirmed_txs = Vec::new();
124117

125118
// Check in the current queue, as well as in registered transactions leftover from
126119
// previous iterations.
127-
let registered_txs: HashSet<Txid> = locked_watched_transactions
128-
.iter()
129-
.chain(locked_queued_transactions.iter())
130-
.cloned()
131-
.collect();
120+
let registered_txs: HashSet<Txid> = {
121+
let locked_queued_transactions = self.queued_transactions.lock().unwrap();
122+
let locked_watched_transactions = self.watched_transactions.lock().unwrap();
123+
locked_watched_transactions
124+
.iter()
125+
.chain(locked_queued_transactions.iter())
126+
.cloned()
127+
.collect()
128+
};
132129

133130
// Remember all registered but unconfirmed transactions for future processing.
134131
let mut unconfirmed_registered_txs = Vec::new();
135132

136133
for txid in registered_txs {
137-
if let Some(tx_status) = client.get_tx_status(&txid)? {
134+
if let Some(tx_status) = client.get_tx_status(&txid).await? {
138135
if tx_status.confirmed {
139-
if let Some(tx) = client.get_tx(&txid)? {
136+
if let Some(tx) = client.get_tx(&txid).await? {
140137
if let Some(block_height) = tx_status.block_height {
141-
let block_header = client.get_header(block_height)?;
142-
if let Some(merkle_proof) = client.get_merkle_proof(&txid)? {
138+
let block_header = client.get_header(block_height).await?;
139+
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
143140
confirmed_txs.push((
144141
tx,
145142
block_height,
@@ -156,25 +153,29 @@ where
156153
}
157154

158155
// Check all registered outputs for dependent spending transactions.
159-
let registered_outputs: Vec<WatchedOutput> =
160-
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect();
156+
let registered_outputs: Vec<WatchedOutput> = {
157+
let locked_queued_outputs = self.queued_outputs.lock().unwrap();
158+
let locked_watched_outputs = self.watched_outputs.lock().unwrap();
159+
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect()
160+
};
161161

162162
// Remember all registered outputs that haven't been spent for future processing.
163163
let mut unspent_registered_outputs = Vec::new();
164164

165165
for output in registered_outputs {
166-
if let Some(output_status) =
167-
client.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)?
166+
if let Some(output_status) = client
167+
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
168+
.await?
168169
{
169170
if output_status.spent {
170171
if let Some(spending_tx_status) = output_status.status {
171172
if spending_tx_status.confirmed {
172173
let spending_txid = output_status.txid.unwrap();
173-
if let Some(spending_tx) = client.get_tx(&spending_txid)? {
174+
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
174175
let block_height = spending_tx_status.block_height.unwrap();
175-
let block_header = client.get_header(block_height)?;
176+
let block_header = client.get_header(block_height).await?;
176177
if let Some(merkle_proof) =
177-
client.get_merkle_proof(&spending_txid)?
178+
client.get_merkle_proof(&spending_txid).await?
178179
{
179180
confirmed_txs.push((
180181
spending_tx,
@@ -200,41 +201,38 @@ where
200201
},
201202
);
202203
for (tx, block_height, block_header, pos) in confirmed_txs {
203-
for c in &confirmables {
204+
for c in confirmables {
204205
c.transactions_confirmed(&block_header, &[(pos, &tx)], block_height);
205206
}
206207
}
207208

208-
*locked_watched_transactions = unconfirmed_registered_txs;
209-
*locked_queued_transactions = Vec::new();
210-
*locked_watched_outputs = unspent_registered_outputs;
211-
*locked_queued_outputs = Vec::new();
209+
*self.queued_transactions.lock().unwrap() = Vec::new();
210+
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
211+
*self.queued_outputs.lock().unwrap() = Vec::new();
212+
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;
212213

213214
Ok(())
214215
}
215216

216-
fn sync_transaction_unconfirmed(
217-
&self, confirmables: Vec<&(dyn Confirm + Sync)>,
217+
async fn sync_transaction_unconfirmed(
218+
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
218219
) -> Result<(), Error> {
219220
let client = &*self.blockchain;
220221
// Query the interface for relevant txids and check whether they have been
221222
// reorged-out of the chain.
222-
let unconfirmed_txids = confirmables
223-
.iter()
224-
.flat_map(|c| c.get_relevant_txids())
225-
.filter(|txid| {
226-
client
227-
.get_tx_status(txid)
228-
.ok()
229-
.unwrap_or(None)
230-
.map_or(true, |status| !status.confirmed)
231-
})
232-
.collect::<Vec<Txid>>();
233-
234-
// Mark all relevant unconfirmed transactions as unconfirmed.
235-
for txid in &unconfirmed_txids {
236-
for c in &confirmables {
237-
c.transaction_unconfirmed(txid);
223+
let relevant_txids =
224+
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
225+
for txid in relevant_txids {
226+
let tx_unconfirmed = client
227+
.get_tx_status(&txid)
228+
.await
229+
.ok()
230+
.unwrap_or(None)
231+
.map_or(true, |status| !status.confirmed);
232+
if tx_unconfirmed {
233+
for c in confirmables {
234+
c.transaction_unconfirmed(&txid);
235+
}
238236
}
239237
}
240238

0 commit comments

Comments
 (0)