Skip to content

Commit a6174dc

Browse files
committed
Make everything async
We migrate BDK and other remaining blocking parts to async.
1 parent e82b5f7 commit a6174dc

File tree

5 files changed

+155
-99
lines changed

5 files changed

+155
-99
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
1616
lightning-rapid-gossip-sync = { version = "0.0.110" }
1717

1818
#bdk = "0.20.0"
19-
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]}
19+
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]}
2020
bitcoin = "0.28.1"
2121

2222
rand = "0.8.5"

src/access.rs

+109-64
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,47 @@
11
use crate::logger::{
22
log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger,
33
};
4-
use crate::Error;
4+
use crate::{Config, Error};
55

66
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
77
use lightning::chain::WatchedOutput;
88
use lightning::chain::{Confirm, Filter};
99

10-
use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
10+
use bdk::blockchain::EsploraBlockchain;
1111
use bdk::database::BatchDatabase;
12+
use bdk::esplora_client;
1213
use bdk::wallet::AddressIndex;
13-
use bdk::{SignOptions, SyncOptions};
14+
use bdk::{FeeRate, SignOptions, SyncOptions};
1415

15-
use bitcoin::{BlockHash, Script, Transaction, Txid};
16+
use bitcoin::{Script, Transaction, Txid};
1617

1718
use std::collections::HashSet;
18-
use std::sync::{Arc, Mutex};
19+
use std::sync::{mpsc, Arc, Mutex, RwLock};
1920

2021
/// The minimum feerate we are allowed to send, as specify by LDK.
2122
const MIN_FEERATE: u32 = 253;
2223

24+
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
25+
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
26+
const BDK_CLIENT_STOP_GAP: usize = 20;
27+
28+
// The number of concurrent requests made against the API provider.
29+
const BDK_CLIENT_CONCURRENCY: u8 = 8;
30+
2331
pub struct ChainAccess<D>
2432
where
2533
D: BatchDatabase,
2634
{
27-
blockchain: EsploraBlockchain,
35+
blockchain: Arc<EsploraBlockchain>,
36+
client: Arc<esplora_client::AsyncClient>,
2837
wallet: Mutex<bdk::Wallet<D>>,
2938
queued_transactions: Mutex<Vec<Txid>>,
3039
watched_transactions: Mutex<Vec<Txid>>,
3140
queued_outputs: Mutex<Vec<WatchedOutput>>,
3241
watched_outputs: Mutex<Vec<WatchedOutput>>,
3342
last_sync_height: tokio::sync::Mutex<Option<u32>>,
43+
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
44+
_config: Arc<Config>,
3445
logger: Arc<FilesystemLogger>,
3546
}
3647

@@ -39,38 +50,59 @@ where
3950
D: BatchDatabase,
4051
{
4152
pub(crate) fn new(
42-
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
53+
wallet: bdk::Wallet<D>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
4354
) -> Self {
4455
let wallet = Mutex::new(wallet);
4556
let watched_transactions = Mutex::new(Vec::new());
4657
let queued_transactions = Mutex::new(Vec::new());
4758
let watched_outputs = Mutex::new(Vec::new());
4859
let queued_outputs = Mutex::new(Vec::new());
4960
let last_sync_height = tokio::sync::Mutex::new(None);
61+
let tokio_runtime = RwLock::new(None);
62+
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
63+
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
64+
let blockchain = Arc::new(
65+
EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
66+
.with_concurrency(BDK_CLIENT_CONCURRENCY),
67+
);
68+
let client_builder =
69+
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
70+
let client = Arc::new(client_builder.build_async().unwrap());
5071
Self {
5172
blockchain,
73+
client,
5274
wallet,
5375
queued_transactions,
5476
watched_transactions,
5577
queued_outputs,
5678
watched_outputs,
5779
last_sync_height,
80+
tokio_runtime,
81+
_config: config,
5882
logger,
5983
}
6084
}
6185

86+
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
87+
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
88+
}
89+
90+
pub(crate) fn drop_runtime(&self) {
91+
*self.tokio_runtime.write().unwrap() = None;
92+
}
93+
6294
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
6395
let sync_options = SyncOptions { progress: None };
6496

65-
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?;
97+
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?;
6698

6799
Ok(())
68100
}
69101

