Skip to content

Commit ed49984

Browse files
committed
Make everything async
We migrate BDK and other remaining blocking parts to async.
1 parent e82b5f7 commit ed49984

File tree

5 files changed

+153
-99
lines changed

5 files changed

+153
-99
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
1616
lightning-rapid-gossip-sync = { version = "0.0.110" }
1717

1818
#bdk = "0.20.0"
19-
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]}
19+
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]}
2020
bitcoin = "0.28.1"
2121

2222
rand = "0.8.5"

src/access.rs

+107-64
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,47 @@
11
use crate::logger::{
22
log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger,
33
};
4-
use crate::Error;
4+
use crate::{Config, Error};
55

66
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
77
use lightning::chain::WatchedOutput;
88
use lightning::chain::{Confirm, Filter};
99

10-
use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
10+
use bdk::blockchain::EsploraBlockchain;
1111
use bdk::database::BatchDatabase;
12+
use bdk::esplora_client;
1213
use bdk::wallet::AddressIndex;
13-
use bdk::{SignOptions, SyncOptions};
14+
use bdk::{FeeRate, SignOptions, SyncOptions};
1415

15-
use bitcoin::{BlockHash, Script, Transaction, Txid};
16+
use bitcoin::{Script, Transaction, Txid};
1617

1718
use std::collections::HashSet;
18-
use std::sync::{Arc, Mutex};
19+
use std::sync::{mpsc, Arc, Mutex, RwLock};
1920

2021
/// The minimum feerate we are allowed to send, as specify by LDK.
2122
const MIN_FEERATE: u32 = 253;
2223

24+
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
25+
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
26+
const BDK_CLIENT_STOP_GAP: usize = 20;
27+
28+
// The number of concurrent requests made against the API provider.
29+
const BDK_CLIENT_CONCURRENCY: u8 = 8;
30+
2331
pub struct ChainAccess<D>
2432
where
2533
D: BatchDatabase,
2634
{
27-
blockchain: EsploraBlockchain,
35+
blockchain: Arc<EsploraBlockchain>,
36+
client: Arc<esplora_client::AsyncClient>,
2837
wallet: Mutex<bdk::Wallet<D>>,
2938
queued_transactions: Mutex<Vec<Txid>>,
3039
watched_transactions: Mutex<Vec<Txid>>,
3140
queued_outputs: Mutex<Vec<WatchedOutput>>,
3241
watched_outputs: Mutex<Vec<WatchedOutput>>,
3342
last_sync_height: tokio::sync::Mutex<Option<u32>>,
43+
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
44+
_config: Arc<Config>,
3445
logger: Arc<FilesystemLogger>,
3546
}
3647

@@ -39,38 +50,57 @@ where
3950
D: BatchDatabase,
4051
{
4152
pub(crate) fn new(
42-
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
53+
wallet: bdk::Wallet<D>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
4354
) -> Self {
4455
let wallet = Mutex::new(wallet);
4556
let watched_transactions = Mutex::new(Vec::new());
4657
let queued_transactions = Mutex::new(Vec::new());
4758
let watched_outputs = Mutex::new(Vec::new());
4859
let queued_outputs = Mutex::new(Vec::new());
4960
let last_sync_height = tokio::sync::Mutex::new(None);
61+
let tokio_runtime = RwLock::new(None);
62+
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
63+
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
64+
let blockchain = Arc::new(
65+
EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
66+
.with_concurrency(BDK_CLIENT_CONCURRENCY),
67+
);
68+
let client_builder =
69+
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
70+
let client = Arc::new(client_builder.build_async().unwrap());
5071
Self {
5172
blockchain,
73+
client,
5274
wallet,
5375
queued_transactions,
5476
watched_transactions,
5577
queued_outputs,
5678
watched_outputs,
5779
last_sync_height,
80+
tokio_runtime,
81+
_config: config,
5882
logger,
5983
}
6084
}
6185

86+
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
87+
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
88+
}
89+
90+
pub(crate) fn drop_runtime(&self) {
91+
*self.tokio_runtime.write().unwrap() = None;
92+
}
93+
6294
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
6395
let sync_options = SyncOptions { progress: None };
6496

65-
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?;
97+
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?;
6698

6799
Ok(())
68100
}
69101

