Skip to content

Commit 18e7fe6

Browse files
committed
f DRY and use get_merkle_block
1 parent 5f6676f commit 18e7fe6

File tree

1 file changed

+120
-97
lines changed

1 file changed

+120
-97
lines changed

src/access.rs

+120-97
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use bdk::database::BatchDatabase;
1414
use bdk::wallet::AddressIndex;
1515
use bdk::{SignOptions, SyncOptions};
1616

17-
use bitcoin::{BlockHash, Script, Transaction, Txid};
17+
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
1818

1919
use std::collections::HashSet;
2020
use std::sync::{Arc, Mutex};
@@ -28,15 +28,15 @@ where
2828
// A BDK on-chain wallet.
2929
wallet: Mutex<bdk::Wallet<D>>,
3030
// Transactions that were registered via the `Filter` interface and have to be processed.
31-
queued_transactions: Mutex<Vec<Txid>>,
31+
queued_transactions: Mutex<HashSet<Txid>>,
3232
// Transactions that were previously processed, but must not be forgotten yet.
33-
watched_transactions: Mutex<Vec<Txid>>,
33+
watched_transactions: Mutex<HashSet<Txid>>,
3434
// Outputs that were registered via the `Filter` interface and have to be processed.
35-
queued_outputs: Mutex<Vec<WatchedOutput>>,
35+
queued_outputs: Mutex<HashSet<WatchedOutput>>,
3636
// Outputs that were previously processed, but must not be forgotten yet.
37-
watched_outputs: Mutex<Vec<WatchedOutput>>,
38-
// The tip height observed during our last sync.
39-
last_sync_height: tokio::sync::Mutex<Option<u32>>,
37+
watched_outputs: Mutex<HashSet<WatchedOutput>>,
38+
// The tip hash observed during our last sync.
39+
last_sync_hash: tokio::sync::Mutex<Option<BlockHash>>,
4040
logger: Arc<FilesystemLogger>,
4141
}
4242

@@ -48,19 +48,19 @@ where
4848
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
4949
) -> Self {
5050
let wallet = Mutex::new(wallet);
51-
let watched_transactions = Mutex::new(Vec::new());
52-
let queued_transactions = Mutex::new(Vec::new());
53-
let watched_outputs = Mutex::new(Vec::new());
54-
let queued_outputs = Mutex::new(Vec::new());
55-
let last_sync_height = tokio::sync::Mutex::new(None);
51+
let watched_transactions = Mutex::new(HashSet::new());
52+
let queued_transactions = Mutex::new(HashSet::new());
53+
let watched_outputs = Mutex::new(HashSet::new());
54+
let queued_outputs = Mutex::new(HashSet::new());
55+
let last_sync_hash = tokio::sync::Mutex::new(None);
5656
Self {
5757
blockchain,
5858
wallet,
5959
queued_transactions,
6060
watched_transactions,
6161
queued_outputs,
6262
watched_outputs,
63-
last_sync_height,
63+
last_sync_hash,
6464
logger,
6565
}
6666
}
@@ -76,33 +76,71 @@ where
7676
}
7777

7878
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
79-
let client = &*self.blockchain;
79+
// This lock makes sure we're syncing once at a time.
80+
let mut locked_last_sync_hash = self.last_sync_hash.lock().await;
8081

81-
let tip_hash= client.get_tip_hash().await?;
82-
let tip_block_status = client.get_block_status(&tip_hash).await?;
83-
let tip_height = tip_block_status.height.unwrap_or(0);
82+
let client = &*self.blockchain;
8483

85-
let mut locked_last_sync_height = self.last_sync_height.lock().await;
86-
if tip_block_status.in_best_chain && (tip_height >= locked_last_sync_height.unwrap_or(0)) {
87-
self.sync_best_block_updated(&confirmables, &tip_hash, tip_height).await?;
88-
*locked_last_sync_height = Some(tip_height);
84+
loop {
85+
let pending_registrations = self.process_queues();
86+
let tip_hash = client.get_tip_hash().await?;
87+
let new_tip = Some(tip_hash) != *locked_last_sync_hash;
8988

90-
self.sync_transactions_confirmed(&confirmables).await?;
91-
self.sync_transaction_unconfirmed(&confirmables).await?;
89+
if pending_registrations || new_tip {
90+
if new_tip {
91+
self.sync_best_block_updated(&confirmables, &tip_hash).await?;
92+
*locked_last_sync_hash = Some(tip_hash);
93+
}
94+
self.sync_transactions_confirmed(&confirmables).await?;
95+
self.sync_transaction_unconfirmed(&confirmables).await?;
96+
} else {
97+
break;
98+
}
9299
}
93-
// TODO: check whether new outputs have been registered by now and process them
94100
Ok(())
95101
}
96102

