Skip to content

Commit

Permalink
Register and un-register for notifications on all used LSPs (#1035)
Browse files Browse the repository at this point in the history
* Update breez.proto

* LspAPI: add new method list_used_lsps()

* Register and un-register for notifications on all used LSPs

* NodeAPI: add has_active_channel_to()

* Notifications: only consider LSPs with whom we have a channel

* Notification LSPs: always consider active LSP, only historical LSPs with whom we have a channel

* NodeAPI: remove has_active_channel_to()

* NodeAPI: extract get_open_peer_channels()

* Simplify get_open_peer_channels, change to get_open_peers

* Register/unregister notifications: attempt for all used LSPs, even if one failed

* Register/unregister notifications: throw error if any attempt failed, but preserve loop

* get_notification_lsps: don't throw error if no active LSP

* get_notification_lsps: simplify active LSP check

* Add comment to breez.proto

* get_open_peer_channels_pb: consider both public and private channels

* get_routing_hints: separate private from public channels
  • Loading branch information
ok300 authored Jul 16, 2024
1 parent acadf5b commit 2b8a586
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 49 deletions.
19 changes: 17 additions & 2 deletions libs/sdk-common/src/grpc/proto/breez.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ service Information {

service ChannelOpener {
rpc LSPList(LSPListRequest) returns (LSPListReply) {}
rpc OpenLSPChannel(OpenLSPChannelRequest) returns (OpenLSPChannelReply) {}

// Returns active and historical LSPs used by a node.
// In the response, the LSP with a non-empty fee list is active, the LSPs with an empty fee list are historical.
rpc LSPFullList(LSPFullListRequest) returns (LSPFullListReply) {}

rpc RegisterPayment(RegisterPaymentRequest) returns (RegisterPaymentReply) {}
rpc CheckChannels(CheckChannelsRequest) returns (CheckChannelsReply) {}
}
Expand Down Expand Up @@ -147,6 +151,11 @@ message LSPListRequest {
// The identity pubkey of the client
string pubkey = 2 [ json_name = "pubkey" ];
}

message LSPFullListRequest {
/// The identity pubkey of the client
string pubkey = 1 [json_name = "pubkey"];
}
message LSPInformation {
string name = 1 [ json_name = "name" ];
string widget_url = 2 [ json_name = "widget_url" ];
Expand All @@ -160,7 +169,9 @@ message LSPInformation {

bytes lsp_pubkey = 12;

repeated OpeningFeeParams opening_fee_params_list = 15;
repeated OpeningFeeParams opening_fee_params_menu = 15;

string id = 16;
}

// Fee parameters for opening a channel, received from the LSP.
Expand All @@ -176,6 +187,10 @@ message LSPListReply {
map<string, LSPInformation> lsps = 1; // The key is the lsp id
}

message LSPFullListReply {
repeated LSPInformation lsps = 1;
}

message RegisterPaymentRequest {
string lsp_id = 1;
bytes blob = 3;
Expand Down
123 changes: 103 additions & 20 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1919,37 +1919,87 @@ impl BreezServices {

/// Registers for lightning payment notifications. When a payment is intercepted by the LSP
/// to this node, a callback will be triggered to the `webhook_url`.
///
/// Note: these notifications are registered for all LSPs (active and historical) with whom
/// we have a channel.
async fn register_payment_notifications(&self, webhook_url: String) -> SdkResult<()> {
let message = webhook_url.clone();
let sign_request = SignMessageRequest { message };
let sign_response = self.sign_message(sign_request).await?;
let lsp_info = self.lsp_info().await?;
self.lsp_api
.register_payment_notifications(
lsp_info.id,
lsp_info.lsp_pubkey,
webhook_url.clone(),
sign_response.signature,
)
.await?;
Ok(())

// Attempt register call for all relevant LSPs
let mut error_found = false;
for lsp_info in get_notification_lsps(
self.persister.clone(),
self.lsp_api.clone(),
self.node_api.clone(),
)
.await?
{
let lsp_id = lsp_info.id;
let res = self
.lsp_api
.register_payment_notifications(
lsp_id.clone(),
lsp_info.lsp_pubkey,
webhook_url.clone(),
sign_response.signature.clone(),
)
.await;
if res.is_err() {
error_found = true;
warn!("Failed to register notifications for LSP {lsp_id}: {res:?}");
}
}

match error_found {
true => Err(SdkError::generic(
"Failed to register notifications for at least one LSP, see logs for details",
)),
false => Ok(()),
}
}

/// Unregisters lightning payment notifications with the current LSP for the `webhook_url`.
///
/// Note: these notifications are unregistered for all LSPs (active and historical) with whom
/// we have a channel.
async fn unregister_payment_notifications(&self, webhook_url: String) -> SdkResult<()> {
let message = webhook_url.clone();
let sign_request = SignMessageRequest { message };
let sign_response = self.sign_message(sign_request).await?;
let lsp_info = self.lsp_info().await?;
self.lsp_api
.unregister_payment_notifications(
lsp_info.id,
lsp_info.lsp_pubkey,
webhook_url.clone(),
sign_response.signature,
)
.await?;
Ok(())

// Attempt register call for all relevant LSPs
let mut error_found = false;
for lsp_info in get_notification_lsps(
self.persister.clone(),
self.lsp_api.clone(),
self.node_api.clone(),
)
.await?
{
let lsp_id = lsp_info.id;
let res = self
.lsp_api
.unregister_payment_notifications(
lsp_id.clone(),
lsp_info.lsp_pubkey,
webhook_url.clone(),
sign_response.signature.clone(),
)
.await;
if res.is_err() {
error_found = true;
warn!("Failed to un-register notifications for LSP {lsp_id}: {res:?}");
}
}

match error_found {
true => Err(SdkError::generic(
"Failed to un-register notifications for at least one LSP, see logs for details",
)),
false => Ok(()),
}
}

/// Registers for a onchain tx notification. When a new transaction to the specified `address`
Expand Down Expand Up @@ -2640,6 +2690,39 @@ async fn get_lsp_by_id(
.cloned())
}

/// Convenience method to get all LSPs (active and historical) relevant for registering or
/// unregistering webhook notifications
async fn get_notification_lsps(
persister: Arc<SqliteStorage>,
lsp_api: Arc<dyn LspAPI>,
node_api: Arc<dyn NodeAPI>,
) -> SdkResult<Vec<LspInformation>> {
let node_pubkey = persister
.get_node_state()?
.ok_or(SdkError::generic("Node info not found"))?
.id;
let open_peers = node_api.get_open_peers().await?;

let mut notification_lsps = vec![];
for lsp in lsp_api.list_used_lsps(node_pubkey).await? {
match !lsp.opening_fee_params_list.values.is_empty() {
true => {
// Non-empty fee params list = this is the active LSP
// Always consider the active LSP for notifications
notification_lsps.push(lsp);
}
false => {
// Consider only historical LSPs with whom we have an active channel
let has_active_channel_to_lsp = open_peers.contains(&lsp.lsp_pubkey);
if has_active_channel_to_lsp {
notification_lsps.push(lsp);
}
}
}
}
Ok(notification_lsps)
}

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;
Expand Down
60 changes: 40 additions & 20 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,28 @@ impl Greenlight {
Ok(max_per_channel)
}

/// Get open peer channels (private and public) as raw protobuf structs, indexed by peer pubkey
async fn get_open_peer_channels_pb(
&self,
) -> NodeResult<HashMap<Vec<u8>, cln::ListpeerchannelsChannels>> {
let mut node_client = self.get_node_client().await?;
// Get the peer channels
let peer_channels = node_client
.list_peer_channels(cln::ListpeerchannelsRequest::default())
.await?
.into_inner();

let open_peer_channels: HashMap<Vec<u8>, cln::ListpeerchannelsChannels> = peer_channels
.channels
.into_iter()
.filter(|c| {
c.state == Some(cln::ChannelState::ChanneldNormal as i32) && c.peer_id.is_some()
})
.map(|c| (c.peer_id.clone().unwrap(), c))
.collect();
Ok(open_peer_channels)
}

async fn with_keep_alive<T, F>(&self, f: F) -> T
where
F: Future<Output = T>,
Expand Down Expand Up @@ -1650,27 +1672,19 @@ impl NodeAPI for Greenlight {
&self,
lsp_info: &LspInformation,
) -> NodeResult<(Vec<RouteHint>, bool)> {
let mut hints: Vec<RouteHint> = vec![];
let mut node_client = self.get_node_client().await?;
// Get the peer channels
let peer_channels = node_client
.list_peer_channels(cln::ListpeerchannelsRequest::default())
.await?
.into_inner();

let mut has_public_channel = false;
let open_peer_channels: HashMap<Vec<u8>, cln::ListpeerchannelsChannels> = peer_channels
.channels
let open_peer_channels = self.get_open_peer_channels_pb().await?;
let (open_peer_channels_private, open_peer_channels_public): (
HashMap<Vec<u8>, ListpeerchannelsChannels>,
HashMap<Vec<u8>, ListpeerchannelsChannels>,
) = open_peer_channels
.into_iter()
.filter(|c| {
let is_private = c.private.unwrap_or_default();
has_public_channel |= !is_private;
is_private
&& c.state == Some(cln::ChannelState::ChanneldNormal as i32)
&& c.peer_id.is_some()
})
.map(|c| (c.peer_id.clone().unwrap(), c))
.collect();
.partition(|(_, c)| c.private.unwrap_or_default());
let has_public_channel = !open_peer_channels_public.is_empty();

let mut hints: Vec<RouteHint> = vec![];

// Get channels where our node is the destination
let pubkey = self
.persister
Expand All @@ -1689,8 +1703,8 @@ impl NodeAPI for Greenlight {
.map(|c| (c.source.clone(), c))
.collect();

// Create a routing hint from each channel.
for (peer_id, peer_channel) in open_peer_channels {
// Create a routing hint from each private channel.
for (peer_id, peer_channel) in open_peer_channels_private {
let peer_id_str = hex::encode(&peer_id);
let optional_channel_id = peer_channel
.alias
Expand Down Expand Up @@ -1745,6 +1759,12 @@ impl NodeAPI for Greenlight {
}
Ok((hints, has_public_channel))
}

async fn get_open_peers(&self) -> NodeResult<HashSet<Vec<u8>>> {
let open_peer_channels = self.get_open_peer_channels_pb().await?;
let open_peers: HashSet<Vec<u8>> = open_peer_channels.into_keys().collect();
Ok(open_peers)
}
}

#[derive(Clone, PartialEq, Eq, Debug, EnumString, Display, Deserialize, Serialize)]
Expand Down
25 changes: 21 additions & 4 deletions libs/sdk-core/src/lsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crate::models::{LspAPI, OpeningFeeParams, OpeningFeeParamsMenu};
use anyhow::{anyhow, Result};
use prost::Message;
use sdk_common::grpc::{
self, LspListRequest, PaymentInformation, RegisterPaymentNotificationRequest,
RegisterPaymentNotificationResponse, RegisterPaymentReply, RegisterPaymentRequest,
RemovePaymentNotificationRequest, RemovePaymentNotificationResponse,
self, LspFullListRequest, LspListRequest, PaymentInformation,
RegisterPaymentNotificationRequest, RegisterPaymentNotificationResponse, RegisterPaymentReply,
RegisterPaymentRequest, RemovePaymentNotificationRequest, RemovePaymentNotificationResponse,
SubscribeNotificationsRequest, UnsubscribeNotificationsRequest,
};
use sdk_common::prelude::BreezServer;
Expand Down Expand Up @@ -61,7 +61,7 @@ impl LspInformation {
min_htlc_msat: lsp_info.min_htlc_msat,
lsp_pubkey: lsp_info.lsp_pubkey,
opening_fee_params_list: OpeningFeeParamsMenu::try_from(
lsp_info.opening_fee_params_list,
lsp_info.opening_fee_params_menu,
)?,
};

Expand Down Expand Up @@ -108,6 +108,23 @@ impl LspAPI for BreezServer {
Ok(lsp_list)
}

async fn list_used_lsps(&self, pubkey: String) -> SdkResult<Vec<LspInformation>> {
let mut client = self.get_channel_opener_client().await?;

let request = Request::new(LspFullListRequest { pubkey });
let response = client.lsp_full_list(request).await?;
let mut lsp_list: Vec<LspInformation> = Vec::new();
for grpc_lsp_info in response.into_inner().lsps.into_iter() {
let lsp_id = grpc_lsp_info.id.clone();
match LspInformation::try_from(&lsp_id, grpc_lsp_info) {
Ok(lsp) => lsp_list.push(lsp),
Err(e) => error!("LSP Information validation failed for LSP {lsp_id}: {e}"),
}
}
lsp_list.sort_by(|a, b| a.name.to_lowercase().cmp(&b.name.to_lowercase()));
Ok(lsp_list)
}

async fn register_payment_notifications(
&self,
lsp_id: String,
Expand Down
3 changes: 3 additions & 0 deletions libs/sdk-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ pub struct CustomMessage {
/// Trait covering LSP-related functionality
#[tonic::async_trait]
pub trait LspAPI: Send + Sync {
/// List LSPs available to the given pubkey
async fn list_lsps(&self, node_pubkey: String) -> SdkResult<Vec<LspInformation>>;
/// List all LSPs, active and historical, used by the given pubkey
async fn list_used_lsps(&self, node_pubkey: String) -> SdkResult<Vec<LspInformation>>;
/// Register for webhook callbacks at the given `webhook_url` whenever a new payment is received
async fn register_payment_notifications(
&self,
Expand Down
7 changes: 5 additions & 2 deletions libs/sdk-core/src/node_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::pin::Pin;

use anyhow::Result;
Expand Down Expand Up @@ -182,10 +183,12 @@ pub trait NodeAPI: Send + Sync {
fn derive_bip32_key(&self, path: Vec<ChildNumber>) -> NodeResult<ExtendedPrivKey>;
fn legacy_derive_bip32_key(&self, path: Vec<ChildNumber>) -> NodeResult<ExtendedPrivKey>;

// Gets the routing hints related to all private channels that the node has.
// Also returns a boolean indicating if the node has a public channel or not.
/// Gets the routing hints related to all private channels that the node has.
/// Also returns a boolean indicating if the node has a public channel or not.
async fn get_routing_hints(
&self,
lsp_info: &LspInformation,
) -> NodeResult<(Vec<RouteHint>, bool)>;
/// Get peers with whom we have an open channel
async fn get_open_peers(&self) -> NodeResult<HashSet<Vec<u8>>>;
}
10 changes: 9 additions & 1 deletion libs/sdk-core/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::time::{Duration, SystemTime};
use std::{mem, vec};
Expand Down Expand Up @@ -506,6 +506,10 @@ impl NodeAPI for MockNodeAPI {
async fn fetch_bolt11(&self, _payment_hash: Vec<u8>) -> NodeResult<Option<FetchBolt11Result>> {
Ok(None)
}

async fn get_open_peers(&self) -> NodeResult<HashSet<Vec<u8>>> {
Ok(HashSet::new())
}
}

impl MockNodeAPI {
Expand Down Expand Up @@ -657,6 +661,10 @@ impl LspAPI for MockBreezServer {
}])
}

async fn list_used_lsps(&self, _node_pubkey: String) -> SdkResult<Vec<LspInformation>> {
Ok(vec![])
}

async fn register_payment_notifications(
&self,
_lsp_id: String,
Expand Down

0 comments on commit 2b8a586

Please sign in to comment.