diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 42df6adc3..b11737e5a 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -27,6 +27,7 @@ //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), //! sync_params: None, +//! max_tries: 3, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! ``` @@ -45,12 +46,17 @@ use bitcoincore_rpc::json::{ ListUnspentResultEntry, ScanningDetails, }; use bitcoincore_rpc::jsonrpc::serde_json::{json, Value}; +use bitcoincore_rpc::jsonrpc::{ + self, simple_http::SimpleHttpTransport, Error as JsonRpcError, Request, Response, Transport, +}; use bitcoincore_rpc::Auth as RpcAuth; use bitcoincore_rpc::{Client, RpcApi}; use log::{debug, info}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; +use std::fmt; use std::path::PathBuf; +use std::sync::atomic::{AtomicU8, Ordering}; use std::thread; use std::time::Duration; @@ -80,6 +86,10 @@ pub struct RpcConfig { pub wallet_name: String, /// Sync parameters pub sync_params: Option, + /// Max number of attempts before giving up and returning an error + /// + /// Set to `0` preserve the old behavior of erroring immediately + pub max_tries: u8, } /// Sync parameters for Bitcoin Core RPC. @@ -195,6 +205,68 @@ impl WalletSync for RpcBlockchain { } } +struct SimpleHttpWithRetry { + inner: SimpleHttpTransport, + attempts: AtomicU8, + limit: u8, +} + +macro_rules! impl_inner { + ($self:expr, $method:ident, $req:expr) => {{ + while $self.attempts.load(Ordering::Relaxed) <= $self.limit { + match $self.inner.$method($req.clone()) { + Ok(r) => { + $self.attempts.store(0, Ordering::Relaxed); + return Ok(r); + } + Err(JsonRpcError::Transport(e)) => { + match e.downcast_ref::() { + Some(jsonrpc::simple_http::Error::SocketError(io)) + if io.kind() == std::io::ErrorKind::WouldBlock => + { + let attempt = $self.attempts.fetch_add(1, Ordering::Relaxed); + let delay = std::cmp::min(1000, 100 << attempt as u64); + + debug!( + "Got a WouldBlock error at attempt {}, sleeping for {}ms", + attempt, delay + ); + std::thread::sleep(std::time::Duration::from_millis(delay)); + + continue; + } + _ => {} + } + + $self.attempts.store(0, Ordering::Relaxed); + return Err(JsonRpcError::Transport(e)); + } + Err(e) => { + $self.attempts.store(0, Ordering::Relaxed); + return Err(e); + } + } + } + + $self.attempts.store(0, Ordering::Relaxed); + Err(JsonRpcError::Transport("All attempts errored".into())) + }}; +} + +impl Transport for SimpleHttpWithRetry { + fn send_request(&self, req: Request) -> Result { + impl_inner!(self, send_request, req) + } + + fn send_batch(&self, reqs: &[Request]) -> Result, JsonRpcError> { + impl_inner!(self, send_batch, reqs) + } + + fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_target(f) + } +} + impl ConfigurableBlockchain for RpcBlockchain { type Config = RpcConfig; @@ -203,7 +275,23 @@ impl ConfigurableBlockchain for RpcBlockchain { fn from_config(config: &Self::Config) -> Result { let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name); - let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?; + let mut builder = SimpleHttpTransport::builder() + .url(&wallet_url) + .map_err(|e| bitcoincore_rpc::Error::JsonRpc(e.into()))?; + + let (user, pass) = bitcoincore_rpc::Auth::from(config.auth.clone()).get_user_pass()?; + if let Some(user) = user { + builder = builder.auth(user, pass); + } + + let transport = SimpleHttpWithRetry { + inner: builder.build(), + attempts: AtomicU8::new(0), + limit: config.max_tries, + }; + let jsonrpc_client = jsonrpc::client::Client::with_transport(transport); + + let client = Client::from_jsonrpc(jsonrpc_client); let rpc_version = client.version()?; info!("connected to '{}' with auth: {:?}", wallet_url, config.auth); @@ -816,6 +904,7 @@ fn descriptor_from_script_pubkey(script: &Script) -> String { /// wallet_name_prefix: Some("prefix-".to_string()), /// default_skip_blocks: 100_000, /// sync_params: None, +/// max_tries: 3, /// }; /// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?; /// # Ok(()) @@ -835,6 +924,10 @@ pub struct RpcBlockchainFactory { pub default_skip_blocks: u32, /// Sync parameters pub sync_params: Option, + /// Max number of attempts before giving up and returning an error + /// + /// Set to `0` preserve the old behavior of erroring immediately + pub max_tries: u8, } impl BlockchainFactory for RpcBlockchainFactory { @@ -855,6 +948,7 @@ impl BlockchainFactory for RpcBlockchainFactory { checksum ), sync_params: self.sync_params.clone(), + max_tries: self.max_tries, }) } } @@ -882,6 +976,7 @@ mod test { network: Network::Regtest, wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ), sync_params: None, + max_tries: 5, }; RpcBlockchain::from_config(&config).unwrap() } @@ -899,6 +994,7 @@ mod test { wallet_name_prefix: Some("prefix-".into()), default_skip_blocks: 0, sync_params: None, + max_tries: 3, }; (test_client, factory)