Skip to content

Commit

Permalink
Merge pull request #968 from breez/keep-alive
Browse files Browse the repository at this point in the history
Keep node connection alive during payments
  • Loading branch information
roeierez authored May 8, 2024
2 parents 5aefb6b + 3137981 commit e1eebd1
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 10 deletions.
12 changes: 12 additions & 0 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,8 @@ impl BreezServices {
// start the signer
let (shutdown_signer_sender, signer_signer_receiver) = mpsc::channel(1);
self.start_signer(signer_signer_receiver).await;
self.start_node_keep_alive(self.shutdown_receiver.clone())
.await;

// Sync node state
let sync_breez_services = self.clone();
Expand Down Expand Up @@ -1419,6 +1421,16 @@ impl BreezServices {
});
}

async fn start_node_keep_alive(
self: &Arc<BreezServices>,
shutdown_receiver: watch::Receiver<()>,
) {
let cloned = self.clone();
tokio::spawn(async move {
cloned.node_api.start_keep_alive(shutdown_receiver).await;
});
}

async fn start_backup_watcher(self: &Arc<BreezServices>) -> Result<()> {
self.backup_watcher
.start(self.shutdown_receiver.clone())
Expand Down
72 changes: 64 additions & 8 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::cmp::{min, Reverse};
use std::iter::Iterator;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Result};
use ecies::symmetric::{sym_decrypt, sym_encrypt};
use futures::Stream;
use futures::{Future, Stream};
use gl_client::node::ClnClient;
use gl_client::pb::cln::listinvoices_invoices::ListinvoicesInvoicesStatus;
use gl_client::pb::cln::listpays_pays::ListpaysPaysStatus;
Expand All @@ -27,8 +28,8 @@ use gl_client::tls::TlsConfig;
use gl_client::{node, utils};
use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString};
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;
use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::{sleep, MissedTickBehavior};
use tokio_stream::StreamExt;
use tonic::transport::{Endpoint, Uri};
use tonic::{Code, Streaming};
Expand Down Expand Up @@ -63,6 +64,7 @@ pub(crate) struct Greenlight {
gl_client: Mutex<Option<node::Client>>,
node_client: Mutex<Option<ClnClient>>,
persister: Arc<SqliteStorage>,
inprogress_payments: AtomicU16,
}

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -169,6 +171,7 @@ impl Greenlight {
gl_client: Mutex::new(None),
node_client: Mutex::new(None),
persister,
inprogress_payments: AtomicU16::new(0),
})
}

Expand Down Expand Up @@ -763,6 +766,16 @@ impl Greenlight {

Ok(max_per_channel)
}

async fn with_keep_alive<T, F>(&self, f: F) -> T
where
F: Future<Output = T>,
{
_ = self.inprogress_payments.fetch_add(1, Ordering::Relaxed);
let res = f.await;
_ = self.inprogress_payments.fetch_sub(1, Ordering::Relaxed);
res
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -1059,13 +1072,13 @@ impl NodeAPI for Greenlight {

// Now we wait for the first part to be completed as a way to wait for the payment
// to complete.
let response = client
.wait_send_pay(WaitsendpayRequest {
let response = self
.with_keep_alive(client.wait_send_pay(WaitsendpayRequest {
payment_hash: hex::decode(invoice.payment_hash.clone())?,
partid: Some(1),
timeout: Some(self.sdk_config.payment_timeout_sec),
groupid: Some(group_id),
})
}))
.await?
.into_inner();
Ok(PaymentResponse {
Expand Down Expand Up @@ -1107,7 +1120,10 @@ impl NodeAPI for Greenlight {
msat: self.sdk_config.exemptfee_msat,
}),
};
let result: cln::PayResponse = client.pay(request).await?.into_inner();
let result: cln::PayResponse = self
.with_keep_alive(client.pay(request))
.await?
.into_inner();

// Before returning from send_payment we need to make sure it is persisted in the backend node.
// We do so by polling for the payment.
Expand Down Expand Up @@ -1145,7 +1161,10 @@ impl NodeAPI for Greenlight {
retry_for: Some(self.sdk_config.payment_timeout_sec),
maxdelay: None,
};
let result = client.key_send(request).await?.into_inner();
let result = self
.with_keep_alive(client.key_send(request))
.await?
.into_inner();

// Before returning from send_payment we need to make sure it is persisted in the backend node.
// We do so by polling for the payment.
Expand Down Expand Up @@ -1245,6 +1264,43 @@ impl NodeAPI for Greenlight {
}
}

async fn start_keep_alive(&self, mut shutdown: watch::Receiver<()>) {
info!("keep alive started");
let mut interval = tokio::time::interval(Duration::from_secs(15));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("keep alive exited");
break;
}
_ = interval.tick() => {
let inprogress_payments = self.inprogress_payments.load(Ordering::Relaxed);
if inprogress_payments == 0 {
continue
}
let client_res = self.get_node_client().await;
match client_res {
Ok(mut client) => {
let res = client.getinfo(cln::GetinfoRequest {}).await;
match res {
Ok(_) => {
info!("keep alive ping sent, in progress payments: {inprogress_payments}");
}
Err(e) => {
error!("keep alive ping failed: {e}");
}
}
}
Err(e) => {
error!("keep alive ping failed to create client: {e}");
}
}
}
}
}
}

async fn connect_peer(&self, id: String, addr: String) -> NodeResult<()> {
let mut client = self.get_node_client().await?;
let connect_req = cln::ConnectRequest {
Expand Down
3 changes: 2 additions & 1 deletion libs/sdk-core/src/node_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::pin::Pin;

use anyhow::Result;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, watch};
use tokio_stream::Stream;
use tonic::Streaming;

Expand Down Expand Up @@ -128,6 +128,7 @@ pub trait NodeAPI: Send + Sync {
req: PrepareRedeemOnchainFundsRequest,
) -> NodeResult<PrepareRedeemOnchainFundsResponse>;
async fn start_signer(&self, shutdown: mpsc::Receiver<()>);
async fn start_keep_alive(&self, shutdown: watch::Receiver<()>);
async fn connect_peer(&self, node_id: String, addr: String) -> NodeResult<()>;
fn sign_invoice(&self, invoice: RawBolt11Invoice) -> NodeResult<String>;
async fn close_peer_channels(&self, node_id: String) -> NodeResult<Vec<String>>;
Expand Down
4 changes: 3 additions & 1 deletion libs/sdk-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rand::distributions::uniform::{SampleRange, SampleUniform};
use rand::distributions::{Alphanumeric, DistString, Standard};
use rand::rngs::OsRng;
use rand::{random, Rng};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, watch, Mutex};
use tokio::time::sleep;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -414,6 +414,8 @@ impl NodeAPI for MockNodeAPI {

async fn start_signer(&self, _shutdown: mpsc::Receiver<()>) {}

async fn start_keep_alive(&self, _shutdown: watch::Receiver<()>) {}

async fn connect_peer(&self, _node_id: String, _addr: String) -> NodeResult<()> {
Ok(())
}
Expand Down

0 comments on commit e1eebd1

Please sign in to comment.