Skip to content

Commit 623116a

Browse files
committed
Make everything async
We migrate BDK and other remaining blocking parts to async.
1 parent dc34988 commit 623116a

File tree

4 files changed

+129
-93
lines changed

4 files changed

+129
-93
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
1616
lightning-rapid-gossip-sync = { version = "0.0.110" }
1717

1818
#bdk = "0.20.0"
19-
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]}
19+
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]}
2020
bitcoin = "0.28.1"
2121

2222
rand = "0.8.5"

src/access.rs

+63-44
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,47 @@
11
use crate::logger::{
22
log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger,
33
};
4-
use crate::Error;
4+
use crate::{Config, Error};
55

66
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
77
use lightning::chain::WatchedOutput;
88
use lightning::chain::{Confirm, Filter};
99

10-
use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
10+
use bdk::blockchain::{Blockchain, EsploraBlockchain};
1111
use bdk::database::BatchDatabase;
12+
use bdk::esplora_client;
1213
use bdk::wallet::AddressIndex;
1314
use bdk::{SignOptions, SyncOptions};
1415

15-
use bitcoin::{BlockHash, Script, Transaction, Txid};
16+
use bitcoin::{Script, Transaction, Txid};
1617

1718
use std::collections::HashSet;
18-
use std::sync::{Arc, Mutex};
19+
use std::sync::{Arc, Mutex, RwLock};
1920

2021
/// The minimum feerate we are allowed to send, as specify by LDK.
2122
const MIN_FEERATE: u32 = 253;
2223

24+
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
25+
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
26+
const BDK_CLIENT_STOP_GAP: usize = 20;
27+
28+
// The number of concurrent requests made against the API provider.
29+
const BDK_CLIENT_CONCURRENCY: u8 = 8;
30+
2331
pub struct ChainAccess<D>
2432
where
2533
D: BatchDatabase,
2634
{
2735
blockchain: EsploraBlockchain,
36+
_client: Arc<esplora_client::AsyncClient>,
2837
wallet: Mutex<bdk::Wallet<D>>,
2938
queued_transactions: Mutex<Vec<Txid>>,
3039
watched_transactions: Mutex<Vec<Txid>>,
3140
queued_outputs: Mutex<Vec<WatchedOutput>>,
3241
watched_outputs: Mutex<Vec<WatchedOutput>>,
3342
last_sync_height: tokio::sync::Mutex<Option<u32>>,
43+
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
44+
_config: Arc<Config>,
3445
logger: Arc<FilesystemLogger>,
3546
}
3647

@@ -39,30 +50,49 @@ where
3950
D: BatchDatabase,
4051
{
4152
pub(crate) fn new(
42-
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
53+
wallet: bdk::Wallet<D>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
4354
) -> Self {
4455
let wallet = Mutex::new(wallet);
4556
let watched_transactions = Mutex::new(Vec::new());
4657
let queued_transactions = Mutex::new(Vec::new());
4758
let watched_outputs = Mutex::new(Vec::new());
4859
let queued_outputs = Mutex::new(Vec::new());
4960
let last_sync_height = tokio::sync::Mutex::new(None);
61+
let tokio_runtime = RwLock::new(None);
62+
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
63+
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
64+
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
65+
.with_concurrency(BDK_CLIENT_CONCURRENCY);
66+
let client_builder =
67+
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
68+
let client = Arc::new(client_builder.build_async().unwrap());
5069
Self {
5170
blockchain,
71+
_client: client,
5272
wallet,
5373
queued_transactions,
5474
watched_transactions,
5575
queued_outputs,
5676
watched_outputs,
5777
last_sync_height,
78+
tokio_runtime,
79+
_config: config,
5880
logger,
5981
}
6082
}
6183

84+
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
85+
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
86+
}
87+
88+
pub(crate) fn drop_runtime(&self) {
89+
*self.tokio_runtime.write().unwrap() = None;
90+
}
91+
6292
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
6393
let sync_options = SyncOptions { progress: None };
6494

65-
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?;
95+
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?;
6696

6797
Ok(())
6898
}
@@ -237,11 +267,11 @@ where
237267
Ok(())
238268
}
239269

240-
pub(crate) fn create_funding_transaction(
270+
pub(crate) async fn create_funding_transaction(
241271
&self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget,
242272
) -> Result<Transaction, Error> {
243273
let num_blocks = num_blocks_from_conf_target(confirmation_target);
244-
let fee_rate = self.blockchain.estimate_fee(num_blocks)?;
274+
let fee_rate = self.blockchain.estimate_fee(num_blocks).await?;
245275

246276
let locked_wallet = self.wallet.lock().unwrap();
247277
let mut tx_builder = locked_wallet.build_tx();
@@ -280,9 +310,18 @@ where
280310
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
281311
let num_blocks = num_blocks_from_conf_target(confirmation_target);
282312
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
283-
self.blockchain
284-
.estimate_fee(num_blocks)
285-
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
313+
314+
let locked_runtime = self.tokio_runtime.read().unwrap();
315+
if locked_runtime.as_ref().is_none() {
316+
return fallback_fee;
317+
}
318+
319+
locked_runtime.as_ref().unwrap().block_on(async {
320+
self.blockchain
321+
.estimate_fee(num_blocks)
322+
.await
323+
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
324+
})
286325
}
287326
}
288327

