Skip to content

Commit f242dff

Browse files
committed
f Make async work. Ugly though.
1 parent ed9e7d0 commit f242dff

File tree

2 files changed

+115
-44
lines changed

2 files changed

+115
-44
lines changed

src/access.rs

Lines changed: 105 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,47 @@ use crate::logger::{
44
log_error, log_given_level, log_info, log_internal, log_trace, log_warn, FilesystemLogger,
55
Logger,
66
};
7+
use crate::{scid_utils, LdkLiteConfig};
78

89
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
910
use lightning::chain::WatchedOutput;
10-
use lightning::chain::{Confirm, Filter};
11+
use lightning::chain::{Access, AccessError, Confirm, Filter};
1112

1213
use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
1314
use bdk::database::BatchDatabase;
15+
use bdk::esplora_client;
1416
use bdk::wallet::AddressIndex;
1517
use bdk::{SignOptions, SyncOptions};
1618

17-
use bitcoin::{BlockHash, Script, Transaction, Txid};
19+
use bitcoin::{BlockHash, Script, Transaction, TxOut, Txid};
1820

1921
use std::collections::HashSet;
20-
use std::sync::{Arc, Mutex};
22+
use std::sync::{Arc, Mutex, RwLock};
2123

2224
/// The minimum feerate we are allowed to send, as specify by LDK.
2325
const MIN_FEERATE: u32 = 253;
2426

27+
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
28+
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
29+
const BDK_CLIENT_STOP_GAP: usize = 20;
30+
31+
// The number of concurrent requests made against the API provider.
32+
const BDK_CLIENT_CONCURRENCY: u8 = 8;
33+
2534
pub struct LdkLiteChainAccess<D>
2635
where
2736
D: BatchDatabase,
2837
{
2938
blockchain: EsploraBlockchain,
39+
client: Arc<esplora_client::AsyncClient>,
3040
wallet: Mutex<bdk::Wallet<D>>,
3141
queued_transactions: Mutex<Vec<Txid>>,
3242
watched_transactions: Mutex<Vec<Txid>>,
3343
queued_outputs: Mutex<Vec<WatchedOutput>>,
3444
watched_outputs: Mutex<Vec<WatchedOutput>>,
3545
last_sync_height: tokio::sync::Mutex<Option<u32>>,
46+
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
47+
config: Arc<LdkLiteConfig>,
3648
logger: Arc<FilesystemLogger>,
3749
}
3850

@@ -41,26 +53,45 @@ where
4153
D: BatchDatabase,
4254
{
4355
pub(crate) fn new(
44-
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
56+
wallet: bdk::Wallet<D>, config: Arc<LdkLiteConfig>, logger: Arc<FilesystemLogger>,
4557
) -> Self {
4658
let wallet = Mutex::new(wallet);
4759
let watched_transactions = Mutex::new(Vec::new());
4860
let queued_transactions = Mutex::new(Vec::new());
4961
let watched_outputs = Mutex::new(Vec::new());
5062
let queued_outputs = Mutex::new(Vec::new());
5163
let last_sync_height = tokio::sync::Mutex::new(None);
64+
let tokio_runtime = RwLock::new(None);
65+
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
66+
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
67+
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
68+
.with_concurrency(BDK_CLIENT_CONCURRENCY);
69+
let client_builder =
70+
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
71+
let client = Arc::new(client_builder.build_async().unwrap());
5272
Self {
5373
blockchain,
74+
client,
5475
wallet,
5576
queued_transactions,
5677
watched_transactions,
5778
queued_outputs,
5879
watched_outputs,
5980
last_sync_height,
81+
tokio_runtime,
82+
config,
6083
logger,
6184
}
6285
}
6386

87+
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
88+
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
89+
}
90+
91+
pub(crate) fn drop_runtime(&self) {
92+
*self.tokio_runtime.write().unwrap() = None;
93+
}
94+
6495
pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
6596
let sync_options = SyncOptions { progress: None };
6697

@@ -74,9 +105,7 @@ where
74105
}
75106

76107
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
77-
let client = &*self.blockchain;
78-
79-
let cur_height = client.get_height().await?;
108+
let cur_height = self.client.get_height().await?;
80109

