Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Commit d25b388

Browse files
authored
Merge pull request #42 from benthecarman/chain-access
2 parents cbc78f5 + 40d81ad commit d25b388

File tree

6 files changed

+427
-48
lines changed

6 files changed

+427
-48
lines changed

node-manager/src/chain.rs

Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
2+
3+
use crate::error::MutinyError;
4+
use crate::wallet::MutinyWallet;
5+
use bdk::blockchain::Blockchain;
6+
use bdk_macros::maybe_await;
7+
use lightning::chain::chaininterface::{
8+
BroadcasterInterface, ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW,
9+
};
10+
use lightning::chain::{Confirm, Filter, WatchedOutput};
11+
use log::error;
12+
use std::collections::HashSet;
13+
use std::sync::{Arc, Mutex};
14+
use wasm_bindgen_futures::spawn_local;
15+
16+
pub struct MutinyChain {
17+
wallet: Arc<MutinyWallet>,
18+
// Transactions that were registered via the `Filter` interface and have to be processed.
19+
queued_transactions: Mutex<HashSet<Txid>>,
20+
// Transactions that were previously processed, but must not be forgotten yet.
21+
watched_transactions: Mutex<HashSet<Txid>>,
22+
// Outputs that were registered via the `Filter` interface and have to be processed.
23+
queued_outputs: Mutex<HashSet<WatchedOutput>>,
24+
// Outputs that were previously processed, but must not be forgotten yet.
25+
watched_outputs: Mutex<HashSet<WatchedOutput>>,
26+
// The tip hash observed during our last sync.
27+
last_sync_hash: futures::lock::Mutex<Option<BlockHash>>,
28+
}
29+
30+
impl MutinyChain {
31+
pub(crate) fn new(wallet: Arc<MutinyWallet>) -> Self {
32+
let watched_transactions = Mutex::new(HashSet::new());
33+
let queued_transactions = Mutex::new(HashSet::new());
34+
let watched_outputs = Mutex::new(HashSet::new());
35+
let queued_outputs = Mutex::new(HashSet::new());
36+
let last_sync_hash = futures::lock::Mutex::new(None);
37+
Self {
38+
wallet,
39+
queued_transactions,
40+
watched_transactions,
41+
queued_outputs,
42+
watched_outputs,
43+
last_sync_hash,
44+
}
45+
}
46+
47+
/// Syncs the LDK wallet via the `Confirm` interface. We run in a loop until we completed a
48+
/// full iteration without
49+
pub(crate) async fn sync(
50+
&self,
51+
confirmables: Vec<&(dyn Confirm + Sync)>,
52+
) -> Result<(), MutinyError> {
53+
// This lock makes sure we're syncing once at a time.
54+
let mut locked_last_sync_hash = self.last_sync_hash.lock().await;
55+
56+
let client = &*self.wallet.blockchain;
57+
58+
let mut tip_hash = client.get_tip_hash().await?;
59+
60+
loop {
61+
let registrations_are_pending = self.process_queues();
62+
let tip_is_new = Some(tip_hash) != *locked_last_sync_hash;
63+
64+
// We loop until any registered transactions have been processed at least once, or the
65+
// tip hasn't been updated during the last iteration.
66+
if !registrations_are_pending && !tip_is_new {
67+
// Nothing to do.
68+
break;
69+
} else {
70+
// Update the known tip to the newest one.
71+
if tip_is_new {
72+
// First check for any unconfirmed transactions and act on it immediately.
73+
self.sync_unconfirmed_transactions(&confirmables).await?;
74+
75+
match self.sync_best_block_updated(&confirmables, &tip_hash).await {
76+
Ok(()) => {}
77+
Err(MutinyError::ChainAccessFailed) => {
78+
// Immediately restart syncing when we encounter any inconsistencies.
79+
continue;
80+
}
81+
Err(err) => {
82+
// (Semi-)permanent failure, retry later.
83+
return Err(err);
84+
}
85+
}
86+
}
87+
88+
match self.get_confirmed_transactions().await {
89+
Ok((confirmed_txs, unconfirmed_registered_txs, unspent_registered_outputs)) => {
90+
// Double-check tip hash. If something changed, restart last-minute.
91+
tip_hash = client.get_tip_hash().await?;
92+
if Some(tip_hash) != *locked_last_sync_hash {
93+
continue;
94+
}
95+
96+
self.sync_confirmed_transactions(
97+
&confirmables,
98+
confirmed_txs,
99+
unconfirmed_registered_txs,
100+
unspent_registered_outputs,
101+
);
102+
}
103+
Err(MutinyError::ChainAccessFailed) => {
104+
// Immediately restart syncing when we encounter any inconsistencies.
105+
continue;
106+
}
107+
Err(err) => {
108+
// (Semi-)permanent failure, retry later.
109+
return Err(err);
110+
}
111+
}
112+
*locked_last_sync_hash = Some(tip_hash);
113+
}
114+
}
115+
Ok(())
116+
}
117+
118+
// Processes the transaction and output queues, returns `true` if new items had been
119+
// registered.
120+
fn process_queues(&self) -> bool {
121+
let mut pending_registrations = false;
122+
{
123+
let mut locked_queued_transactions = self.queued_transactions.lock().unwrap();
124+
if !locked_queued_transactions.is_empty() {
125+
let mut locked_watched_transactions = self.watched_transactions.lock().unwrap();
126+
pending_registrations = true;
127+
128+
locked_watched_transactions.extend(locked_queued_transactions.iter());
129+
*locked_queued_transactions = HashSet::new();
130+
}
131+
}
132+
{
133+
let mut locked_queued_outputs = self.queued_outputs.lock().unwrap();
134+
if !locked_queued_outputs.is_empty() {
135+
let mut locked_watched_outputs = self.watched_outputs.lock().unwrap();
136+
pending_registrations = true;
137+
138+
locked_watched_outputs.extend(locked_queued_outputs.iter().cloned());
139+
*locked_queued_outputs = HashSet::new();
140+
}
141+
}
142+
pending_registrations
143+
}
144+
145+
async fn sync_best_block_updated(
146+
&self,
147+
confirmables: &Vec<&(dyn Confirm + Sync)>,
148+
tip_hash: &BlockHash,
149+
) -> Result<(), MutinyError> {
150+
let client = &*self.wallet.blockchain;
151+
152+
// Inform the interface of the new block.
153+
let tip_header = client.get_header_by_hash(tip_hash).await?;
154+
let tip_status = client.get_block_status(tip_hash).await?;
155+
if tip_status.in_best_chain {
156+
if let Some(tip_height) = tip_status.height {
157+
for c in confirmables {
158+
c.best_block_updated(&tip_header, tip_height);
159+
}
160+
}
161+
} else {
162+
return Err(MutinyError::ChainAccessFailed);
163+
}
164+
Ok(())
165+
}
166+
167+
fn sync_confirmed_transactions(
168+
&self,
169+
confirmables: &Vec<&(dyn Confirm + Sync)>,
170+
confirmed_txs: Vec<ConfirmedTx>,
171+
unconfirmed_registered_txs: HashSet<Txid>,
172+
unspent_registered_outputs: HashSet<WatchedOutput>,
173+
) {
174+
for ctx in confirmed_txs {
175+
for c in confirmables {
176+
c.transactions_confirmed(
177+
&ctx.block_header,
178+
&[(ctx.pos, &ctx.tx)],
179+
ctx.block_height,
180+
);
181+
}
182+
}
183+
184+
*self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs;
185+
*self.watched_outputs.lock().unwrap() = unspent_registered_outputs;
186+
}
187+
188+
async fn get_confirmed_transactions(
189+
&self,
190+
) -> Result<(Vec<ConfirmedTx>, HashSet<Txid>, HashSet<WatchedOutput>), MutinyError> {
191+
let client = &*self.wallet.blockchain;
192+
193+
// First, check the confirmation status of registered transactions as well as the
194+
// status of dependent transactions of registered outputs.
195+
196+
let mut confirmed_txs = Vec::new();
197+
198+
// Check in the current queue, as well as in registered transactions leftover from
199+
// previous iterations.
200+
let registered_txs = self.watched_transactions.lock().unwrap().clone();
201+
202+
// Remember all registered but unconfirmed transactions for future processing.
203+
let mut unconfirmed_registered_txs = HashSet::new();
204+
205+
for txid in registered_txs {
206+
if let Some(confirmed_tx) = self.get_confirmed_tx(&txid, None, None).await? {
207+
confirmed_txs.push(confirmed_tx);
208+
} else {
209+
unconfirmed_registered_txs.insert(txid);
210+
}
211+
}
212+
213+
// Check all registered outputs for dependent spending transactions.
214+
let registered_outputs = self.watched_outputs.lock().unwrap().clone();
215+
216+
// Remember all registered outputs that haven't been spent for future processing.
217+
let mut unspent_registered_outputs = HashSet::new();
218+
219+
for output in registered_outputs {
220+
if let Some(output_status) = client
221+
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
222+
.await?
223+
{
224+
if let Some(spending_txid) = output_status.txid {
225+
if let Some(spending_tx_status) = output_status.status {
226+
if let Some(confirmed_tx) = self
227+
.get_confirmed_tx(
228+
&spending_txid,
229+
spending_tx_status.block_hash,
230+
spending_tx_status.block_height,
231+
)
232+
.await?
233+
{
234+
confirmed_txs.push(confirmed_tx);
235+
continue;
236+
}
237+
}
238+
}
239+
}
240+
unspent_registered_outputs.insert(output);
241+
}
242+
243+
// Sort all confirmed transactions first by block height, then by in-block
244+
// position, and finally feed them to the interface in order.
245+
confirmed_txs.sort_unstable_by(|tx1, tx2| {
246+
tx1.block_height
247+
.cmp(&tx2.block_height)
248+
.then_with(|| tx1.pos.cmp(&tx2.pos))
249+
});
250+
251+
Ok((
252+
confirmed_txs,
253+
unconfirmed_registered_txs,
254+
unspent_registered_outputs,
255+
))
256+
}
257+
258+
async fn get_confirmed_tx(
259+
&self,
260+
txid: &Txid,
261+
expected_block_hash: Option<BlockHash>,
262+
known_block_height: Option<u32>,
263+
) -> Result<Option<ConfirmedTx>, MutinyError> {
264+
let client = &*self.wallet.blockchain;
265+
266+
if let Some(merkle_proof) = client.get_merkle_proof(txid).await? {
267+
let block_hash = client.get_block_hash(merkle_proof.block_height).await?;
268+
if let Some(expected_block_hash) = expected_block_hash {
269+
if expected_block_hash != block_hash {
270+
return Err(MutinyError::ChainAccessFailed);
271+
}
272+
}
273+
274+
let block_header = client.get_header_by_hash(&block_hash).await?;
275+
276+
if let Some(tx) = client.get_tx(txid).await? {
277+
// We can take a shortcut here if a previous call already gave us the height.
278+
if let Some(block_height) = known_block_height {
279+
// if we have mismatched heights something probably went wrong
280+
if merkle_proof.block_height != block_height {
281+
return Err(MutinyError::ChainAccessFailed);
282+
}
283+
return Ok(Some(ConfirmedTx {
284+
tx,
285+
block_header,
286+
pos: merkle_proof.pos,
287+
block_height,
288+
}));
289+
}
290+
291+
return Ok(Some(ConfirmedTx {
292+
tx,
293+
block_header,
294+
pos: merkle_proof.pos,
295+
block_height: merkle_proof.block_height,
296+
}));
297+
}
298+
}
299+
Ok(None)
300+
}
301+
302+
async fn sync_unconfirmed_transactions(
303+
&self,
304+
confirmables: &Vec<&(dyn Confirm + Sync)>,
305+
) -> Result<(), MutinyError> {
306+
let client = &*self.wallet.blockchain;
307+
// Query the interface for relevant txids and check whether they are still
308+
// in the best chain, mark them unconfirmed otherwise.
309+
310+
let relevant_txids = confirmables
311+
.iter()
312+
.flat_map(|c| c.get_relevant_txids())
313+
.collect::<Vec<Txid>>();
314+
for txid in relevant_txids {
315+
match client.get_tx_status(&txid).await {
316+
Ok(Some(status)) => {
317+
// Skip if the tx in question is still confirmed.
318+
if status.confirmed {
319+
continue;
320+
}
321+
}
322+
// if the tx no longer exists or errors, we should
323+
// consider it unconfirmed
324+
Ok(None) => (),
325+
Err(_) => (),
326+
}
327+
328+
for c in confirmables {
329+
c.transaction_unconfirmed(&txid);
330+
}
331+
}
332+
333+
Ok(())
334+
}
335+
}
336+
337+
struct ConfirmedTx {
338+
tx: Transaction,
339+
block_header: BlockHeader,
340+
block_height: u32,
341+
pos: usize,
342+
}
343+
344+
impl Filter for MutinyChain {
345+
fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
346+
self.queued_transactions.lock().unwrap().insert(*txid);
347+
}
348+
349+
fn register_output(&self, output: WatchedOutput) {
350+
self.queued_outputs.lock().unwrap().insert(output);
351+
}
352+
}
353+
354+
impl BroadcasterInterface for MutinyChain {
355+
fn broadcast_transaction(&self, tx: &Transaction) {
356+
let blockchain = self.wallet.blockchain.clone();
357+
let tx_clone = tx.clone();
358+
spawn_local(async move {
359+
maybe_await!(blockchain.broadcast(&tx_clone))
360+
.unwrap_or_else(|_| error!("failed to broadcast tx! {}", tx_clone.txid()))
361+
});
362+
}
363+
}
364+
365+
impl FeeEstimator for MutinyChain {
366+
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
367+
// todo get from esplora
368+
fallback_fee_from_conf_target(confirmation_target)
369+
}
370+
}
371+
372+
fn fallback_fee_from_conf_target(confirmation_target: ConfirmationTarget) -> u32 {
373+
match confirmation_target {
374+
ConfirmationTarget::Background => FEERATE_FLOOR_SATS_PER_KW,
375+
ConfirmationTarget::Normal => 2000,
376+
ConfirmationTarget::HighPriority => 5000,
377+
}
378+
}

node-manager/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bdk::esplora_client;
12
use lightning::ln::peer_handler::PeerHandleError;
23
use thiserror::Error;
34
use wasm_bindgen::JsValue;
@@ -92,6 +93,14 @@ impl From<bdk::Error> for MutinyError {
9293
}
9394
}
9495

96+
// TODO add more granular errors for esplora failures
97+
impl From<esplora_client::Error> for MutinyError {
98+
fn from(_e: esplora_client::Error) -> Self {
99+
// This is most likely a chain access failure
100+
Self::ChainAccessFailed
101+
}
102+
}
103+
95104
impl From<PeerHandleError> for MutinyError {
96105
fn from(_e: PeerHandleError) -> Self {
97106
// TODO handle the case where `no_connection_possible`

0 commit comments

Comments
 (0)