70-
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
71-
let client = &*self.blockchain;
72-
73-
let cur_height = client.get_height().await?;
102+
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>) -> Result<(), Error> {
103+
let cur_height = self.client.get_height().await?;
74104

75105
let mut locked_last_sync_height = self.last_sync_height.lock().await;
76106
if cur_height >= locked_last_sync_height.unwrap_or(0) {
@@ -84,13 +114,11 @@ where
84114
}
85115

86116
async fn sync_best_block_updated(
87-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
117+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, cur_height: u32,
88118
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
89119
) -> Result<(), Error> {
90-
let client = &*self.blockchain;
91-
92120
// Inform the interface of the new block.
93-
let cur_block_header = client.get_header(cur_height).await?;
121+
let cur_block_header = self.client.get_header(cur_height).await?;
94122
for c in confirmables {
95123
c.best_block_updated(&cur_block_header, cur_height);
96124
}
@@ -100,10 +128,8 @@ where
100128
}
101129

102130
async fn sync_transactions_confirmed(
103-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
131+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
104132
) -> Result<(), Error> {
105-
let client = &*self.blockchain;
106-
107133
// First, check the confirmation status of registered transactions as well as the
108134
// status of dependent transactions of registered outputs.
109135

@@ -125,13 +151,13 @@ where
125151
let mut unconfirmed_registered_txs = Vec::new();
126152

127153
for txid in registered_txs {
128-
if let Some(tx_status) = client.get_tx_status(&txid).await? {
154+
if let Some(tx_status) = self.client.get_tx_status(&txid).await? {
129155
if tx_status.confirmed {
130-
if let Some(tx) = client.get_tx(&txid).await? {
156+
if let Some(tx) = self.client.get_tx(&txid).await? {
131157
if let Some(block_height) = tx_status.block_height {
132158
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
133-
let block_header = client.get_header(block_height).await?;
134-
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
159+
let block_header = self.client.get_header(block_height).await?;
160+
if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? {
135161
if block_height == merkle_proof.block_height {
136162
confirmed_txs.push((
137163
tx,
@@ -160,20 +186,21 @@ where
160186
let mut unspent_registered_outputs = Vec::new();
161187

162188
for output in registered_outputs {
163-
if let Some(output_status) = client
189+
if let Some(output_status) = self
190+
.client
164191
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
165192
.await?
166193
{
167194
if output_status.spent {
168195
if let Some(spending_tx_status) = output_status.status {
169196
if spending_tx_status.confirmed {
170197
let spending_txid = output_status.txid.unwrap();
171-
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
198+
if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? {
172199
let block_height = spending_tx_status.block_height.unwrap();
173200
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
174-
let block_header = client.get_header(block_height).await?;
201+
let block_header = self.client.get_header(block_height).await?;
175202
if let Some(merkle_proof) =
176-
client.get_merkle_proof(&spending_txid).await?
203+
self.client.get_merkle_proof(&spending_txid).await?
177204
{
178205
confirmed_txs.push((
179206
spending_tx,
@@ -213,15 +240,15 @@ where
213240
}
214241

215242
async fn sync_transaction_unconfirmed(
216-
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
243+
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
217244
) -> Result<(), Error> {
218-
let client = &*self.blockchain;
219245
// Query the interface for relevant txids and check whether they have been
220246
// reorged-out of the chain.
221247
let relevant_txids =
222248
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
223249
for txid in relevant_txids {
224-
let tx_unconfirmed = client
250+
let tx_unconfirmed = self
251+
.client
225252
.get_tx_status(&txid)
226253
.await
227254
.ok()
@@ -240,12 +267,14 @@ where
240267
pub(crate) fn create_funding_transaction(
241268
&self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget,
242269
) -> Result<Transaction, Error> {
243-
let num_blocks = num_blocks_from_conf_target(confirmation_target);
244-
let fee_rate = self.blockchain.estimate_fee(num_blocks)?;
245-
246270
let locked_wallet = self.wallet.lock().unwrap();
247271
let mut tx_builder = locked_wallet.build_tx();
248272

273+
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
274+
let fee_rate = self
275+
.estimate_fee(confirmation_target)
276+
.unwrap_or(FeeRate::from_sat_per_kwu(fallback_fee as f32));
277+
249278
tx_builder.add_recipient(output_script.clone(), value_sats).fee_rate(fee_rate).enable_rbf();
250279

251280
let (mut psbt, _) = tx_builder.finish()?;
@@ -271,17 +300,43 @@ where
271300
let address_info = self.wallet.lock().unwrap().get_address(AddressIndex::New)?;
272301
Ok(address_info.address)
273302
}
303+
304+
fn estimate_fee(&self, confirmation_target: ConfirmationTarget) -> Result<bdk::FeeRate, Error> {
305+
let num_blocks = num_blocks_from_conf_target(confirmation_target);
306+
307+
let locked_runtime = self.tokio_runtime.read().unwrap();
308+
if locked_runtime.as_ref().is_none() {
309+
return Err(Error::FeeEstimationFailed);
310+
}
311+
312+
let tokio_client = Arc::clone(&self.client);
313+
let (sender, receiver) = mpsc::sync_channel(1);
314+
315+
locked_runtime.as_ref().unwrap().spawn(async move {
316+
let res = tokio_client.get_fee_estimates().await;
317+
let _ = sender.send(res);
318+
});
319+
320+
let estimates = receiver
321+
.recv()
322+
.map_err(|_| Error::FeeEstimationFailed)?
323+
.map_err(|_| Error::FeeEstimationFailed)?;
324+
325+
Ok(bdk::FeeRate::from_sat_per_vb(
326+
esplora_client::convert_fee_rate(num_blocks, estimates)
327+
.map_err(|_| Error::FeeEstimationFailed)?,
328+
))
329+
}
274330
}
275331

276332
impl<D> FeeEstimator for ChainAccess<D>
277333
where
278334
D: BatchDatabase,
279335
{
280336
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
281-
let num_blocks = num_blocks_from_conf_target(confirmation_target);
282337
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
283-
self.blockchain
284-
.estimate_fee(num_blocks)
338+
339+
self.estimate_fee(confirmation_target)
285340
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
286341
}
287342
}
@@ -291,7 +346,22 @@ where
291346
D: BatchDatabase,
292347
{
293348
fn broadcast_transaction(&self, tx: &Transaction) {
294-
match self.blockchain.broadcast(tx) {
349+
let locked_runtime = self.tokio_runtime.read().unwrap();
350+
if locked_runtime.as_ref().is_none() {
351+
log_error!(self.logger, "Failed to broadcast transaction: No runtime.");
352+
return;
353+
}
354+
355+
let tokio_client = Arc::clone(&self.client);
356+
let tokio_tx = tx.clone();
357+
let (sender, receiver) = mpsc::sync_channel(1);
358+
359+
locked_runtime.as_ref().unwrap().spawn(async move {
360+
let res = tokio_client.broadcast(&tokio_tx).await;
361+
let _ = sender.send(res);
362+
});
363+
364+
match receiver.recv().unwrap() {
295365
Ok(_) => {}
296366
Err(err) => {
297367
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
@@ -315,33 +385,6 @@ where
315385
}
316386
}
317387

318-
impl<D> GetHeight for ChainAccess<D>
319-
where
320-
D: BatchDatabase,
321-
{
322-
fn get_height(&self) -> Result<u32, bdk::Error> {
323-
self.blockchain.get_height()
324-
}
325-
}
326-
327-
impl<D> GetBlockHash for ChainAccess<D>
328-
where
329-
D: BatchDatabase,
330-
{
331-
fn get_block_hash(&self, height: u64) -> Result<BlockHash, bdk::Error> {
332-
self.blockchain.get_block_hash(height)
333-
}
334-
}
335-
336-
impl<D> GetTx for ChainAccess<D>
337-
where
338-
D: BatchDatabase,
339-
{
340-
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, bdk::Error> {
341-
self.blockchain.get_tx(txid)
342-
}
343-
}
344-
345388
fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize {
346389
match confirmation_target {
347390
ConfirmationTarget::Background => 12,

src/error.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub enum Error {
1010
NotRunning,
1111
/// The funding transaction could not be created.
1212
FundingTxCreationFailed,
13+
/// Returned when we could not estimate a transaction fee.
14+
FeeEstimationFailed,
1315
/// A network connection has been closed.
1416
ConnectionFailed,
1517
/// Payment of the given invoice has already been intiated.
@@ -41,9 +43,8 @@ impl fmt::Display for Error {
4143
match *self {
4244
Self::AlreadyRunning => write!(f, "LDKLite is already running."),
4345
Self::NotRunning => write!(f, "LDKLite is not running."),
44-
Self::FundingTxCreationFailed => {
45-
write!(f, "Funding transaction could not be created.")
46-
}
46+
Self::FundingTxCreationFailed => write!(f, "Funding transaction could not be created."),
47+
Self::FeeEstimationFailed => write!(f, "Fee estimation failed."),
4748
Self::ConnectionFailed => write!(f, "Network connection closed."),
4849
Self::NonUniquePaymentHash => write!(f, "An invoice must not get payed twice."),
4950
Self::InvoiceInvalid => write!(f, "The given invoice is invalid."),

0 commit comments

Comments
 (0)