81110
let mut locked_last_sync_height = self.last_sync_height.lock().await;
82111
if cur_height >= locked_last_sync_height.unwrap_or(0) {
@@ -93,10 +122,8 @@ where
93122
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
94123
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
95124
) -> Result<(), Error> {
96-
let client = &*self.blockchain;
97-
98125
// Inform the interface of the new block.
99-
let cur_block_header = client.get_header(cur_height).await?;
126+
let cur_block_header = self.client.get_header(cur_height).await?;
100127
for c in confirmables {
101128
c.best_block_updated(&cur_block_header, cur_height);
102129
}
@@ -108,8 +135,6 @@ where
108135
async fn sync_transactions_confirmed(
109136
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
110137
) -> Result<(), Error> {
111-
let client = &*self.blockchain;
112-
113138
// First, check the confirmation status of registered transactions as well as the
114139
// status of dependent transactions of registered outputs.
115140

@@ -131,12 +156,12 @@ where
131156
let mut unconfirmed_registered_txs = Vec::new();
132157

133158
for txid in registered_txs {
134-
if let Some(tx_status) = client.get_tx_status(&txid).await? {
159+
if let Some(tx_status) = self.client.get_tx_status(&txid).await? {
135160
if tx_status.confirmed {
136-
if let Some(tx) = client.get_tx(&txid).await? {
161+
if let Some(tx) = self.client.get_tx(&txid).await? {
137162
if let Some(block_height) = tx_status.block_height {
138-
let block_header = client.get_header(block_height).await?;
139-
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
163+
let block_header = self.client.get_header(block_height).await?;
164+
if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? {
140165
confirmed_txs.push((
141166
tx,
142167
block_height,
@@ -163,19 +188,20 @@ where
163188
let mut unspent_registered_outputs = Vec::new();
164189

165190
for output in registered_outputs {
166-
if let Some(output_status) = client
191+
if let Some(output_status) = self
192+
.client
167193
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
168194
.await?
169195
{
170196
if output_status.spent {
171197
if let Some(spending_tx_status) = output_status.status {
172198
if spending_tx_status.confirmed {
173199
let spending_txid = output_status.txid.unwrap();
174-
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
200+
if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? {
175201
let block_height = spending_tx_status.block_height.unwrap();
176-
let block_header = client.get_header(block_height).await?;
202+
let block_header = self.client.get_header(block_height).await?;
177203
if let Some(merkle_proof) =
178-
client.get_merkle_proof(&spending_txid).await?
204+
self.client.get_merkle_proof(&spending_txid).await?
179205
{
180206
confirmed_txs.push((
181207
spending_tx,
@@ -217,13 +243,13 @@ where
217243
async fn sync_transaction_unconfirmed(
218244
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
219245
) -> Result<(), Error> {
220-
let client = &*self.blockchain;
221246
// Query the interface for relevant txids and check whether they have been
222247
// reorged-out of the chain.
223248
let relevant_txids =
224249
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
225250
for txid in relevant_txids {
226-
let tx_unconfirmed = client
251+
let tx_unconfirmed = self
252+
.client
227253
.get_tx_status(&txid)
228254
.await
229255
.ok()
@@ -300,6 +326,63 @@ where
300326
}
301327
}
302328

329+
impl<D> Access for LdkLiteChainAccess<D>
330+
where
331+
D: BatchDatabase,
332+
{
333+
fn get_utxo(
334+
&self, genesis_hash: &BlockHash, short_channel_id: u64,
335+
) -> Result<TxOut, AccessError> {
336+
if genesis_hash
337+
!= &bitcoin::blockdata::constants::genesis_block(self.config.network)
338+
.header
339+
.block_hash()
340+
{
341+
return Err(AccessError::UnknownChain);
342+
}
343+
344+
let locked_runtime = self.tokio_runtime.read().unwrap();
345+
if locked_runtime.as_ref().is_none() {
346+
return Err(AccessError::UnknownTx);
347+
}
348+
349+
let block_height = scid_utils::block_from_scid(&short_channel_id);
350+
let tx_index = scid_utils::tx_index_from_scid(&short_channel_id);
351+
let vout = scid_utils::vout_from_scid(&short_channel_id);
352+
353+
let block_hash = self
354+
.blockchain
355+
.get_block_hash(block_height.into())
356+
.map_err(|_| AccessError::UnknownTx)?;
357+
358+
let tokio_client = Arc::clone(&self.client);
359+
let txout_opt: Arc<Mutex<Option<TxOut>>> = Arc::new(Mutex::new(None));
360+
let txout_opt_tokio = Arc::clone(&txout_opt);
361+
362+
locked_runtime.as_ref().unwrap().spawn(async move {
363+
let txid_res =
364+
tokio_client.get_txid_at_block_index(&block_hash, tx_index as usize).await;
365+
366+
if let Some(txid) = txid_res.unwrap_or(None) {
367+
let tx_res = tokio_client.get_tx(&txid).await;
368+
369+
if let Some(tx) = tx_res.unwrap_or(None) {
370+
if let Some(tx_out) = tx.output.get(vout as usize) {
371+
*txout_opt_tokio.lock().unwrap() = Some(tx_out.clone());
372+
}
373+
}
374+
}
375+
});
376+
377+
let locked_opt = txout_opt.lock().unwrap();
378+
if let Some(tx_out) = &*locked_opt {
379+
return Ok(tx_out.clone());
380+
} else {
381+
return Err(AccessError::UnknownTx);
382+
}
383+
}
384+
}
385+
303386
impl<D> Filter for LdkLiteChainAccess<D>
304387
where
305388
D: BatchDatabase,

src/lib.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ mod error;
2929
pub mod event;
3030
mod hex_utils;
3131
mod io_utils;
32-
mod scid_utils;
3332
mod logger;
3433
mod peer_store;
34+
mod scid_utils;
3535

3636
use access::LdkLiteChainAccess;
3737
pub use error::LdkLiteError as Error;
@@ -70,7 +70,6 @@ use lightning_invoice::utils::DefaultRouter;
7070
use lightning_invoice::{payment, Currency, Invoice};
7171

7272
use bdk::bitcoin::secp256k1::Secp256k1;
73-
use bdk::blockchain::esplora::EsploraBlockchain;
7473
use bdk::blockchain::{GetBlockHash, GetHeight};
7574
use bdk::sled;
7675
use bdk::template::Bip84;
@@ -90,13 +89,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
9089
use std::sync::{Arc, Mutex, RwLock};
9190
use std::time::{Duration, Instant, SystemTime};
9291

93-
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
94-
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
95-
const BDK_CLIENT_STOP_GAP: usize = 20;
96-
97-
// The number of concurrent requests made against the API provider.
98-
const BDK_CLIENT_CONCURRENCY: u8 = 8;
99-
10092
// The timeout after which we abandon retrying failed payments.
10193
const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
10294

@@ -223,17 +215,8 @@ impl LdkLiteBuilder {
223215
database,
224216
)?;
225217

226-
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
227-
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
228-
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
229-
.with_concurrency(BDK_CLIENT_CONCURRENCY);
230-
231-
let chain_access = Arc::new(LdkLiteChainAccess::new(
232-
blockchain,
233-
bdk_wallet,
234-
Arc::clone(&config),
235-
Arc::clone(&logger),
236-
));
218+
let chain_access =
219+
Arc::new(LdkLiteChainAccess::new(bdk_wallet, Arc::clone(&config), Arc::clone(&logger)));
237220

238221
// Step 3: Initialize Persist
239222
let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone()));
@@ -412,7 +395,7 @@ impl LdkLiteBuilder {
412395
/// Wraps all objects that need to be preserved during the run time of [`LdkLite`]. Will be dropped
413396
/// upon [`LdkLite::stop()`].
414397
struct LdkLiteRuntime {
415-
tokio_runtime: tokio::runtime::Runtime,
398+
tokio_runtime: Arc<tokio::runtime::Runtime>,
416399
_background_processor: BackgroundProcessor,
417400
stop_networking: Arc<AtomicBool>,
418401
stop_wallet_sync: Arc<AtomicBool>,
@@ -473,14 +456,19 @@ impl LdkLite {
473456
runtime.stop_networking.store(true, Ordering::Release);
474457
self.peer_manager.disconnect_all_peers();
475458

459+
// Drop the chain access held runtime.
460+
self.chain_access.drop_runtime();
461+
476462
// Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
477463
*run_lock = None;
478464
Ok(())
479465
}
480466

481467
fn setup_runtime(&self) -> Result<LdkLiteRuntime, Error> {
482468
let tokio_runtime =
483-
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
469+
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
470+
471+
self.chain_access.set_runtime(Arc::clone(&tokio_runtime));
484472

485473
// Setup wallet sync
486474
let chain_access = Arc::clone(&self.chain_access);

0 commit comments

Comments
 (0)