tests: Add test utils for peer manager testing
These test utils include:
- A test lightning client: it simulates a client's peer manager trying to connect to the tower and send/receive tower messages. For a real client, this would be part of a bigger application (say, a lightning node), with the custom message handler adding the capability of speaking to the tower over Lightning.
- Some utils to launch a Lightning server and connect one peer manager to another
- A simple proof-of-concept test (the sketch right below shows how these pieces fit together)
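
A rough sketch of how these utils are meant to be combined; it mirrors the proof-of-concept test at the bottom of this diff, all names come from the new test modules, and the subscription values are arbitrary:

// Spawn the tower together with a Lightning server that accepts tower messages.
let (tower_addr, tower_pk, _bitcoind_stopper) = run_lightning_tower().await;
// Build a client-side peer manager wired to the custom tower-message handler.
let (client_peer_manager, client_messenger, client_pk) = get_test_client_peer_manager();
// Retry until the connection and the handshake with the tower succeed.
connect_to_tower(client_peer_manager.clone(), tower_addr, tower_pk).await;
// Queue a Register message and let the peer manager flush it over the wire.
let msg = Register {
    pubkey: UserId(client_pk),
    appointment_slots: 100,
    subscription_period: 4320,
}
.into();
client_messenger.send_msg(&client_peer_manager, msg, &tower_pk);
// The tower's response ends up in the client message handler's received queue.
while client_messenger.received_msgs_count() == 0 {
    tokio::time::sleep(WAIT_DURATION).await;
}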
mariocynicys committed Feb 5, 2023
1 parent 34cf636 commit b361508
Showing 3 changed files with 249 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions teos/Cargo.toml
@@ -47,6 +47,7 @@ teos-common = { path = "../teos-common" }
tonic-build = "0.6"

[dev-dependencies]
futures = "0.3.21"
jsonrpc-http-server = "17.1.0"
rand = "0.8.4"
tempdir = "0.3.7"
247 changes: 247 additions & 0 deletions teos/src/api/lightning.rs
@@ -217,6 +217,14 @@ pub struct Logger;

impl LightningLogger for Logger {
fn log(&self, record: &Record) {
#[cfg(test)]
// Pass "-- --nocapture" flag to "cargo test" for this println to appear.
println!(
// "\x1B" stuff are terminal colors. Might not work in some terminals though.
"\x1B[42m{}\x1B[0m [\x1B[33m{}:{}\x1B[0m]: {}",
record.level, record.module_path, record.line, record.args
);
#[cfg(not(test))]
match record.level {
Level::Error => log::error!(target: record.module_path, "{}", record.args),
Level::Warn => log::warn!(target: record.module_path, "{}", record.args),
@@ -279,15 +287,119 @@ pub async fn serve(
}
}

#[cfg(test)]
mod test_lightning_client {
use super::*;

use std::collections::VecDeque;

pub(crate) type TestClientPeerManager = PeerManager<
SocketDescriptor,
Arc<ErroringMessageHandler>, // No channel message handler
Arc<IgnoringMessageHandler>, // No routing message handler
Arc<Logger>,
Arc<TestClientMessageHandler>, // Using our custom message handler
>;

pub(crate) struct TestClientMessageHandler {
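// Tower messages queued for sending; the peer manager drains this via `get_and_clear_pending_msg`.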
msg_queue: Mutex<Vec<(PublicKey, TowerMessage)>>,
// A queue of messages received from the tower, used to check whether the tower sent the correct responses.
received_msgs: Mutex<VecDeque<TowerMessage>>,
}

impl TestClientMessageHandler {
pub(crate) fn new() -> Self {
Self {
msg_queue: Mutex::new(Vec::new()),
received_msgs: Mutex::new(VecDeque::new()),
}
}

/// Sends a tower message to `peer`.
/// This works by pushing the message to a pending messages queue and notifying the passed
/// `peer_manager` that there are some events to process.
///
/// You should only pass the peer manager that holds a reference to this `TestClientMessageHandler`
/// (`self`) as its custom message handler, and not any other peer manager.
pub(crate) fn send_msg(
&self,
peer_manager: &TestClientPeerManager,
msg: TowerMessage,
peer: &PublicKey,
) {
self.msg_queue.lock().unwrap().push((*peer, msg));
// Let the peer manager process our pending message.
peer_manager.process_events();
// The message queue must be empty after the peer manager has processed events.
assert!(self.msg_queue.lock().unwrap().is_empty());
}

pub(crate) fn received_msgs_count(&self) -> usize {
self.received_msgs.lock().unwrap().len()
}

pub(crate) fn pop_oldest_received_msg(&self) -> TowerMessage {
self.received_msgs.lock().unwrap().pop_front().unwrap()
}
}

impl CustomMessageReader for TestClientMessageHandler {
type CustomMessage = TowerMessage;

fn read<R: io::Read>(
&self,
message_type: u16,
buffer: &mut R,
) -> Result<Option<TowerMessage>, DecodeError> {
match message_type {
Register::TYPE => Ok(Some(Register::read(buffer)?.into())), // A real client wouldn't need to read this (clients only send it),
SubscriptionDetails::TYPE => Ok(Some(SubscriptionDetails::read(buffer)?.into())),
AddUpdateAppointment::TYPE => Ok(Some(AddUpdateAppointment::read(buffer)?.into())), // nor this,
AppointmentAccepted::TYPE => Ok(Some(AppointmentAccepted::read(buffer)?.into())),
AppointmentRejected::TYPE => Ok(Some(AppointmentRejected::read(buffer)?.into())),
GetAppointment::TYPE => Ok(Some(GetAppointment::read(buffer)?.into())), // nor this,
AppointmentData::TYPE => Ok(Some(AppointmentData::read(buffer)?.into())),
TrackerData::TYPE => Ok(Some(TrackerData::read(buffer)?.into())),
AppointmentNotFound::TYPE => Ok(Some(AppointmentNotFound::read(buffer)?.into())),
GetSubscriptionInfo::TYPE => Ok(Some(GetSubscriptionInfo::read(buffer)?.into())), // nor this one.
SubscriptionInfo::TYPE => Ok(Some(SubscriptionInfo::read(buffer)?.into())),
// Unknown message.
_ => Ok(None),
}
}
}

impl CustomMessageHandler for TestClientMessageHandler {
fn handle_custom_message(
&self,
msg: TowerMessage,
_sender_node_id: &PublicKey,
) -> Result<(), LightningError> {
self.received_msgs.lock().unwrap().push_back(msg);
Ok(())
}

fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, TowerMessage)> {
mem::take(&mut self.msg_queue.lock().unwrap())
}
}
}

#[cfg(test)]
mod test_helpers {
use super::test_lightning_client::*;
use super::*;

use bitcoin::secp256k1::Secp256k1;
use teos_common::cryptography::get_random_keypair;

use crate::api::internal::InternalAPI;
use crate::test_utils::{
get_public_grpc_conn, run_tower_in_background_with_config, ApiConfig, BitcoindStopper,
};

pub(crate) const WAIT_DURATION: tokio::time::Duration = tokio::time::Duration::from_millis(10);

pub(crate) async fn get_tower_message_handler_with_config(
conf: ApiConfig,
) -> (Arc<TowerMessageHandler>, Arc<InternalAPI>, BitcoindStopper) {
@@ -318,6 +430,105 @@ mod test_helpers {
.await
.unwrap()
}

/// Spawns a tower and a Lightning server that accepts tower messages.
/// Note that the server might not be fully booted up after this function returns.
pub(crate) async fn run_lightning_tower_with_config(
conf: ApiConfig,
) -> (SocketAddr, PublicKey, BitcoindStopper) {
let (server_addr, internal_api, bitcoind_stopper) =
run_tower_in_background_with_config(conf).await;
let lightning_bind = {
let unused_port = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
unused_port.local_addr().unwrap()
};
let grpc_bind = format!("http://{}:{}", server_addr.ip(), server_addr.port());
let (_, shutdown_signal) = triggered::trigger();
let tower_sk = internal_api.get_watcher().get_signing_key();
// To keep the tests simple, we neither let callers await this task nor hand them shutdown triggers.
let _ = tokio::task::spawn(serve(lightning_bind, grpc_bind, shutdown_signal, tower_sk));
(
lightning_bind,
PublicKey::from_secret_key(&Secp256k1::new(), &tower_sk),
bitcoind_stopper,
)
}

pub(crate) async fn run_lightning_tower() -> (SocketAddr, PublicKey, BitcoindStopper) {
run_lightning_tower_with_config(ApiConfig::default()).await
}

pub(crate) fn get_test_client_peer_manager() -> (
Arc<TestClientPeerManager>,
Arc<TestClientMessageHandler>,
PublicKey,
) {
let client_message_handler = Arc::new(TestClientMessageHandler::new());
let (client_sk, client_pk) = get_random_keypair();
let ephemeral_bytes: [u8; 32] = get_random_bytes(32).try_into().unwrap();
(
Arc::new(TestClientPeerManager::new(
MessageHandler {
chan_handler: Arc::new(ErroringMessageHandler::new()),
route_handler: Arc::new(IgnoringMessageHandler {}),
},
client_sk,
&ephemeral_bytes,
Arc::new(Logger),
client_message_handler.clone(),
)),
client_message_handler,
client_pk,
)
}

/// Connects `client_peer_manager` to another peer manager at `tower_addr`.
/// It retries indefinitely until the connection (and the handshake) succeeds.
pub(crate) async fn connect_to_tower(
client_peer_manager: Arc<TestClientPeerManager>,
tower_addr: SocketAddr,
tower_pk: PublicKey,
) {
// From https://lightningdevkit.org/payments/connecting_peers/
loop {
match lightning_net_tokio::connect_outbound(
client_peer_manager.clone(),
tower_pk,
tower_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
loop {
// Make sure the connection is still established.
match futures::poll!(&mut connection_closed_future) {
std::task::Poll::Ready(_) => {
panic!(
"{}@{} disconnected before handshake completed",
tower_pk, tower_addr
);
}
std::task::Poll::Pending => {}
}
// Wait for the handshake to complete.
match client_peer_manager
.get_peer_node_ids()
.iter()
.find(|id| **id == tower_pk)
{
Some(_) => return,
None => tokio::time::sleep(WAIT_DURATION).await,
}
}
}
None => {
// The server takes some time to boot up. Let's wait a little bit.
tokio::time::sleep(WAIT_DURATION).await;
}
}
}
}
}

#[cfg(test)]
@@ -711,3 +922,39 @@ mod message_handler_tests {
));
}
}

#[cfg(test)]
mod peer_manager_tests {
use super::test_helpers::*;
use super::*;

use teos_common::UserId;

// Needs to be "multi_thread" because we "block_in_place" without using "spawn_blocking".
#[tokio::test(flavor = "multi_thread")]
async fn simple_test() {
let (tower_addr, tower_pk, _s) = run_lightning_tower().await;
let (client_peer_manager, client_messenger, client_pk) = get_test_client_peer_manager();
connect_to_tower(client_peer_manager.clone(), tower_addr, tower_pk).await;

let msg = Register {
pubkey: UserId(client_pk),
appointment_slots: 8778,
subscription_period: 6726,
}
.into();

// Send the register message to the tower.
client_messenger.send_msg(&client_peer_manager, msg, &tower_pk);
// And wait till we get a response.
while client_messenger.received_msgs_count() != 1 {
tokio::time::sleep(WAIT_DURATION).await;
}

let received_msg = client_messenger.pop_oldest_received_msg();
assert!(matches!(
received_msg,
TowerMessage::SubscriptionDetails(SubscriptionDetails { .. })
));
}
}
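
For reference, one likely way to run just this test with the test-only log output visible (assuming the package name teos from the Cargo.toml above):

cargo test -p teos simple_test -- --nocapture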
