diff --git a/Cargo.lock b/Cargo.lock index 0a9edf9c..3b5818bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,7 @@ dependencies = [ "base64-compat", "bech32", "bitcoin_hashes", + "bitcoinconsensus", "secp256k1", "serde", ] @@ -337,6 +338,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bitcoinconsensus" +version = "0.19.0-3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a8aa43b5cd02f856cb126a9af819e77b8910fdd74dd1407be649f2f5fe3a1b5" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "bitcoincore-rpc" version = "0.15.0" @@ -1713,6 +1724,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d885bf509066af86ae85354c8959028ad6192c22a2657ef8271e94029d30f9d0" dependencies = [ "bitcoin", + "hex", + "regex", ] [[package]] @@ -2983,6 +2996,7 @@ version = "0.2.0" dependencies = [ "bitcoin", "bitcoincore-rpc", + "futures", "hex", "home", "jsonrpc-http-server", diff --git a/teos-common/Cargo.toml b/teos-common/Cargo.toml index be0cb489..04fcc2e7 100644 --- a/teos-common/Cargo.toml +++ b/teos-common/Cargo.toml @@ -24,4 +24,7 @@ bitcoin = { version = "0.28.0", features = [ "use-serde" ] } lightning = "0.0.108" [build-dependencies] -tonic-build = "0.6" \ No newline at end of file +tonic-build = "0.6" + +[dev-dependencies] +lightning = { version = "0.0.108", features = ["_test_utils"] } \ No newline at end of file diff --git a/teos-common/src/lib.rs b/teos-common/src/lib.rs index 7c8b621c..ac628258 100644 --- a/teos-common/src/lib.rs +++ b/teos-common/src/lib.rs @@ -13,6 +13,7 @@ pub mod constants; pub mod cryptography; pub mod dbm; pub mod errors; +pub mod lightning; pub mod net; pub mod receipts; pub mod ser; diff --git a/teos-common/src/lightning/convert.rs b/teos-common/src/lightning/convert.rs new file mode 100644 index 00000000..47cb455b --- /dev/null +++ b/teos-common/src/lightning/convert.rs @@ -0,0 +1,145 @@ +//! A module that implements useful gRPC messages to lightning message conversions. + +use super::messages::*; +use crate::appointment::Locator; +use crate::constants::ENCRYPTED_BLOB_MAX_SIZE; +use crate::protos as msgs; + +use bitcoin::hashes::Hash; +use bitcoin::Txid; + +/// Conversions from individual messages to tower messages. +mod msg_to_tower_msg { + use super::*; + macro_rules! impl_from_msg { + ($msg: ident) => { + impl From<$msg> for TowerMessage { + fn from(m: $msg) -> TowerMessage { + TowerMessage::$msg(m) + } + } + }; + } + + impl_from_msg!(Register); + impl_from_msg!(SubscriptionDetails); + impl_from_msg!(AddUpdateAppointment); + impl_from_msg!(AppointmentAccepted); + impl_from_msg!(AppointmentRejected); + impl_from_msg!(GetAppointment); + impl_from_msg!(AppointmentData); + impl_from_msg!(TrackerData); + impl_from_msg!(AppointmentNotFound); + impl_from_msg!(GetSubscriptionInfo); + impl_from_msg!(SubscriptionInfo); +} + +// FIXME: There are a lot of `unwrap()`s in these conversions. We assume that the gRPC interface won't send invalid data. +// If the conversion panics this would crash the lightning server. + +/// Conversion from user requests to gRPC requests. +/// These are used by the tower when routing lightning requests to its internal gRPC server. +mod msg_to_grpc { + use super::*; + impl From for msgs::RegisterRequest { + fn from(r: Register) -> Self { + msgs::RegisterRequest { + user_id: r.pubkey.to_vec(), + } + } + } + + impl From for msgs::AddAppointmentRequest { + fn from(r: AddUpdateAppointment) -> Self { + let appointment = msgs::Appointment { + locator: r.locator.to_vec(), + encrypted_blob: r.encrypted_blob, + to_self_delay: r.to_self_delay.unwrap_or(42), + }; + + msgs::AddAppointmentRequest { + appointment: Some(appointment), + signature: r.signature, + } + } + } + + impl From for msgs::GetAppointmentRequest { + fn from(r: GetAppointment) -> Self { + msgs::GetAppointmentRequest { + locator: r.locator.to_vec(), + signature: r.signature, + } + } + } + + impl From for msgs::GetSubscriptionInfoRequest { + fn from(r: GetSubscriptionInfo) -> Self { + msgs::GetSubscriptionInfoRequest { + signature: r.signature, + } + } + } +} + +/// Conversion from gRPC responses to tower responses. +/// These are used by the tower when parsing internal gRPC server's responses. +mod grpc_to_tower_msg { + use super::*; + impl From for TowerMessage { + fn from(r: msgs::RegisterResponse) -> Self { + SubscriptionDetails { + appointment_max_size: ENCRYPTED_BLOB_MAX_SIZE as u16, + start_block: r.subscription_start, + amount_msat: None, + invoice: None, + signature: Some(r.subscription_signature), + } + .into() + } + } + + impl From for TowerMessage { + fn from(r: msgs::AddAppointmentResponse) -> Self { + AppointmentAccepted { + locator: Locator::from_slice(&r.locator).unwrap(), + start_block: r.start_block, + receipt_signature: Some(r.signature), + } + .into() + } + } + + impl From for TowerMessage { + fn from(r: msgs::GetAppointmentResponse) -> Self { + match r.appointment_data.unwrap().appointment_data.unwrap() { + msgs::appointment_data::AppointmentData::Appointment(a) => AppointmentData { + locator: Locator::from_slice(&a.locator).unwrap(), + encrypted_blob: a.encrypted_blob, + } + .into(), + msgs::appointment_data::AppointmentData::Tracker(t) => TrackerData { + dispute_txid: Txid::from_slice(&t.dispute_txid).unwrap(), + penalty_txid: Txid::from_slice(&t.penalty_txid).unwrap(), + penalty_rawtx: t.penalty_rawtx, + } + .into(), + } + } + } + + impl From for TowerMessage { + fn from(r: msgs::GetSubscriptionInfoResponse) -> Self { + SubscriptionInfo { + available_slots: r.available_slots, + subscription_expiry: r.subscription_expiry, + locators: r + .locators + .into_iter() + .map(|l| Locator::from_slice(&l).unwrap()) + .collect(), + } + .into() + } + } +} diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs new file mode 100644 index 00000000..6a7c4024 --- /dev/null +++ b/teos-common/src/lightning/messages.rs @@ -0,0 +1,357 @@ +//! Watchtower custom lightning messages that implement LDK's [`Readable`] & [`Writeable`] traits. +//! +//! [`Readable`]: lightning::util::ser::Readable + +use crate::appointment::Locator; +use crate::lightning::ser_macros::{impl_writeable_msg, set_msg_type}; +use crate::UserId; + +use bitcoin::Txid; +use lightning::io::Error; +use lightning::ln::wire; +use lightning::util::ser::{Writeable, Writer}; + +// Re-exporting this for other crates to use. +pub use crate::lightning::ser_utils::Type; + +/// The register message sent by the user to subscribe for the watching service. +#[derive(Clone, Debug)] +pub struct Register { + pub pubkey: UserId, + pub appointment_slots: u32, + pub subscription_period: u32, +} + +/// The subscription details message that is sent to the user after registering or toping up. +/// This message is the response to the register message. +#[derive(Clone, Debug)] +pub struct SubscriptionDetails { + pub appointment_max_size: u16, + pub start_block: u32, + // Optional TLV. + pub amount_msat: Option, + pub invoice: Option, + pub signature: Option, +} + +/// The add/update appointment message sent by the user. +#[derive(Clone, Debug)] +pub struct AddUpdateAppointment { + pub locator: Locator, + pub encrypted_blob: Vec, + pub signature: String, + // Optional TLV. + // FIXME: BOLT13 uses u64. + pub to_self_delay: Option, +} + +/// The appointment accepted message that is sent after an accepted add/update appointment message. +#[derive(Clone, Debug)] +pub struct AppointmentAccepted { + pub locator: Locator, + pub start_block: u32, + // Optional TLV. + pub receipt_signature: Option, +} + +/// The appointment rejected message that is sent if an add/update appointment message was rejected. +#[derive(Clone, Debug)] +pub struct AppointmentRejected { + pub locator: Locator, + pub rcode: u16, + pub reason: String, +} + +/// The get appointment message sent by the user to retrieve a previously sent appointment from the tower. +#[derive(Clone, Debug)] +pub struct GetAppointment { + pub locator: Locator, + pub signature: String, +} + +/// The appointment data message sent by the tower after a get appointment message. +#[derive(Clone, Debug)] +pub struct AppointmentData { + pub locator: Locator, + pub encrypted_blob: Vec, +} + +/// The tracker data message sent by the tower when the requested appointment has been acted upon. +#[derive(Clone, Debug)] +pub struct TrackerData { + pub dispute_txid: Txid, + pub penalty_txid: Txid, + pub penalty_rawtx: Vec, +} + +/// The appointment not found message sent by the tower in response to a get appointment message +/// whose locator didn't match any known appointment. +#[derive(Clone, Debug)] +pub struct AppointmentNotFound { + pub locator: Locator, +} + +/// The get subscription info message (a TEOS custom message, not a bolt13 one). +#[derive(Clone, Debug)] +pub struct GetSubscriptionInfo { + pub signature: String, +} + +/// The subscription info message sent by the tower in response to get subscription info message. +#[derive(Clone, Debug)] +pub struct SubscriptionInfo { + pub available_slots: u32, + pub subscription_expiry: u32, + // Sent as a TLV. Defaults to an empty vector. + pub locators: Vec, +} + +impl_writeable_msg!(Register, { + pubkey, + appointment_slots, + subscription_period +}, {}); + +impl_writeable_msg!(SubscriptionDetails, { + appointment_max_size, + start_block, +}, { + (1, amount_msat, opt), + // Use `opt_str` and not `opt` to avoid writing a length prefix for strings + // since it's already written in the length part of the TLV. + (3, invoice, opt_str), + (5, signature, opt_str), +}); + +impl_writeable_msg!(AddUpdateAppointment, { + locator, + encrypted_blob, + signature, +}, { + (1, to_self_delay, opt), +}); + +impl_writeable_msg!(AppointmentAccepted, { + locator, + start_block, +}, { + // Use `opt_str` and not `opt` to avoid writing a length prefix for strings + // since it's already written in the length part of the TLV. + (1, receipt_signature, opt_str), +}); + +impl_writeable_msg!(AppointmentRejected, { + locator, + rcode, + reason, +}, {}); + +impl_writeable_msg!(GetAppointment, { + locator, + signature, +}, {}); + +impl_writeable_msg!(AppointmentData, { + locator, + encrypted_blob, +}, {}); + +impl_writeable_msg!(TrackerData, { + dispute_txid, + penalty_txid, + penalty_rawtx, +}, {}); + +impl_writeable_msg!(AppointmentNotFound, { + locator, +}, {}); + +impl_writeable_msg!(GetSubscriptionInfo, { + signature, +}, {}); + +impl_writeable_msg!(SubscriptionInfo, { + available_slots, + subscription_expiry, +}, { + (1, locators, vec) +}); + +set_msg_type!(Register, 48848); +set_msg_type!(SubscriptionDetails, 48850); +set_msg_type!(AddUpdateAppointment, 48852); +set_msg_type!(AppointmentAccepted, 48854); +set_msg_type!(AppointmentRejected, 48856); +set_msg_type!(GetAppointment, 48858); +set_msg_type!(AppointmentData, 48860); +set_msg_type!(TrackerData, 48862); +set_msg_type!(AppointmentNotFound, 48864); +// Let these messages get odd types since they are auxiliary messages. +set_msg_type!(GetSubscriptionInfo, 48865); +set_msg_type!(SubscriptionInfo, 48867); + +#[derive(Clone, Debug)] +pub enum TowerMessage { + // Register messages + Register(Register), + SubscriptionDetails(SubscriptionDetails), + // Appointment submission messages + AddUpdateAppointment(AddUpdateAppointment), + AppointmentAccepted(AppointmentAccepted), + AppointmentRejected(AppointmentRejected), + // Appointment fetching messages + GetAppointment(GetAppointment), + AppointmentData(AppointmentData), + TrackerData(TrackerData), + AppointmentNotFound(AppointmentNotFound), + // User subscription messages + GetSubscriptionInfo(GetSubscriptionInfo), + SubscriptionInfo(SubscriptionInfo), +} + +impl wire::Type for TowerMessage { + fn type_id(&self) -> u16 { + match self { + TowerMessage::Register(..) => Register::TYPE, + TowerMessage::SubscriptionDetails(..) => SubscriptionDetails::TYPE, + TowerMessage::AddUpdateAppointment(..) => AddUpdateAppointment::TYPE, + TowerMessage::AppointmentAccepted(..) => AppointmentAccepted::TYPE, + TowerMessage::AppointmentRejected(..) => AppointmentRejected::TYPE, + TowerMessage::GetAppointment(..) => GetAppointment::TYPE, + TowerMessage::AppointmentData(..) => AppointmentData::TYPE, + TowerMessage::TrackerData(..) => TrackerData::TYPE, + TowerMessage::AppointmentNotFound(..) => AppointmentNotFound::TYPE, + TowerMessage::GetSubscriptionInfo(..) => GetSubscriptionInfo::TYPE, + TowerMessage::SubscriptionInfo(..) => SubscriptionInfo::TYPE, + } + } +} + +impl Writeable for TowerMessage { + fn write(&self, writer: &mut W) -> Result<(), Error> { + match self { + TowerMessage::Register(msg) => msg.write(writer), + TowerMessage::SubscriptionDetails(msg) => msg.write(writer), + TowerMessage::AddUpdateAppointment(msg) => msg.write(writer), + TowerMessage::AppointmentAccepted(msg) => msg.write(writer), + TowerMessage::AppointmentRejected(msg) => msg.write(writer), + TowerMessage::GetAppointment(msg) => msg.write(writer), + TowerMessage::AppointmentData(msg) => msg.write(writer), + TowerMessage::TrackerData(msg) => msg.write(writer), + TowerMessage::AppointmentNotFound(msg) => msg.write(writer), + TowerMessage::GetSubscriptionInfo(msg) => msg.write(writer), + TowerMessage::SubscriptionInfo(msg) => msg.write(writer), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cryptography::get_random_bytes; + use crate::lightning::ser_utils::{get_random_locator, get_random_txid, TestVecWriter}; + use crate::test_utils::get_random_user_id; + use lightning::io::Cursor; + use lightning::util::ser::{Readable, Writeable}; + use std::cmp::PartialEq; + use std::fmt::Debug; + use std::iter::FromIterator; + + fn test_msg(msg: T) { + // Get a writer and write the message to it. + let mut stream = TestVecWriter(Vec::new()); + msg.write(&mut stream).unwrap(); + // Create a reader out of the written buffer. + let mut stream = Cursor::new(stream.0); + let read_msg: T = Readable::read(&mut stream).unwrap(); + // Assert the serialized then deserialized message is the same as the original one. + assert_eq!(msg, read_msg); + } + + #[test] + fn test_tower_messages_empty_tlvs() { + test_msg(Register { + pubkey: get_random_user_id(), + appointment_slots: 4300, + subscription_period: 4032, + }); + test_msg(SubscriptionDetails { + appointment_max_size: 3032, + start_block: 358943, + amount_msat: None, + invoice: None, + signature: None, + }); + test_msg(AddUpdateAppointment { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(542), + signature: String::from("sign: locator || encrypted_blob || to_self_delay?"), + to_self_delay: None, + }); + test_msg(AppointmentAccepted { + locator: get_random_locator(), + start_block: 500310, + receipt_signature: None, + }); + test_msg(AppointmentRejected { + locator: get_random_locator(), + rcode: 539, + reason: String::from("You have no more slots. 😒πŸ₯ΊπŸ’”"), + }); + test_msg(GetAppointment { + locator: get_random_locator(), + signature: String::from("this is my signature. and is real."), + }); + test_msg(AppointmentData { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(678), + }); + test_msg(TrackerData { + dispute_txid: get_random_txid(), + penalty_txid: get_random_txid(), + penalty_rawtx: get_random_bytes(432), + }); + test_msg(AppointmentNotFound { + locator: get_random_locator(), + }); + test_msg(GetSubscriptionInfo { + signature: String::from("sign: get subscription info"), + }); + test_msg(SubscriptionInfo { + available_slots: 429, + subscription_expiry: 1093, + locators: Vec::new(), + }); + } + + #[test] + fn test_tower_message_with_tlvs() { + test_msg(SubscriptionDetails { + appointment_max_size: 4498, + start_block: 4934503, + amount_msat: Some(891431), + invoice: Some(String::from( + "lnbc100p1psj9jhxdqud3jxktt5w46x7unfv9kz6mn0v3jsnp4q0d3p2sfluzdx45...", + )), + signature: Some(String::from( + "sign: user_pubkey || appointment_max_size || start_block || amount_msat? || invoice?", + )), + }); + test_msg(AddUpdateAppointment { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(542), + signature: String::from("sign: locator || encrypted_blob || to_self_delay?"), + to_self_delay: Some(56), + }); + test_msg(AppointmentAccepted { + locator: get_random_locator(), + start_block: 500310, + receipt_signature: Some(String::from("sign: user_signature || start_block")), + }); + test_msg(SubscriptionInfo { + available_slots: 429, + subscription_expiry: 1093, + locators: Vec::from_iter((0..10).map(|_| get_random_locator())), + }); + } +} diff --git a/teos-common/src/lightning/mod.rs b/teos-common/src/lightning/mod.rs new file mode 100644 index 00000000..9a0671e1 --- /dev/null +++ b/teos-common/src/lightning/mod.rs @@ -0,0 +1,4 @@ +mod convert; +pub mod messages; +mod ser_macros; +mod ser_utils; diff --git a/teos-common/src/lightning/ser_macros.rs b/teos-common/src/lightning/ser_macros.rs new file mode 100644 index 00000000..f27d0f11 --- /dev/null +++ b/teos-common/src/lightning/ser_macros.rs @@ -0,0 +1,392 @@ +//! A module containing some trait implementation macros to avoid repetition. +//! Most of this file is taken/inspired from [here](https://github.com/lightningdevkit/rust-lightning/blob/3676a056c85f54347e7e079e913317a79e26a2ae/lightning/src/util/ser_macros.rs). + +/* This file is licensed under either of + * Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or + * MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT) + * at your option. +*/ + +macro_rules! encode_tlv { + ($stream:expr, $type:expr, $field:expr, opt) => { + if let Some(ref field) = $field { + ser_macros::encode_tlv!($stream, $type, field); + } + }; + ($stream:expr, $type:expr, $field:expr, vec) => { + // Don't write the vector if it's empty. + if !$field.is_empty() { + // We can't just pass `$field` since this will move it out of the struct we are implementing + // this serialization for (but we could have cloned). That's why we pass a reference to it. + let lightning_vec = ser_utils::LightningVecWriter(&$field); + ser_macros::encode_tlv!($stream, $type, lightning_vec); + } + }; + ($stream:expr, $type:expr, $field:expr, opt_str) => { + if let Some(ref field) = $field { + let lightning_str = ser_utils::LightningVecWriter(field.as_bytes()); + ser_macros::encode_tlv!($stream, $type, lightning_str); + } + }; + ($stream:expr, $type:expr, $field:expr) => { + BigSize($type).write($stream)?; + BigSize($field.serialized_length() as u64).write($stream)?; + $field.write($stream)?; + }; +} + +macro_rules! encode_tlv_stream { + ($stream:expr, {$(($type:expr, $field:expr, $fieldty:tt)),* $(,)*}) => { { + #[allow(unused_imports)] + use { + lightning::util::ser::BigSize, + $crate::lightning::{ser_macros, ser_utils}, + }; + + $( + ser_macros::encode_tlv!($stream, $type, $field, $fieldty); + )* + + #[allow(unused_mut, unused_variables, unused_assignments, unused_comparisons)] + #[cfg(debug_assertions)] + { + let mut last_seen: Option = None; + $( + if let Some(t) = last_seen { + debug_assert!(t < $type, "{} <= {}; TLV types must be strictly increasing", $type, t); + } + last_seen = Some($type); + )* + } + } } +} + +macro_rules! decode_tlv { + ($reader:expr, $field:ident, opt) => { + $field = Some(Readable::read(&mut $reader)?); + }; + ($reader:expr, $field:ident, vec) => { + let lightning_vec = ser_utils::LightningVecReader::read(&mut $reader)?; + $field = lightning_vec.0; + }; + ($reader:expr, $field:ident, opt_str) => { + let lightning_str = ser_utils::LightningVecReader::read(&mut $reader)?; + let inner_str = + String::from_utf8(lightning_str.0).map_err(|_| DecodeError::InvalidValue)?; + $field = Some(inner_str); + }; +} + +macro_rules! decode_tlv_stream { + ($stream:expr, {$(($type:expr, $field:ident, $fieldty:tt)),* $(,)*}) => { { + #[allow(unused_imports)] + use { + lightning::ln::msgs::DecodeError, + lightning::util::ser::{BigSize, Readable}, + $crate::lightning::{ser_macros, ser_utils}, + }; + + let mut last_seen_type: Option = None; + let mut stream_ref = $stream; + + loop { + // First decode the type of this TLV: + let typ: BigSize = { + let mut tracking_reader = ser_utils::ReadTrackingReader::new(&mut stream_ref); + match Readable::read(&mut tracking_reader) { + Err(DecodeError::ShortRead) => { + if !tracking_reader.have_read { + break; + } else { + return Err(DecodeError::ShortRead); + } + }, + Err(e) => return Err(e), + Ok(t) => t, + } + }; + + // Types must be unique and monotonically increasing: + match last_seen_type { + Some(t) if typ.0 <= t => { + return Err(DecodeError::InvalidValue); + }, + _ => {}, + } + last_seen_type = Some(typ.0); + + // Finally, read the length and value itself: + let length: BigSize = Readable::read(&mut stream_ref)?; + let mut s = ser_utils::FixedLengthReader::new(&mut stream_ref, length.0); + match typ.0 { + $($type => { + ser_macros::decode_tlv!(s, $field, $fieldty); + if s.bytes_remain() { + s.eat_remaining()?; // Return ShortRead if there's actually not enough bytes + return Err(DecodeError::InvalidValue); + } + },)* + x if x % 2 == 0 => { + return Err(DecodeError::UnknownRequiredFeature); + }, + _ => {}, + } + s.eat_remaining()?; + } + } } +} + +macro_rules! init_tlv_field_var { + ($field:ident, opt) => { + let mut $field = None; + }; + ($field:ident, vec) => { + let mut $field = Vec::new(); + }; + ($field:ident, opt_str) => { + let mut $field = None; + }; +} + +macro_rules! impl_writeable_msg { + ($st:ty, {$($field:ident),* $(,)*}, {$(($type:expr, $tlvfield:ident, $fieldty:tt)),* $(,)*}) => { + impl lightning::util::ser::Writeable for $st { + fn write(&self, w: &mut W) -> Result<(), lightning::io::Error> { + $(self.$field.write(w)?;)* + $crate::lightning::ser_macros::encode_tlv_stream!(w, {$(($type, self.$tlvfield, $fieldty)),*}); + Ok(()) + } + } + + impl lightning::util::ser::Readable for $st { + fn read(r: &mut R) -> Result { + $(let $field = lightning::util::ser::Readable::read(r)?;)* + $($crate::lightning::ser_macros::init_tlv_field_var!($tlvfield, $fieldty);)* + $crate::lightning::ser_macros::decode_tlv_stream!(r, {$(($type, $tlvfield, $fieldty)),*}); + Ok(Self { + $($field),*, + $($tlvfield),* + }) + } + } + + #[cfg(test)] + impl std::cmp::PartialEq for $st { + fn eq(&self, other: &Self) -> bool { + true + $(&& self.$field == other.$field)* + $(&& self.$tlvfield == other.$tlvfield)* + } + } + } +} + +macro_rules! set_msg_type { + ($st:ty, $type:expr) => { + impl $crate::lightning::ser_utils::Type for $st { + const TYPE: u16 = $type; + } + }; +} + +// Macros used by `impl_writeable_msg`. +pub(super) use decode_tlv; +pub(super) use decode_tlv_stream; +pub(super) use encode_tlv; +pub(super) use encode_tlv_stream; +pub(super) use init_tlv_field_var; + +pub(super) use impl_writeable_msg; +pub(super) use set_msg_type; + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use crate::appointment::Locator; + use crate::lightning::ser_utils::Type; + use crate::lightning::{ser_macros, ser_utils}; + use lightning::io; + use lightning::ln::msgs::DecodeError; + use lightning::util::ser::{BigSize, Readable, Writeable}; + + macro_rules! encode_decode_tlv { + ($typ:expr, $field:expr, $fieldty:tt) => {{ + // Encode the TLV to a stream. + let mut stream = ser_utils::TestVecWriter(Vec::new()); + encode_tlv!(&mut stream, $typ, $field, $fieldty); + // Get a reader with the writer's buffer. + let mut cursor = io::Cursor::new(stream.0); + let mut stream = ser_utils::ReadTrackingReader::new(&mut cursor); + init_tlv_field_var!(read_field, $fieldty); + #[allow(unreachable_code)] + if false { + unreachable!(); + // This assignment will let the compiler infer the type of `read_field`. + read_field = $field; + } + // Try to read from the stream. Note that the stream might be empty if `$field` + // carried no info to be written in the first place. + let read_typ_result = BigSize::read(&mut stream); + if let Ok(read_typ) = read_typ_result { + let read_len = BigSize::read(&mut stream)?; + let mut stream = ser_utils::FixedLengthReader::new(&mut cursor, read_len.0); + decode_tlv!(stream, read_field, $fieldty); + Ok((read_typ.0, read_len.0, read_field)) + } else if !stream.have_read { + Ok(($typ, 0, read_field)) + } else { + Err(read_typ_result.err().unwrap()) + } + }}; + } + + macro_rules! test_encode_decode_tlv { + ($typ:expr, $len:expr, $val:expr, $fieldty:tt) => { + let (typ, len, val) = encode_decode_tlv!($typ, $val, $fieldty)?; + assert_eq!($typ as u64, typ, "Invalid TLV type"); + assert_eq!($len as u64, len, "Invalid TLV length"); + assert_eq!($val, val, "Invalid TLV value"); + }; + } + + macro_rules! test_opt_ranged_type { + ($type:ident, $expected_len:expr) => { + // This will try some values in the range of `$type`. + for i in ($type::MIN..$type::MAX) + .step_by(($type::MAX / 4 + 1) as usize) + .chain(vec![$type::MAX]) + { + test_encode_decode_tlv!(i as u64, $expected_len, Some(i), opt); + } + }; + } + + #[test] + fn test_encode_decode_tlv() -> Result<(), DecodeError> { + // All the Nones and empty vectors should have a length of zero. + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + + // Non-None primitives should have their in-memory byte length as follows. + test_opt_ranged_type!(u8, 1); + test_opt_ranged_type!(u16, 2); + test_opt_ranged_type!(u32, 4); + test_opt_ranged_type!(u64, 8); + + // Other types. + let s = Some(String::from("teos")); + test_encode_decode_tlv!(1, s.as_ref().unwrap().len() + 2, s, opt); + test_encode_decode_tlv!(1, s.as_ref().unwrap().len(), s, opt_str); + + let v = vec![1_u8, 2, 3, 6]; + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); + + let v = vec![1_u16, 2, 3, 809]; + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); + + let l = ser_utils::get_random_locator(); + let v = vec![l; 5]; + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); + + Ok(()) + } + + macro_rules! encode_decode_tlv_stream { + ({$(($type:expr, $field_name:ident, $fieldty:tt)),* $(,)*}) => {{ + // Encode the TLVs to a stream. + let mut stream = ser_utils::TestVecWriter(Vec::new()); + encode_tlv_stream!(&mut stream, {$(($type, $field_name, $fieldty)),*}); + // Re-initialize the fields to their default values before decoding. + $(init_tlv_field_var!($field_name, $fieldty);)* + // Decode with a reader with the writer's buffer. + decode_tlv_stream!(io::Cursor::new(stream.0), {$(($type, $field_name, $fieldty)),*}); + ($($field_name),*) + }}; + } + + macro_rules! test_encode_decode_tlv_stream { + ({$(($type:expr, $field_name:ident, $field:expr, $fieldty:tt)),* $(,)*}) => { + let original_stream = ($($field),*); + let decoded_stream = { + $(let $field_name = $field;)* + encode_decode_tlv_stream!({$(($type, $field_name, $fieldty)),*}) + }; + assert_eq!(original_stream, decoded_stream, "The decoded stream doesn't match the original one"); + }; + } + + macro_rules! test_encode_decode_tlv_stream_should_panic { + ($args:tt) => { + // Allow unreachable patterns which happen when we have some non-unique tlv types. + #[allow(unreachable_patterns)] + // Runs `test_encode_decode_tlv_stream` inside a closure so that we don't need to + // return a `Result<(), DecodeError`> from the test functions. + // `should_panic` tests must not return anything. + (|| { + test_encode_decode_tlv_stream!($args); + Ok(()) + })() + // Unwrap to panic on error. + .unwrap(); + }; + } + + #[test] + fn test_encode_decode_tlv_stream() -> Result<(), DecodeError> { + test_encode_decode_tlv_stream!({ + (0, a, Option::::None, opt), + (1, b, Some(3_u32), opt), + (2, c, Some(String::from("teos")), opt), + (31, d, Some(String::from("teos")), opt_str), + }); + + let l = ser_utils::get_random_locator(); + test_encode_decode_tlv_stream!({ + (12, a, vec![1_u32, 2, 3], vec), + (24, b, Some(vec![1_u8, 2, 3]), opt), + (31, c, vec!["one".to_owned(), "two".to_owned(), "3".to_owned()], vec), + (59, d, Vec::::new(), vec), + (78, e, vec![l; 4], vec), + }); + + Ok(()) + } + + #[test] + #[should_panic] + fn test_encode_decode_tlv_stream_decreasing_type() { + test_encode_decode_tlv_stream_should_panic!({ + (0, a, Some(0_u8), opt), + (2, b, Some(1_u32), opt), + (1, c, Vec::::new(), vec), + }); + } + + #[test] + #[should_panic] + fn test_encode_decode_tlv_stream_non_increasing_type() { + test_encode_decode_tlv_stream_should_panic!({ + (0, a, Some(0_u8), opt), + (1, b, Some(1_u32), opt), + (1, c, Vec::::new(), vec), + }); + } + + #[test] + fn test_set_msg_type() { + struct Test1; + set_msg_type!(Test1, 10); + + assert_eq!(Test1::TYPE, 10); + } +} diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs new file mode 100644 index 00000000..c1252dba --- /dev/null +++ b/teos-common/src/lightning/ser_utils.rs @@ -0,0 +1,216 @@ +//! A helper module containing some lightning messages serialization stuff. +//! Most of this file is taken/inspired from [here](https://github.com/lightningdevkit/rust-lightning/blob/3676a056c85f54347e7e079e913317a79e26a2ae/lightning/src/util/ser.rs). + +/* This file is licensed under either of + * Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or + * MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT) + * at your option. +*/ + +use crate::appointment::{Locator, LOCATOR_LEN}; +use crate::UserId; + +use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE; +use lightning::io::{copy, sink, Error, ErrorKind, Read}; +use lightning::ln::msgs::DecodeError; +use lightning::util::ser::{MaybeReadable, Readable, Writeable, Writer}; + +/// A trait that associates a u16 [`Type::TYPE`] constant with a lightning message. +pub trait Type { + /// The type identifying the message payload. + const TYPE: u16; +} + +// Deserialization for a Locator inside a lightning message. +impl Readable for Locator { + fn read(reader: &mut R) -> Result { + let mut buf = [0; LOCATOR_LEN]; + reader.read_exact(&mut buf)?; + Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue) + } +} + +// Serialization for a Locator inside a lighting message. +impl Writeable for Locator { + fn write(&self, writer: &mut W) -> Result<(), Error> { + writer.write_all(&self.to_vec()) + } +} + +// Deserialization for a UserId inside a lightning message. +impl Readable for UserId { + fn read(reader: &mut R) -> Result { + let mut buf = [0; PUBLIC_KEY_SIZE]; + reader.read_exact(&mut buf)?; + Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue) + } +} + +// Serialization for a UserId inside a lighting message. +impl Writeable for UserId { + fn write(&self, writer: &mut W) -> Result<(), Error> { + self.0.write(writer) + } +} + +/// A read wrapper around a vector inside a lightning message. +/// This wrapper mainly exists because we cannot implement LDK's (de)serialization traits +/// for Vec (since neither the traits nor Vec are defined in our crate (the orphan rule)). +/// +/// [`Readable`] implementation for this struct assumes that there is no length prefix. +/// It will read the vector until there are no more items in the stream (Don't use with non-TLV field). +pub(super) struct LightningVecReader(pub Vec); + +// Deserialization for a vector of items inside a lightning message. +impl Readable for LightningVecReader { + #[inline] + fn read(mut reader: &mut R) -> Result { + let mut values = Vec::new(); + loop { + let mut track_read = ReadTrackingReader::new(&mut reader); + match MaybeReadable::read(&mut track_read) { + Ok(Some(v)) => { + values.push(v); + } + Ok(None) => {} + // If we failed to read any bytes at all, we reached the end of our TLV + // stream and have simply exhausted all entries. + Err(ref e) if e == &DecodeError::ShortRead && !track_read.have_read => break, + Err(e) => return Err(e), + } + } + Ok(Self(values)) + } +} + +/// A write wrapper around a vector/slice inside a lightning message. +/// Similar to [`LightningVecReader`] but the inner vector is a slice reference to avoid cloning. +/// +/// Note that we don't prefix the vector/slice with its length when serializing it, that's because this struct +/// is used in TLV streams which already has a BigSize length prefix (Don't use with non-TLV field). +pub(super) struct LightningVecWriter<'a, T>(pub &'a [T]); + +// Serialization for a vector of items inside a lighting message. +impl<'a, T: Writeable> Writeable for LightningVecWriter<'a, T> { + #[inline] + fn write(&self, writer: &mut W) -> Result<(), Error> { + for item in self.0.iter() { + item.write(writer)?; + } + Ok(()) + } +} + +/// Essentially std::io::Take but a bit simpler and with a method to walk the underlying stream +/// forward to ensure we always consume exactly the fixed length specified. +pub(super) struct FixedLengthReader { + read: R, + bytes_read: u64, + total_bytes: u64, +} + +impl FixedLengthReader { + /// Returns a new FixedLengthReader. + pub fn new(read: R, total_bytes: u64) -> Self { + Self { + read, + bytes_read: 0, + total_bytes, + } + } + + /// Returns whether there are remaining bytes or not. + #[inline] + pub fn bytes_remain(&mut self) -> bool { + self.bytes_read != self.total_bytes + } + + /// Consume the remaining bytes. + #[inline] + pub fn eat_remaining(&mut self) -> Result<(), DecodeError> { + copy(self, &mut sink()).or(Err(DecodeError::Io(ErrorKind::Other)))?; + if self.bytes_read != self.total_bytes { + Err(DecodeError::ShortRead) + } else { + Ok(()) + } + } +} + +impl Read for FixedLengthReader { + #[inline] + fn read(&mut self, dest: &mut [u8]) -> Result { + if self.total_bytes == self.bytes_read { + Ok(0) + } else { + let read_len = core::cmp::min(dest.len() as u64, self.total_bytes - self.bytes_read); + match self.read.read(&mut dest[0..(read_len as usize)]) { + Ok(v) => { + self.bytes_read += v as u64; + Ok(v) + } + Err(e) => Err(e), + } + } + } +} + +/// A Read which tracks whether any bytes have been read at all. This allows us to distinguish +/// between "EOF reached before we started" and "EOF reached mid-read". +pub(super) struct ReadTrackingReader { + read: R, + /// Tells whether we have read from this reader or not yet. + pub have_read: bool, +} + +impl ReadTrackingReader { + /// Returns a new ReadTrackingReader. + pub fn new(read: R) -> Self { + Self { + read, + have_read: false, + } + } +} + +impl Read for ReadTrackingReader { + #[inline] + fn read(&mut self, dest: &mut [u8]) -> Result { + match self.read.read(dest) { + Ok(0) => Ok(0), + Ok(len) => { + self.have_read = true; + Ok(len) + } + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod test_utils { + use crate::appointment::{Locator, LOCATOR_LEN}; + use crate::cryptography::get_random_bytes; + use bitcoin::hashes::Hash; + use bitcoin::Txid; + pub use lightning::util::test_utils::TestVecWriter; + + pub fn get_random_locator() -> Locator { + let bytes = get_random_bytes(LOCATOR_LEN); + Locator::from_slice(&bytes).unwrap() + } + + pub fn get_random_txid() -> Txid { + let bytes = get_random_bytes(32); + Txid::from_slice(&bytes).unwrap() + } + + #[allow(dead_code)] + pub fn get_random_string(size: usize) -> String { + let bytes = get_random_bytes(size); + String::from_utf8_lossy(&bytes).into_owned() + } +} + +#[cfg(test)] +pub(super) use test_utils::*; diff --git a/teos/Cargo.toml b/teos/Cargo.toml index b062b467..81e4c3cd 100644 --- a/teos/Cargo.toml +++ b/teos/Cargo.toml @@ -47,6 +47,7 @@ teos-common = { path = "../teos-common" } tonic-build = "0.6" [dev-dependencies] +futures = "0.3.21" jsonrpc-http-server = "17.1.0" rand = "0.8.4" tempdir = "0.3.7" diff --git a/teos/src/api/http.rs b/teos/src/api/http.rs index f35b9b20..5bfd95f0 100644 --- a/teos/src/api/http.rs +++ b/teos/src/api/http.rs @@ -316,13 +316,8 @@ mod test_helpers { use serde::de::DeserializeOwned; use serde_json::Value; - use std::sync::Arc; - use tokio::net::TcpListener; - use tonic::transport::Server; - use crate::api::internal::InternalAPI; - use crate::protos::public_tower_services_server::PublicTowerServicesServer; - use crate::test_utils::{create_api_with_config, ApiConfig, BitcoindStopper}; + use crate::test_utils::get_public_grpc_conn; pub(crate) enum RequestBody<'a> { Jsonify(&'a str), @@ -331,45 +326,12 @@ mod test_helpers { Body(&'a str), } - pub(crate) async fn run_tower_in_background_with_config( - api_config: ApiConfig, - ) -> (SocketAddr, Arc, BitcoindStopper) { - let (internal_rpc_api, bitcoind_stopper) = create_api_with_config(api_config).await; - let cloned = internal_rpc_api.clone(); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - Server::builder() - .add_service(PublicTowerServicesServer::new(internal_rpc_api)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .unwrap(); - }); - - (addr, cloned, bitcoind_stopper) - } - - pub(crate) async fn run_tower_in_background() -> (SocketAddr, BitcoindStopper) { - let (sock_addr, _, bitcoind_stopper) = - run_tower_in_background_with_config(ApiConfig::default()).await; - - (sock_addr, bitcoind_stopper) - } - pub(crate) async fn check_api_error( endpoint: Endpoint, body: RequestBody<'_>, server_addr: SocketAddr, ) -> (ApiError, StatusCode) { - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let req = match body { RequestBody::Json(j) => warp::test::request() @@ -406,13 +368,7 @@ mod test_helpers { B: Serialize, T: DeserializeOwned, { - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -427,9 +383,11 @@ mod test_helpers { #[cfg(test)] mod tests_failures { - use super::test_helpers::{check_api_error, run_tower_in_background, RequestBody}; + use super::test_helpers::{check_api_error, RequestBody}; use super::*; + use crate::test_utils::{get_public_grpc_conn, run_tower_in_background}; + use teos_common::test_utils::get_random_user_id; #[tokio::test] @@ -561,13 +519,7 @@ mod tests_failures { #[tokio::test] async fn test_empty_request_body() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -581,13 +533,7 @@ mod tests_failures { #[tokio::test] async fn test_payload_too_large() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -602,13 +548,7 @@ mod tests_failures { #[tokio::test] async fn test_wrong_endpoint() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -622,13 +562,7 @@ mod tests_failures { #[tokio::test] async fn test_wrong_method() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .json(&"") @@ -641,14 +575,14 @@ mod tests_failures { #[cfg(test)] mod tests_methods { - use super::test_helpers::{ - check_api_error, request_to_api, run_tower_in_background, - run_tower_in_background_with_config, RequestBody, - }; + use super::test_helpers::{check_api_error, request_to_api, RequestBody}; use super::*; use crate::extended_appointment::UUID; - use crate::test_utils::{generate_dummy_appointment, ApiConfig, DURATION, SLOTS}; + use crate::test_utils::{ + generate_dummy_appointment, run_tower_in_background, run_tower_in_background_with_config, + ApiConfig, DURATION, SLOTS, + }; use teos_common::test_utils::get_random_user_id; use teos_common::{cryptography, UserId}; diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs new file mode 100644 index 00000000..a2bba6b9 --- /dev/null +++ b/teos/src/api/lightning.rs @@ -0,0 +1,965 @@ +//! Watchtower's Lightning interface. + +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use triggered::Listener; + +use crate::protos::public_tower_services_client::PublicTowerServicesClient; +use tonic::transport::Channel; +use tonic::Code; + +use lightning::io; +use lightning::ln::msgs::{DecodeError, ErrorAction, LightningError, WarningMessage}; +use lightning::ln::peer_handler::{ + CustomMessageHandler, ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, + PeerManager, +}; +use lightning::ln::wire::CustomMessageReader; +use lightning::util::logger::{Level, Logger as LightningLogger, Record}; +use lightning::util::ser::Readable; + +use lightning_net_tokio::SocketDescriptor; + +use std::convert::TryInto; +use std::mem; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +use teos_common::cryptography::get_random_bytes; +use teos_common::lightning::messages::*; +use teos_common::protos as common_msgs; + +// FIXME: Check if we can drop some Arcs here. +type TowerPeerManager = PeerManager< + SocketDescriptor, + Arc, // No channel message handler + Arc, // No routing message handler + Arc, + Arc, // Using our custom message handler +>; + +/// A helper that returns an [`Err(LightningError)`] with the specified warning message. +fn warn_peer(msg_to_peer: &str, msg_to_log: &str) -> Result { + Err(LightningError { + err: msg_to_log.to_owned(), + action: ErrorAction::SendWarningMessage { + msg: WarningMessage { + // Zeros for channel id tells that the warning isn't channel specific. + channel_id: [0; 32], + data: msg_to_peer.to_owned(), + }, + log_level: Level::Warn, + }, + }) +} + +/// A handler to handle the incoming [`TowerMessage`]s. +pub struct TowerMessageHandler { + /// A queue holding the response messages or errors the tower wants to send to its peers. + msg_queue: Mutex>, + // TODO: Will it make more sense using the watcher interface instead of the gRPC? + // since the watcher interface is not async and it does provide richer error codes. + /// A connection to the tower's public internal gRPC API. + grpc_conn: PublicTowerServicesClient, + /// A tokio runtime handle to run gRPC async calls on. + handle: tokio::runtime::Handle, +} + +impl TowerMessageHandler { + fn new(grpc_conn: PublicTowerServicesClient, handle: tokio::runtime::Handle) -> Self { + Self { + msg_queue: Mutex::new(Vec::new()), + grpc_conn, + handle, + } + } + + /// Handles a tower request message by casting it to a gRPC message and send it to the + /// internal API. The API's response is then casted to a tower response and returned. + /// The argument `peer` is used for logging purposes only. + fn handle_tower_message( + &self, + msg: TowerMessage, + peer: &PublicKey, + ) -> Result { + tokio::task::block_in_place(|| { + log::info!("Received {:?} from {}", msg, peer); + let mut grpc_conn = self.grpc_conn.clone(); + match msg { + TowerMessage::Register(msg) => { + let res = self + .handle + .block_on(grpc_conn.register(common_msgs::RegisterRequest::from(msg))); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!("Failed registering {} because {}", peer, e.message()), + ), + } + } + TowerMessage::AddUpdateAppointment(msg) => { + let res = + self.handle.block_on(grpc_conn.add_appointment( + common_msgs::AddAppointmentRequest::from(msg.clone()), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + // NOTE: The gRPC interface multiplexes the errors and doesn't let us know what they exactly + // were. Possible errors can be found [here](crate::watcher::AddAppointmentFailure). + Err(e) if e.code() == Code::Unauthenticated => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::Unauthenticated as u16, + reason: e.message().into(), + } + .into()), + Err(e) if e.code() == Code::AlreadyExists => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::AlreadyExists as u16, + reason: e.message().into(), + } + .into()), + Err(e) => { + warn_peer( + e.message(), + &format!( + "Failed accepting appointment from {} with locator {} because {}", + peer, msg.locator, e.message() + ), + ) + } + } + } + TowerMessage::GetAppointment(msg) => { + let res = + self.handle.block_on(grpc_conn.get_appointment( + common_msgs::GetAppointmentRequest::from(msg.clone()), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) if e.code() == Code::NotFound => Ok(AppointmentNotFound { + locator: msg.locator, + } + .into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetAppointment request from {} failed because {}", + peer, + e.message() + ), + ), + } + } + TowerMessage::GetSubscriptionInfo(msg) => { + let res = + self.handle.block_on(grpc_conn.get_subscription_info( + common_msgs::GetSubscriptionInfoRequest::from(msg), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetSubscriptionInfo request from {} failed because {}", + peer, + e.message() + ), + ), + } + } + // TODO: DeleteAppointment + // TowerMessageHandler as CustomMessageReader won't produce other than the above messages. + _ => unreachable!(), + } + }) + } +} + +impl CustomMessageReader for TowerMessageHandler { + type CustomMessage = TowerMessage; + + fn read( + &self, + message_type: u16, + buffer: &mut R, + ) -> Result, DecodeError> { + match message_type { + Register::TYPE => Ok(Some(Register::read(buffer)?.into())), + AddUpdateAppointment::TYPE => Ok(Some(AddUpdateAppointment::read(buffer)?.into())), + GetAppointment::TYPE => Ok(Some(GetAppointment::read(buffer)?.into())), + GetSubscriptionInfo::TYPE => Ok(Some(GetSubscriptionInfo::read(buffer)?.into())), + // Unknown message. + _ => Ok(None), + } + } +} + +impl CustomMessageHandler for TowerMessageHandler { + fn handle_custom_message( + &self, + msg: TowerMessage, + sender_node_id: &PublicKey, + ) -> Result<(), LightningError> { + self.msg_queue.lock().unwrap().push(( + *sender_node_id, + self.handle_tower_message(msg, sender_node_id)?, + )); + Ok(()) + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, TowerMessage)> { + mem::take(&mut self.msg_queue.lock().unwrap()) + } +} + +/// A translation struct to translate LDK's logs to our logging system's logs. +pub struct Logger; + +impl LightningLogger for Logger { + fn log(&self, record: &Record) { + #[cfg(test)] + // Pass "-- --nocapture" flag to "cargo test" for this println to appear. + println!( + // "\x1B" stuff are terminal colors. Might not work in some terminals though. + "\x1B[42m{}\x1B[0m [\x1B[33m{}:{}\x1B[0m]: {}", + record.level, record.module_path, record.line, record.args + ); + #[cfg(not(test))] + match record.level { + Level::Error => log::error!(target: record.module_path, "{}", record.args), + Level::Warn => log::warn!(target: record.module_path, "{}", record.args), + Level::Info => log::info!(target: record.module_path, "{}", record.args), + Level::Debug => log::debug!(target: record.module_path, "{}", record.args), + Level::Trace => log::trace!(target: record.module_path, "{}", record.args), + _ => {} + } + } +} + +pub async fn serve( + lightning_bind: SocketAddr, + grpc_bind: String, + shutdown_signal: Listener, + tower_sk: SecretKey, +) { + let grpc_conn = loop { + match PublicTowerServicesClient::connect(grpc_bind.clone()).await { + Ok(conn) => break conn, + Err(_) => { + log::error!("Cannot connect to the gRPC server. Retrying shortly"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + }; + let tower_message_handler = Arc::new(TowerMessageHandler::new( + grpc_conn, + tokio::runtime::Handle::current(), + )); + let message_handler = MessageHandler { + chan_handler: Arc::new(ErroringMessageHandler::new()), + route_handler: Arc::new(IgnoringMessageHandler {}), + }; + let ephemeral_bytes: [u8; 32] = get_random_bytes(32).try_into().unwrap(); + let peer_manager = Arc::new(TowerPeerManager::new( + message_handler, + tower_sk, + &ephemeral_bytes, + Arc::new(Logger), + tower_message_handler, + )); + // To suppress an issue similar to https://github.com/rust-lang/rust-clippy/issues/2928 + #[allow(clippy::expect_fun_call)] + let listener = tokio::net::TcpListener::bind(lightning_bind) + .await + .expect(&format!( + "Couldn't bind the lightning server to {}", + lightning_bind + )); + // A tokio runtime handle to run lightning net tokio on. This is to fix a deadlock issue similar + // to https://github.com/lightningdevkit/rust-lightning/issues/1367 which appears with too many + // concurrent requests to the server. + let ldk_handle = Box::leak(Box::new(tokio::runtime::Runtime::new().unwrap())) + .handle() + .clone(); + loop { + let tcp_stream = listener.accept().await.unwrap().0; + if shutdown_signal.is_triggered() { + return; + } + let peer_manager = peer_manager.clone(); + ldk_handle.spawn(async move { + lightning_net_tokio::setup_inbound(peer_manager, tcp_stream.into_std().unwrap()).await; + }); + } +} + +#[cfg(test)] +mod test_lightning_client { + use super::*; + + use std::collections::VecDeque; + + pub(crate) type TestClientPeerManager = PeerManager< + SocketDescriptor, + Arc, // No channel message handler + Arc, // No routing message handler + Arc, + Arc, // Using our custom message handler + >; + + pub(crate) struct TestClientMessageHandler { + msg_queue: Mutex>, + // A vector we store the received messages in to test whether the tower sent correct responses or not. + received_msgs: Mutex>, + } + + impl TestClientMessageHandler { + pub(crate) fn new() -> Self { + Self { + msg_queue: Mutex::new(Vec::new()), + received_msgs: Mutex::new(VecDeque::new()), + } + } + + /// Sends a tower message to `peer`. + /// This works by pushing the message to a pending messages queue and notifying the passed + /// `peer_manager` that there are some events to process. + /// + /// You should only pass the peer manager that is holding a reference of this `TestClientMessageHandler` + /// (`self`) as a custom message handler and not any other peer manager. + pub(crate) fn send_msg( + &self, + peer_manager: &TestClientPeerManager, + msg: TowerMessage, + peer: &PublicKey, + ) { + self.msg_queue.lock().unwrap().push((*peer, msg)); + // Let the peer manager process our pending message. + peer_manager.process_events(); + // The message queue must be empty after the peer manager has processed events. + assert!(self.msg_queue.lock().unwrap().is_empty()); + } + + pub(crate) fn received_msgs_count(&self) -> usize { + self.received_msgs.lock().unwrap().len() + } + + pub(crate) fn pop_oldest_received_msg(&self) -> TowerMessage { + self.received_msgs.lock().unwrap().pop_front().unwrap() + } + } + + impl CustomMessageReader for TestClientMessageHandler { + type CustomMessage = TowerMessage; + + fn read( + &self, + message_type: u16, + buffer: &mut R, + ) -> Result, DecodeError> { + match message_type { + Register::TYPE => Ok(Some(Register::read(buffer)?.into())), // A real client shouldn't have this + SubscriptionDetails::TYPE => Ok(Some(SubscriptionDetails::read(buffer)?.into())), + AddUpdateAppointment::TYPE => Ok(Some(AddUpdateAppointment::read(buffer)?.into())), // ,this + AppointmentAccepted::TYPE => Ok(Some(AppointmentAccepted::read(buffer)?.into())), + AppointmentRejected::TYPE => Ok(Some(AppointmentRejected::read(buffer)?.into())), + GetAppointment::TYPE => Ok(Some(GetAppointment::read(buffer)?.into())), // ,this + AppointmentData::TYPE => Ok(Some(AppointmentData::read(buffer)?.into())), + TrackerData::TYPE => Ok(Some(TrackerData::read(buffer)?.into())), + AppointmentNotFound::TYPE => Ok(Some(AppointmentNotFound::read(buffer)?.into())), + GetSubscriptionInfo::TYPE => Ok(Some(GetSubscriptionInfo::read(buffer)?.into())), // ,and this. + SubscriptionInfo::TYPE => Ok(Some(SubscriptionInfo::read(buffer)?.into())), + // Unknown message. + _ => Ok(None), + } + } + } + + impl CustomMessageHandler for TestClientMessageHandler { + fn handle_custom_message( + &self, + msg: TowerMessage, + _sender_node_id: &PublicKey, + ) -> Result<(), LightningError> { + self.received_msgs.lock().unwrap().push_back(msg); + Ok(()) + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, TowerMessage)> { + mem::take(&mut self.msg_queue.lock().unwrap()) + } + } +} + +#[cfg(test)] +mod test_helpers { + use super::test_lightning_client::*; + use super::*; + + use bitcoin::secp256k1::Secp256k1; + use teos_common::cryptography::get_random_keypair; + + use crate::api::internal::InternalAPI; + use crate::test_utils::{ + get_public_grpc_conn, run_tower_in_background_with_config, ApiConfig, BitcoindStopper, + }; + + pub(crate) const WAIT_DURATION: tokio::time::Duration = tokio::time::Duration::from_millis(10); + + pub(crate) async fn get_tower_message_handler_with_config( + conf: ApiConfig, + ) -> (Arc, Arc, BitcoindStopper) { + let (server_addr, internal_api, bitcoind_stopper) = + run_tower_in_background_with_config(conf).await; + let grpc_conn = get_public_grpc_conn(server_addr).await; + let handle = tokio::runtime::Handle::current(); + ( + Arc::new(TowerMessageHandler::new(grpc_conn, handle)), + internal_api, + bitcoind_stopper, + ) + } + + pub(crate) async fn get_tower_message_handler( + ) -> (Arc, Arc, BitcoindStopper) { + get_tower_message_handler_with_config(ApiConfig::default()).await + } + + pub(crate) async fn request_to_tower_message_handler( + tower: &Arc, + msg: TowerMessage, + peer: PublicKey, + ) -> Result { + let tower = tower.clone(); + // Must use `spawn_blocking` because `handle_tower_message` uses `block_on`. + tokio::task::spawn_blocking(move || tower.handle_tower_message(msg, &peer)) + .await + .unwrap() + } + + /// Spawns a tower and a Lightning server that accepts tower messages. + /// Note that the server might not be fully booted up after this function returns. + pub(crate) async fn run_lightning_tower_with_config( + conf: ApiConfig, + ) -> (SocketAddr, PublicKey, BitcoindStopper) { + let (server_addr, internal_api, bitcoind_stopper) = + run_tower_in_background_with_config(conf).await; + let lightning_bind = { + let unused_port = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + unused_port.local_addr().unwrap() + }; + let grpc_bind = format!("http://{}:{}", server_addr.ip(), server_addr.port()); + let (_, shutdown_signal) = triggered::trigger(); + let tower_sk = internal_api.get_watcher().get_signing_key(); + // To make the tests simple, we won't let the testers await on the task or hand them shutdown triggers. + let _ = tokio::task::spawn(serve(lightning_bind, grpc_bind, shutdown_signal, tower_sk)); + ( + lightning_bind, + PublicKey::from_secret_key(&Secp256k1::new(), &tower_sk), + bitcoind_stopper, + ) + } + + pub(crate) async fn run_lightning_tower() -> (SocketAddr, PublicKey, BitcoindStopper) { + run_lightning_tower_with_config(ApiConfig::default()).await + } + + pub(crate) fn get_test_client_peer_manager() -> ( + Arc, + Arc, + PublicKey, + ) { + let client_message_handler = Arc::new(TestClientMessageHandler::new()); + let (client_sk, client_pk) = get_random_keypair(); + let ephemeral_bytes: [u8; 32] = get_random_bytes(32).try_into().unwrap(); + ( + Arc::new(TestClientPeerManager::new( + MessageHandler { + chan_handler: Arc::new(ErroringMessageHandler::new()), + route_handler: Arc::new(IgnoringMessageHandler {}), + }, + client_sk, + &ephemeral_bytes, + Arc::new(Logger), + client_message_handler.clone(), + )), + client_message_handler, + client_pk, + ) + } + + /// Connects `client_peer_manager` to another peer manager at `tower_addr`. + /// It keeps trying indefinitely till a connection is successful. + pub(crate) async fn connect_to_tower( + client_peer_manager: Arc, + tower_addr: SocketAddr, + tower_pk: PublicKey, + ) { + // From https://lightningdevkit.org/payments/connecting_peers/ + loop { + match lightning_net_tokio::connect_outbound( + client_peer_manager.clone(), + tower_pk, + tower_addr, + ) + .await + { + Some(connection_closed_future) => { + let mut connection_closed_future = Box::pin(connection_closed_future); + loop { + // Make sure the connection is still established. + match futures::poll!(&mut connection_closed_future) { + std::task::Poll::Ready(_) => { + panic!( + "{}@{} disconnected before handshake completed", + tower_pk, tower_addr + ); + } + std::task::Poll::Pending => {} + } + // Wait for the handshake to complete. + match client_peer_manager + .get_peer_node_ids() + .iter() + .find(|id| **id == tower_pk) + { + Some(_) => return, + None => tokio::time::sleep(WAIT_DURATION).await, + } + } + } + None => { + // The server takes some time to boot up. Let's wait a little bit. + tokio::time::sleep(WAIT_DURATION).await; + } + } + } + } +} + +#[cfg(test)] +mod message_handler_tests { + use super::test_helpers::*; + use super::*; + + use teos_common::cryptography::{get_random_keypair, sign}; + use teos_common::test_utils::{generate_random_appointment, get_random_user_id}; + use teos_common::UserId; + + use crate::extended_appointment::UUID; + use crate::test_utils::{ApiConfig, DURATION}; + + #[tokio::test] + async fn test_register() { + let (tower, _, _s) = get_tower_message_handler().await; + let user_id = get_random_user_id(); + let msg = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + assert!(matches! { + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Ok(TowerMessage::SubscriptionDetails(SubscriptionDetails { + .. + })) + }) + } + + #[tokio::test] + async fn test_register_max_slots() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::new(u32::MAX, DURATION)).await; + let user_id = get_random_user_id(); + let msg: TowerMessage = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // First registration should go through. + assert!(matches!( + request_to_tower_message_handler(&tower, msg.clone(), user_id.0).await, + Ok(TowerMessage::SubscriptionDetails( + SubscriptionDetails { .. } + )) + )); + + // Second one should fail (maximum slots count reached). + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Err(LightningError { err, .. }) if err.contains("maximum slots") + )); + } + + #[tokio::test] + async fn test_register_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let user_id = get_random_user_id(); + let msg = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_add_appointment() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentAccepted( + AppointmentAccepted { locator, .. } + )) if locator == appointment.locator + )); + } + + #[tokio::test] + async fn test_add_appointment_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentRejected( + AppointmentRejected { locator, rcode, .. } + )) if locator == appointment.locator && rcode == Code::Unauthenticated as u16 + )); + } + + #[tokio::test] + async fn test_add_appointment_already_triggered() { + let (tower, internal_api, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + // Add the appointment to the responder so it counts as triggered. + internal_api + .get_watcher() + .add_random_tracker_to_responder(UUID::new(appointment.locator, UserId(user_pk))); + + // Send the appointment to the tower and assert it rejects because of being already triggered. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentRejected( + AppointmentRejected { locator, rcode, .. } + )) if locator == appointment.locator && rcode == Code::AlreadyExists as u16 + )); + } + + #[tokio::test] + async fn test_add_appointment_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_get_appointment() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob.clone(), + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + // Send the appointment to the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + // Assert the tower has the appointment we just sent. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentData(AppointmentData { + locator, encrypted_blob + })) if locator == appointment.locator && encrypted_blob == appointment.encrypted_blob + )); + } + + #[tokio::test] + async fn test_get_appointment_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + // Assert the tower cannot authenticate us. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("cannot be authenticated") + )); + } + + #[tokio::test] + async fn test_get_appointment_not_found() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentNotFound(AppointmentNotFound { + locator + })) if locator == appointment.locator + )); + } + + #[tokio::test] + async fn test_get_appointment_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_get_subscription_info() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::SubscriptionInfo(SubscriptionInfo { .. })) + )); + } + + #[tokio::test] + async fn test_get_subscription_info_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("User not found") + )); + } + + #[tokio::test] + async fn test_get_subscription_info_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } +} + +#[cfg(test)] +mod peer_manager_tests { + use super::test_helpers::*; + use super::*; + + use teos_common::UserId; + + #[tokio::test] + async fn simple_test() { + let (tower_addr, tower_pk, _s) = run_lightning_tower().await; + let (client_peer_manager, client_messenger, client_pk) = get_test_client_peer_manager(); + connect_to_tower(client_peer_manager.clone(), tower_addr, tower_pk).await; + + let msg = Register { + pubkey: UserId(client_pk), + appointment_slots: 8778, + subscription_period: 6726, + } + .into(); + + // Send the register message to the tower. + client_messenger.send_msg(&client_peer_manager, msg, &tower_pk); + // And wait till we get a response. + while client_messenger.received_msgs_count() != 1 { + tokio::time::sleep(WAIT_DURATION).await; + } + + let received_msg = client_messenger.pop_oldest_received_msg(); + assert!(matches!( + received_msg, + TowerMessage::SubscriptionDetails(SubscriptionDetails { .. }) + )); + } +} diff --git a/teos/src/api/mod.rs b/teos/src/api/mod.rs index e901b046..76ad53d8 100644 --- a/teos/src/api/mod.rs +++ b/teos/src/api/mod.rs @@ -1,4 +1,5 @@ pub mod http; pub mod internal; +pub mod lightning; pub mod serde; pub mod tor; diff --git a/teos/src/config.rs b/teos/src/config.rs index 54e7a996..bc1acac8 100644 --- a/teos/src/config.rs +++ b/teos/src/config.rs @@ -46,13 +46,17 @@ impl std::error::Error for ConfigError {} #[structopt(rename_all = "lowercase")] #[structopt(version = env!("CARGO_PKG_VERSION"), about = "The Eye of Satoshi - Lightning watchtower")] pub struct Opt { - /// Address teos HTTP(s) API will bind to [default: localhost] + /// Address teos API will bind to [default: localhost] #[structopt(long)] pub api_bind: Option, + /// Port teos Lightning API will bind to [default: 9815] + #[structopt(long)] + pub lightning_port: Option, + /// Port teos HTTP(s) API will bind to [default: 9814] #[structopt(long)] - pub api_port: Option, + pub http_port: Option, /// Address teos RPC server will bind to [default: localhost] #[structopt(long)] @@ -122,7 +126,8 @@ pub struct Opt { pub struct Config { // API pub api_bind: String, - pub api_port: u16, + pub lightning_port: u16, + pub http_port: u16, // RPC pub rpc_bind: String, @@ -163,8 +168,11 @@ impl Config { if options.api_bind.is_some() { self.api_bind = options.api_bind.unwrap(); } - if options.api_port.is_some() { - self.api_port = options.api_port.unwrap(); + if options.lightning_port.is_some() { + self.lightning_port = options.lightning_port.unwrap(); + } + if options.http_port.is_some() { + self.http_port = options.http_port.unwrap(); } if options.rpc_bind.is_some() { self.rpc_bind = options.rpc_bind.unwrap(); @@ -254,7 +262,8 @@ impl Default for Config { fn default() -> Self { Self { api_bind: "127.0.0.1".into(), - api_port: 9814, + http_port: 9814, + lightning_port: 9815, tor_support: false, tor_control_port: 9051, onion_hidden_service_port: 9814, @@ -288,7 +297,8 @@ mod tests { fn default() -> Self { Self { api_bind: None, - api_port: None, + http_port: None, + lightning_port: None, tor_support: false, tor_control_port: None, onion_hidden_service_port: None, diff --git a/teos/src/main.rs b/teos/src/main.rs index 19863109..5f5c6227 100644 --- a/teos/src/main.rs +++ b/teos/src/main.rs @@ -18,8 +18,9 @@ use lightning_block_sync::poll::{ }; use lightning_block_sync::{BlockSource, SpvClient, UnboundedCache}; +use teos::api; use teos::api::internal::InternalAPI; -use teos::api::{http, tor::TorAPI}; +use teos::api::tor::TorAPI; use teos::bitcoin_cli::BitcoindClient; use teos::carrier::Carrier; use teos::chain_monitor::ChainMonitor; @@ -250,6 +251,7 @@ async fn main() { let (shutdown_trigger, shutdown_signal_rpc_api) = triggered::trigger(); let shutdown_signal_internal_rpc_api = shutdown_signal_rpc_api.clone(); + let shutdown_signal_lightning = shutdown_signal_rpc_api.clone(); let shutdown_signal_http = shutdown_signal_rpc_api.clone(); let shutdown_signal_cm = shutdown_signal_rpc_api.clone(); let shutdown_signal_tor = shutdown_signal_rpc_api.clone(); @@ -274,12 +276,15 @@ async fn main() { log::info!("Bootstrap completed. Turning on interfaces"); // Build interfaces - let http_api_addr = format!("{}:{}", conf.api_bind, conf.api_port) + let lightning_api_addr = format!("{}:{}", conf.api_bind, conf.lightning_port) + .parse() + .unwrap(); + let http_api_addr = format!("{}:{}", conf.api_bind, conf.http_port) .parse() .unwrap(); let mut addresses = vec![msgs::NetworkAddress::from_ipv4( conf.api_bind.clone(), - conf.api_port, + conf.http_port, )]; // Create Tor endpoint if required @@ -293,7 +298,7 @@ async fn main() { .await; addresses.push(msgs::NetworkAddress::from_torv3( tor_api.get_onion_address(), - conf.api_port, + conf.onion_hidden_service_port, )); Some(tor_api) @@ -351,7 +356,14 @@ async fn main() { }); let (http_service_ready, ready_signal_http) = triggered::trigger(); - let http_api_task = task::spawn(http::serve( + let lightning_api_task = task::spawn(api::lightning::serve( + lightning_api_addr, + internal_rpc_api_uri.clone(), + shutdown_signal_lightning, + tower_sk, + )); + + let http_api_task = task::spawn(api::http::serve( http_api_addr, internal_rpc_api_uri, http_service_ready, @@ -382,6 +394,7 @@ async fn main() { chain_monitor.monitor_chain().await; // Wait until shutdown + lightning_api_task.await.unwrap(); http_api_task.await.unwrap(); private_api_task.await.unwrap(); public_api_task.await.unwrap(); diff --git a/teos/src/test_utils.rs b/teos/src/test_utils.rs index b7959fd3..c88c2801 100644 --- a/teos/src/test_utils.rs +++ b/teos/src/test_utils.rs @@ -8,6 +8,7 @@ */ use rand::Rng; +use std::net::SocketAddr; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -15,6 +16,8 @@ use jsonrpc_http_server::jsonrpc_core::error::ErrorCode as JsonRpcErrorCode; use jsonrpc_http_server::jsonrpc_core::{Error as JsonRpcError, IoHandler, Params, Value}; use jsonrpc_http_server::{CloseHandle, Server, ServerBuilder}; +use tonic::transport; + use bitcoincore_rpc::{Auth, Client as BitcoindClient}; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -46,6 +49,8 @@ use crate::dbm::DBM; use crate::extended_appointment::{ExtendedAppointment, UUID}; use crate::gatekeeper::{Gatekeeper, UserInfo}; use crate::protos as msgs; +use crate::protos::public_tower_services_client::PublicTowerServicesClient; +use crate::protos::public_tower_services_server::PublicTowerServicesServer; use crate::responder::{ConfirmationStatus, Responder, TransactionTracker}; use crate::rpc_errors; use crate::watcher::{Breach, Watcher}; @@ -530,6 +535,45 @@ impl Drop for BitcoindStopper { } } +pub(crate) async fn run_tower_in_background_with_config( + api_config: ApiConfig, +) -> (SocketAddr, Arc, BitcoindStopper) { + let (internal_rpc_api, bitcoind_stopper) = create_api_with_config(api_config).await; + let cloned = internal_rpc_api.clone(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + transport::Server::builder() + .add_service(PublicTowerServicesServer::new(internal_rpc_api)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .unwrap(); + }); + + (addr, cloned, bitcoind_stopper) +} + +pub(crate) async fn run_tower_in_background() -> (SocketAddr, BitcoindStopper) { + let (sock_addr, _, bitcoind_stopper) = + run_tower_in_background_with_config(ApiConfig::default()).await; + + (sock_addr, bitcoind_stopper) +} + +pub(crate) async fn get_public_grpc_conn( + server_addr: SocketAddr, +) -> PublicTowerServicesClient { + PublicTowerServicesClient::connect(format!( + "http://{}:{}", + server_addr.ip(), + server_addr.port() + )) + .await + .unwrap() +} + pub(crate) struct BitcoindMock { pub url: String, pub server: Server, diff --git a/teos/src/watcher.rs b/teos/src/watcher.rs index a9672a12..a8f9a7a5 100644 --- a/teos/src/watcher.rs +++ b/teos/src/watcher.rs @@ -809,6 +809,10 @@ mod tests { self.responder .add_random_tracker(uuid, ConfirmationStatus::ConfirmedIn(100)) } + + pub(crate) fn get_signing_key(&self) -> SecretKey { + self.signing_key + } } async fn init_watcher(chain: &mut Blockchain) -> (Watcher, BitcoindStopper) {