103+
// Processes the transaction and output queues, returns `true` if new items had been
104+
// registered.
105+
fn process_queues(&self) -> bool {
106+
let mut pending_registrations = false;
107+
{
108+
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
109+
if !locked_queued_transactions.is_empty() {
110+
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
111+
pending_registrations = true;
112+
113+
locked_watched_transactions.extend(locked_queued_transactions.iter());
114+
*locked_queued_transactions = HashSet::new();
115+
}
116+
}
117+
{
118+
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
119+
if !locked_queued_outputs.is_empty() {
120+
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();
121+
pending_registrations = true;
122+
123+
locked_watched_outputs.extend(locked_queued_outputs.iter().cloned());
124+
*locked_queued_outputs = HashSet::new();
125+
}
126+
}
127+
pending_registrations
128+
}
129+
97130
async fn sync_best_block_updated(
98-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, tip_hash: &BlockHash, tip_height: u32
131+
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, tip_hash: &BlockHash,
99132
) -> Result<(), Error> {
100133
let client = &*self.blockchain;
101134

102135
// Inform the interface of the new block.
103-
let tip_block_header = client.get_header_by_hash(tip_hash).await?;
104-
for c in confirmables {
105-
c.best_block_updated(&tip_block_header, tip_height);
136+
let tip_header = client.get_header_by_hash(tip_hash).await?;
137+
let tip_status = client.get_block_status(&tip_hash).await?;
138+
if tip_status.in_best_chain {
139+
if let Some(tip_height) = tip_status.height {
140+
for c in confirmables {
141+
c.best_block_updated(&tip_header, tip_height);
142+
}
143+
}
106144
}
107145
Ok(())
108146
}
@@ -119,104 +157,82 @@ where
119157

120158
// Check in the current queue, as well as in registered transactions leftover from
121159
// previous iterations.
122-
let registered_txs: HashSet<Txid> = {
123-
let locked_queued_transactions = self.queued_transactions.lock().unwrap();
124-
let locked_watched_transactions = self.watched_transactions.lock().unwrap();
125-
locked_watched_transactions
126-
.iter()
127-
.chain(locked_queued_transactions.iter())
128-
.cloned()
129-
.collect()
130-
};
160+
let registered_txs = self.watched_transactions.lock().unwrap().clone();
131161

132162
// Remember all registered but unconfirmed transactions for future processing.
133-
let mut unconfirmed_registered_txs = Vec::new();
163+
let mut unconfirmed_registered_txs = HashSet::new();
134164

135165
for txid in registered_txs {
136-
if let Some(tx_status) = client.get_tx_status(&txid).await? {
137-
if tx_status.confirmed {
138-
if let Some(block_hash) = tx_status.block_hash {
139-
if let Some(tx) = client.get_tx(&txid).await? {
140-
let block_header = client.get_header_by_hash(&block_hash).await?;
141-
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
142-
confirmed_txs.push((
143-
tx,
144-
merkle_proof.block_height,
145-
block_header,
146-
merkle_proof.pos,
147-
));
148-
continue;
149-
}
150-
}
151-
}
152-
}
166+
if let Some(confirmed_tx) = self.get_confirmed_tx(&txid).await? {
167+
confirmed_txs.push(confirmed_tx);
168+
} else {
169+
unconfirmed_registered_txs.insert(txid);
153170
}
154-
unconfirmed_registered_txs.push(txid);
155171
}
156172

157173
// Check all registered outputs for dependent spending transactions.
158-
let registered_outputs: Vec<WatchedOutput> = {
159-
let locked_queued_outputs = self.queued_outputs.lock().unwrap();
160-
let locked_watched_outputs = self.watched_outputs.lock().unwrap();
161-
locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect()
162-
};
174+
let registered_outputs = self.watched_outputs.lock().unwrap().clone();
163175

164176
// Remember all registered outputs that haven't been spent for future processing.
165-
let mut unspent_registered_outputs = Vec::new();
177+
let mut unspent_registered_outputs = HashSet::new();
166178

167179
for output in registered_outputs {
168180
if let Some(output_status) = client
169181
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
170182
.await?
171183
{
172-
if output_status.spent {
173-
if let Some(spending_tx_status) = output_status.status {
174-
if spending_tx_status.confirmed {
175-
let spending_txid = output_status.txid.unwrap();
176-
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
177-
if let Some(block_hash) = spending_tx_status.block_hash {
178-
let block_header = client.get_header_by_hash(&block_hash).await?;
179-
if let Some(merkle_proof) =
180-
client.get_merkle_proof(&spending_txid).await?
181-
{
182-
confirmed_txs.push((
183-
spending_tx,
184-
merkle_proof.block_height,
185-
block_header,
186-
merkle_proof.pos,
187-
));
188-
continue;
189-
}
190-
}
191-
}
192-
}
184+
if let Some(spending_txid) = output_status.txid {
185+
if let Some(confirmed_tx_tuple) = self.get_confirmed_tx(&spending_txid).await? {
186+
confirmed_txs.push(confirmed_tx_tuple);
187+
continue;
193188
}
194189
}
195190
}
196-
unspent_registered_outputs.push(output);
191+
unspent_registered_outputs.insert(output);
197192
}
198193

199194
// Sort all confirmed transactions first by block height, then by in-block
200195
// position, and finally feed them to the interface in order.
201-
confirmed_txs.sort_unstable_by(
202-
|(_, block_height1, _, pos1), (_, block_height2, _, pos2)| {
203-
block_height1.cmp(&block_height2).then_with(|| pos1.cmp(&pos2))
204-
},
205-
);
206-
for (tx, block_height, block_header, pos) in confirmed_txs {
196+
confirmed_txs.sort_unstable_by(|tx1, tx2| {
197+
tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
198+
});
199+
for ctx in confirmed_txs {
207200
for c in confirmables {
208-
c.transactions_confirmed(&block_header, &[(pos, &tx)], block_height);
201+
c.transactions_confirmed(
202+
&ctx.block_header,
203+
&[(ctx.pos, &ctx.tx)],
204+
ctx.block_height,
205+
);
209206
}
210207
}
211208

212-
*self.queued_transactions.lock().unwrap() = Vec::new();
213209
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
214-
*self.queued_outputs.lock().unwrap() = Vec::new();
215210
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;
216211

217212
Ok(())
218213
}
219214

215+
async fn get_confirmed_tx(&self, txid: &Txid) -> Result<Option<ConfirmedTx>, Error> {
216+
let client = &*self.blockchain;
217+
if let Some(merkle_block) = client.get_merkle_block(&txid).await? {
218+
let mut matches = vec![*txid];
219+
let mut indexes = Vec::new();
220+
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
221+
assert_eq!(indexes.len(), 1);
222+
let pos = indexes[0] as usize;
223+
224+
if let Some(tx) = client.get_tx(&txid).await? {
225+
let block_header = merkle_block.header;
226+
let block_hash = block_header.block_hash();
227+
let block_status = client.get_block_status(&block_hash).await?;
228+
if let Some(block_height) = block_status.height {
229+
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
230+
}
231+
}
232+
}
233+
Ok(None)
234+
}
235+
220236
async fn sync_transaction_unconfirmed(
221237
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
222238
) -> Result<(), Error> {
@@ -232,7 +248,7 @@ where
232248
if let Some(block_hash) = block_hash_opt {
233249
let block_status = client.get_block_status(&block_hash).await?;
234250
if block_status.in_best_chain {
235-
// Skip if the block in queestion is still confirmed.
251+
// Skip if the block in question is still confirmed.
236252
continue;
237253
}
238254
}
@@ -281,7 +297,14 @@ where
281297
}
282298
}
283299

284-
impl<D> FeeEstimator for LdkLiteChainAccess<D>
300+
struct ConfirmedTx {
301+
tx: Transaction,
302+
block_header: BlockHeader,
303+
block_height: u32,
304+
pos: usize,
305+
}
306+
307+
impl<D> FeeEstimator for ChainAccess<D>
285308
where
286309
D: BatchDatabase,
287310
{
@@ -314,11 +337,11 @@ where
314337
D: BatchDatabase,
315338
{
316339
fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
317-
self.queued_transactions.lock().unwrap().push(*txid);
340+
self.queued_transactions.lock().unwrap().insert(*txid);
318341
}
319342

320343
fn register_output(&self, output: WatchedOutput) {
321-
self.queued_outputs.lock().unwrap().push(output);
344+
self.queued_outputs.lock().unwrap().insert(output);
322345
}
323346
}
324347

0 commit comments

Comments
 (0)