70-
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
71-
let client = &*self.blockchain;
72-
73-
let cur_height = client.get_height().await?;
102+
pub(crate) async fn sync(
103+
&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>,
104+
) -> Result<(), Error> {
105+
let cur_height = self.client.get_height().await?;
74106

75107
let mut locked_last_sync_height = self.last_sync_height.lock().await;
76108
if cur_height >= locked_last_sync_height.unwrap_or(0) {
@@ -84,13 +116,11 @@ where
84116
}
85117

86118
async fn sync_best_block_updated(
87-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
119+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, cur_height: u32,
88120
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
89121
) -> Result<(), Error> {
90-
let client = &*self.blockchain;
91-
92122
// Inform the interface of the new block.
93-
let cur_block_header = client.get_header(cur_height).await?;
123+
let cur_block_header = self.client.get_header(cur_height).await?;
94124
for c in confirmables {
95125
c.best_block_updated(&cur_block_header, cur_height);
96126
}
@@ -100,10 +130,8 @@ where
100130
}
101131

102132
async fn sync_transactions_confirmed(
103-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
133+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
104134
) -> Result<(), Error> {
105-
let client = &*self.blockchain;
106-
107135
// First, check the confirmation status of registered transactions as well as the
108136
// status of dependent transactions of registered outputs.
109137

@@ -125,13 +153,13 @@ where
125153
let mut unconfirmed_registered_txs = Vec::new();
126154

127155
for txid in registered_txs {
128-
if let Some(tx_status) = client.get_tx_status(&txid).await? {
156+
if let Some(tx_status) = self.client.get_tx_status(&txid).await? {
129157
if tx_status.confirmed {
130-
if let Some(tx) = client.get_tx(&txid).await? {
158+
if let Some(tx) = self.client.get_tx(&txid).await? {
131159
if let Some(block_height) = tx_status.block_height {
132160
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
133-
let block_header = client.get_header(block_height).await?;
134-
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
161+
let block_header = self.client.get_header(block_height).await?;
162+
if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? {
135163
if block_height == merkle_proof.block_height {
136164
confirmed_txs.push((
137165
tx,
@@ -160,20 +188,21 @@ where
160188
let mut unspent_registered_outputs = Vec::new();
161189

162190
for output in registered_outputs {
163-
if let Some(output_status) = client
191+
if let Some(output_status) = self
192+
.client
164193
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
165194
.await?
166195
{
167196
if output_status.spent {
168197
if let Some(spending_tx_status) = output_status.status {
169198
if spending_tx_status.confirmed {
170199
let spending_txid = output_status.txid.unwrap();
171-
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
200+
if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? {
172201
let block_height = spending_tx_status.block_height.unwrap();
173202
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
174-
let block_header = client.get_header(block_height).await?;
203+
let block_header = self.client.get_header(block_height).await?;
175204
if let Some(merkle_proof) =
176-
client.get_merkle_proof(&spending_txid).await?
205+
self.client.get_merkle_proof(&spending_txid).await?
177206
{
178207
confirmed_txs.push((
179208
spending_tx,
@@ -213,15 +242,15 @@ where
213242
}
214243

215244
async fn sync_transaction_unconfirmed(
216-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
245+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
217246
) -> Result<(), Error> {
218-
let client = &*self.blockchain;
219247
// Query the interface for relevant txids and check whether they have been
220248
// reorged-out of the chain.
221249
let relevant_txids =
222250
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
223251
for txid in relevant_txids {
224-
let tx_unconfirmed = client
252+
let tx_unconfirmed = self
253+
.client
225254
.get_tx_status(&txid)
226255
.await
227256
.ok()
@@ -240,12 +269,14 @@ where
240269
pub(crate) fn create_funding_transaction(
241270
&self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget,
242271
) -> Result<Transaction, Error> {
243-
let num_blocks = num_blocks_from_conf_target(confirmation_target);
244-
let fee_rate = self.blockchain.estimate_fee(num_blocks)?;
245-
246272
let locked_wallet = self.wallet.lock().unwrap();
247273
let mut tx_builder = locked_wallet.build_tx();
248274

275+
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
276+
let fee_rate = self
277+
.estimate_fee(confirmation_target)
278+
.unwrap_or(FeeRate::from_sat_per_kwu(fallback_fee as f32));
279+
249280
tx_builder.add_recipient(output_script.clone(), value_sats).fee_rate(fee_rate).enable_rbf();
250281

251282
let (mut psbt, _) = tx_builder.finish()?;
@@ -271,17 +302,43 @@ where
271302
let address_info = self.wallet.lock().unwrap().get_address(AddressIndex::New)?;
272303
Ok(address_info.address)
273304
}
305+
306+
fn estimate_fee(&self, confirmation_target: ConfirmationTarget) -> Result<bdk::FeeRate, Error> {
307+
let num_blocks = num_blocks_from_conf_target(confirmation_target);
308+
309+
let locked_runtime = self.tokio_runtime.read().unwrap();
310+
if locked_runtime.as_ref().is_none() {
311+
return Err(Error::FeeEstimationFailed);
312+
}
313+
314+
let tokio_client = Arc::clone(&self.client);
315+
let (sender, receiver) = mpsc::sync_channel(1);
316+
317+
locked_runtime.as_ref().unwrap().spawn(async move {
318+
let res = tokio_client.get_fee_estimates().await;
319+
let _ = sender.send(res);
320+
});
321+
322+
let estimates = receiver
323+
.recv()
324+
.map_err(|_| Error::FeeEstimationFailed)?
325+
.map_err(|_| Error::FeeEstimationFailed)?;
326+
327+
Ok(bdk::FeeRate::from_sat_per_vb(
328+
esplora_client::convert_fee_rate(num_blocks, estimates)
329+
.map_err(|_| Error::FeeEstimationFailed)?,
330+
))
331+
}
274332
}
275333

276334
impl<D> FeeEstimator for ChainAccess<D>
277335
where
278336
D: BatchDatabase,
279337
{
280338
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
281-
let num_blocks = num_blocks_from_conf_target(confirmation_target);
282339
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
283-
self.blockchain
284-
.estimate_fee(num_blocks)
340+
341+
self.estimate_fee(confirmation_target)
285342
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
286343
}
287344
}
@@ -291,7 +348,22 @@ where
291348
D: BatchDatabase,
292349
{
293350
fn broadcast_transaction(&self, tx: &Transaction) {
294-
match self.blockchain.broadcast(tx) {
351+
let locked_runtime = self.tokio_runtime.read().unwrap();
352+
if locked_runtime.as_ref().is_none() {
353+
log_error!(self.logger, "Failed to broadcast transaction: No runtime.");
354+
return;
355+
}
356+
357+
let tokio_client = Arc::clone(&self.client);
358+
let tokio_tx = tx.clone();
359+
let (sender, receiver) = mpsc::sync_channel(1);
360+
361+
locked_runtime.as_ref().unwrap().spawn(async move {
362+
let res = tokio_client.broadcast(&tokio_tx).await;
363+
let _ = sender.send(res);
364+
});
365+
366+
match receiver.recv().unwrap() {
295367
Ok(_) => {}
296368
Err(err) => {
297369
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
@@ -315,33 +387,6 @@ where
315387
}
316388
}
317389

318-
impl<D> GetHeight for ChainAccess<D>
319-
where
320-
D: BatchDatabase,
321-
{
322-
fn get_height(&self) -> Result<u32, bdk::Error> {
323-
self.blockchain.get_height()
324-
}
325-
}
326-
327-
impl<D> GetBlockHash for ChainAccess<D>
328-
where
329-
D: BatchDatabase,
330-
{
331-
fn get_block_hash(&self, height: u64) -> Result<BlockHash, bdk::Error> {
332-
self.blockchain.get_block_hash(height)
333-
}
334-
}
335-
336-
impl<D> GetTx for ChainAccess<D>
337-
where
338-
D: BatchDatabase,
339-
{
340-
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, bdk::Error> {
341-
self.blockchain.get_tx(txid)
342-
}
343-
}
344-
345390
fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize {
346391
match confirmation_target {
347392
ConfirmationTarget::Background => 12,

src/error.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub enum Error {
1010
NotRunning,
1111
/// The funding transaction could not be created.
1212
FundingTxCreationFailed,
13+
/// Returned when we could not estimate a transaction fee.
14+
FeeEstimationFailed,
1315
/// A network connection has been closed.
1416
ConnectionFailed,
1517
/// Payment of the given invoice has already been intiated.
@@ -41,9 +43,8 @@ impl fmt::Display for Error {
4143
match *self {
4244
Self::AlreadyRunning => write!(f, "LDKLite is already running."),
4345
Self::NotRunning => write!(f, "LDKLite is not running."),
44-
Self::FundingTxCreationFailed => {
45-
write!(f, "Funding transaction could not be created.")
46-
}
46+
Self::FundingTxCreationFailed => write!(f, "Funding transaction could not be created."),
47+
Self::FeeEstimationFailed => write!(f, "Fee estimation failed."),
4748
Self::ConnectionFailed => write!(f, "Network connection closed."),
4849
Self::NonUniquePaymentHash => write!(f, "An invoice must not get payed twice."),
4950
Self::InvoiceInvalid => write!(f, "The given invoice is invalid."),

0 commit comments

Comments
 (0)