@@ -291,13 +330,20 @@ where
291330
D: BatchDatabase,
292331
{
293332
fn broadcast_transaction(&self, tx: &Transaction) {
294-
match self.blockchain.broadcast(tx) {
295-
Ok(_) => {}
296-
Err(err) => {
297-
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
298-
panic!("Failed to broadcast transaction: {}", err);
299-
}
333+
let locked_runtime = self.tokio_runtime.read().unwrap();
334+
if locked_runtime.as_ref().is_none() {
335+
return;
300336
}
337+
338+
locked_runtime.as_ref().unwrap().block_on(async {
339+
match self.blockchain.broadcast(tx).await {
340+
Ok(_) => {}
341+
Err(err) => {
342+
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
343+
panic!("Failed to broadcast transaction: {}", err);
344+
}
345+
}
346+
})
301347
}
302348
}
303349

@@ -315,33 +361,6 @@ where
315361
}
316362
}
317363

318-
impl<D> GetHeight for ChainAccess<D>
319-
where
320-
D: BatchDatabase,
321-
{
322-
fn get_height(&self) -> Result<u32, bdk::Error> {
323-
self.blockchain.get_height()
324-
}
325-
}
326-
327-
impl<D> GetBlockHash for ChainAccess<D>
328-
where
329-
D: BatchDatabase,
330-
{
331-
fn get_block_hash(&self, height: u64) -> Result<BlockHash, bdk::Error> {
332-
self.blockchain.get_block_hash(height)
333-
}
334-
}
335-
336-
impl<D> GetTx for ChainAccess<D>
337-
where
338-
D: BatchDatabase,
339-
{
340-
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, bdk::Error> {
341-
self.blockchain.get_tx(txid)
342-
}
343-
}
344-
345364
fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize {
346365
match confirmation_target {
347366
ConfirmationTarget::Background => 12,

src/event.rs

+47-27
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
1818
use bitcoin::secp256k1::Secp256k1;
1919
use rand::{thread_rng, Rng};
2020
use std::collections::{hash_map, VecDeque};
21-
use std::sync::{Arc, Condvar, Mutex};
22-
use std::thread;
21+
use std::sync::{Arc, Condvar, Mutex, RwLock};
2322
use std::time::Duration;
2423

2524
/// The event queue will be persisted under this key.
@@ -221,6 +220,7 @@ pub(crate) struct EventHandler {
221220
keys_manager: Arc<KeysManager>,
222221
inbound_payments: Arc<PaymentInfoStorage>,
223222
outbound_payments: Arc<PaymentInfoStorage>,
223+
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
224224
logger: Arc<FilesystemLogger>,
225225
_config: Arc<Config>,
226226
}
@@ -233,6 +233,7 @@ impl EventHandler {
233233
inbound_payments: Arc<PaymentInfoStorage>, outbound_payments: Arc<PaymentInfoStorage>,
234234
logger: Arc<FilesystemLogger>, _config: Arc<Config>,
235235
) -> Self {
236+
let tokio_runtime = RwLock::new(None);
236237
Self {
237238
event_queue,
238239
chain_access,
@@ -241,10 +242,19 @@ impl EventHandler {
241242
keys_manager,
242243
inbound_payments,
243244
outbound_payments,
245+
tokio_runtime,
244246
logger,
245247
_config,
246248
}
247249
}
250+
251+
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
252+
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
253+
}
254+
255+
pub(crate) fn drop_runtime(&self) {
256+
*self.tokio_runtime.write().unwrap() = None;
257+
}
248258
}
249259

250260
impl LdkEventHandler for EventHandler {
@@ -262,29 +272,36 @@ impl LdkEventHandler for EventHandler {
262272
let confirmation_target = ConfirmationTarget::Normal;
263273

264274
// Sign the final funding transaction and broadcast it.
265-
match self.chain_access.create_funding_transaction(
266-
&output_script,
267-
*channel_value_satoshis,
268-
confirmation_target,
269-
) {
270-
Ok(final_tx) => {
271-
// Give the funding transaction back to LDK for opening the channel.
272-
if self
273-
.channel_manager
274-
.funding_transaction_generated(
275-
&temporary_channel_id,
276-
counterparty_node_id,
277-
final_tx,
278-
)
279-
.is_err()
280-
{
281-
log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel");
275+
let locked_runtime = self.tokio_runtime.read().unwrap();
276+
if locked_runtime.as_ref().is_none() {
277+
return;
278+
}
279+
280+
locked_runtime.as_ref().unwrap().block_on(async {
281+
match self.chain_access.create_funding_transaction(
282+
&output_script,
283+
*channel_value_satoshis,
284+
confirmation_target,
285+
).await {
286+
Ok(final_tx) => {
287+
// Give the funding transaction back to LDK for opening the channel.
288+
if self
289+
.channel_manager
290+
.funding_transaction_generated(
291+
&temporary_channel_id,
292+
counterparty_node_id,
293+
final_tx,
294+
)
295+
.is_err()
296+
{
297+
log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel");
298+
}
299+
}
300+
Err(err) => {
301+
log_error!(self.logger, "Failed to create funding transaction: {}", err);
282302
}
283303
}
284-
Err(err) => {
285-
log_error!(self.logger, "Failed to create funding transaction: {}", err);
286-
}
287-
}
304+
});
288305
}
289306
LdkEvent::PaymentReceived { payment_hash, purpose, amount_msat } => {
290307
log_info!(
@@ -387,11 +404,14 @@ impl LdkEventHandler for EventHandler {
387404
let forwarding_channel_manager = self.channel_manager.clone();
388405
let min = time_forwardable.as_millis() as u64;
389406

390-
// TODO: any way we still can use tokio here?
391-
// TODO: stop this thread on shutdown
392-
thread::spawn(move || {
407+
let locked_runtime = self.tokio_runtime.read().unwrap();
408+
if locked_runtime.as_ref().is_none() {
409+
return;
410+
}
411+
412+
locked_runtime.as_ref().unwrap().spawn(async move {
393413
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
394-
thread::sleep(Duration::from_millis(millis_to_sleep));
414+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
395415
forwarding_channel_manager.process_pending_htlc_forwards();
396416
});
397417
}

0 commit comments

Comments
 (0)