Skip to content

Commit

Permalink
Merged in master
Browse files Browse the repository at this point in the history
  • Loading branch information
dnwiebe committed Jul 27, 2022
2 parents 6195129 + b487dc7 commit 74a5586
Show file tree
Hide file tree
Showing 42 changed files with 867 additions and 452 deletions.
1 change: 0 additions & 1 deletion masq/tests/startup_shutdown_tests_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ fn masq_terminates_based_on_loss_of_connection_to_the_daemon_integration() {
);
}

#[ignore]
#[test]
fn handles_startup_and_shutdown_integration() {
let dir_path = ensure_node_home_directory_exists(
Expand Down
1 change: 1 addition & 0 deletions multinode_integration_tests/ci/all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ popd

pushd "$CI_DIR/.."
export RUSTFLAGS="-D warnings -Anon-snake-case"
ci/lint.sh
cargo test --release -- --nocapture --test-threads=1
popd
15 changes: 12 additions & 3 deletions multinode_integration_tests/src/big_data_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use crossbeam_channel::{unbounded, Sender};
use std::io::Read;
use std::io::Write;
use std::net::{SocketAddr, TcpListener};
use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::thread;
use std::time::Duration;

const CHUNK_SIZE: usize = 131072;

pub struct BigDataServer {
tx: Sender<()>,
local_addr: SocketAddr,
}

impl Drop for BigDataServer {
Expand All @@ -20,8 +21,12 @@ impl Drop for BigDataServer {
}

impl BigDataServer {
pub fn start(socket_addr: SocketAddr, size: usize) -> BigDataServer {
pub fn start(
socket_addr: &dyn ToSocketAddrs<Iter = std::vec::IntoIter<SocketAddr>>,
size: usize,
) -> BigDataServer {
let listener = TcpListener::bind(socket_addr).unwrap();
let local_addr = listener.local_addr().unwrap();
let (tx, rx) = unbounded();
thread::spawn(move || {
let mut buf = [0u8; CHUNK_SIZE];
Expand Down Expand Up @@ -83,7 +88,11 @@ impl BigDataServer {
}
});
thread::sleep(Duration::from_secs(1));
BigDataServer { tx }
BigDataServer { tx, local_addr }
}

pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

fn make_header(size: usize) -> Vec<u8> {
Expand Down
11 changes: 7 additions & 4 deletions multinode_integration_tests/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@

use crate::command::Command;
use crate::masq_node::MASQNodeUtils;
use crate::utils::UrlHolder;
use node_lib::test_utils;
use std::net::{IpAddr, Ipv4Addr};

pub struct BlockchainServer<'a> {
pub name: &'a str,
}

impl<'a> UrlHolder for BlockchainServer<'a> {
fn url(&self) -> String {
format!("http://{}:18545", self.ip().unwrap().trim())
}
}

impl<'a> BlockchainServer<'a> {
pub fn start(&self) {
MASQNodeUtils::clean_up_existing_container(self.name);
Expand Down Expand Up @@ -42,10 +49,6 @@ impl<'a> BlockchainServer<'a> {
command.stdout_or_stderr()
}

pub fn service_url(&self) -> String {
format!("http://{}:18545", self.ip().unwrap().trim())
}

pub fn wait_until_ready(&self) {
test_utils::wait_for(Some(500), Some(10000), || {
let mut cmd = Command::new("docker", Command::strings(vec!["logs", "ganache-cli"]));
Expand Down
19 changes: 14 additions & 5 deletions multinode_integration_tests/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ impl Command {
}
}

pub fn stdout_and_stderr(&mut self) -> String {
let exit_code = self.wait_for_exit();
self.combine_exit_code_stdout_and_stderr(exit_code)
}

fn combine_exit_code_stdout_and_stderr(&self, exit_code: i32) -> String {
format!(
"EXIT CODE: {}\nSTDOUT:\n{}\n\nSTDERR:\n{}\n\n",
exit_code,
self.stdout_as_string(),
self.stderr_as_string()
)
}

fn diagnosis(&self) -> String {
let stdout = self.stdout_as_string();
let stderr = self.stderr_as_string();
Expand All @@ -54,11 +68,6 @@ impl Command {
}
}

pub fn stdout_and_stderr(&mut self) -> String {
self.wait_for_exit();
self.stdout_as_string() + self.stderr_as_string().as_str()
}

pub fn stdout_as_string(&self) -> String {
let text = String::from_utf8(self.output.as_ref().unwrap().stdout.clone()).unwrap();
println!("{}", Self::truncate_long_string(text.clone()));
Expand Down
4 changes: 2 additions & 2 deletions multinode_integration_tests/src/masq_mock_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl MASQNode for MASQMockNode {
}

fn rate_pack(&self) -> RatePack {
self.guts.rate_pack.clone()
self.guts.rate_pack
}

fn chain(&self) -> Chain {
Expand Down Expand Up @@ -185,7 +185,7 @@ impl MASQMockNode {
node_addr,
earning_wallet,
consuming_wallet,
rate_pack: DEFAULT_RATE_PACK.clone(),
rate_pack: DEFAULT_RATE_PACK,
cryptde_enum,
framer,
chain: TEST_DEFAULT_MULTINODE_CHAIN,
Expand Down
6 changes: 4 additions & 2 deletions multinode_integration_tests/src/masq_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ pub struct MASQNodeUtils {}

impl MASQNodeUtils {
pub fn clean_up_existing_container(name: &str) {
let mut command = Command::new("docker", Command::strings(vec!["rm", name]));
command.wait_for_exit(); // success, failure, don't care
let mut command = Command::new("docker", Command::strings(vec!["stop", "-t", "0", name]));
command.stdout_and_stderr(); // success, failure, don't care
let mut command = Command::new("docker", Command::strings(vec!["rm", "-f", name]));
command.stdout_and_stderr(); // success, failure, don't care
}

pub fn find_project_root() -> String {
Expand Down
73 changes: 55 additions & 18 deletions multinode_integration_tests/src/masq_node_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use node_lib::sub_lib::cryptde::PublicKey;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::net::{IpAddr, Ipv4Addr};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs};

pub struct MASQNodeCluster {
startup_configs: HashMap<(String, usize), NodeStartupConfig>,
Expand All @@ -23,6 +23,7 @@ pub struct MASQNodeCluster {

impl MASQNodeCluster {
pub fn start() -> Result<MASQNodeCluster, String> {
MASQNodeCluster::docker_version()?;
MASQNodeCluster::cleanup()?;
MASQNodeCluster::create_network()?;
let host_node_parent_dir = match env::var("HOST_NODE_PARENT_DIR") {
Expand All @@ -42,14 +43,6 @@ impl MASQNodeCluster {
})
}

pub fn host_ip_addr() -> IpAddr {
if Self::is_in_jenkins() {
IpAddr::V4(Ipv4Addr::new(172, 18, 0, 2))
} else {
IpAddr::V4(Ipv4Addr::new(172, 18, 0, 1))
}
}

pub fn next_index(&self) -> usize {
self.next_index
}
Expand All @@ -76,7 +69,7 @@ impl MASQNodeCluster {

pub fn start_named_real_node(
&mut self,
name: String,
name: &str,
index: usize,
config: NodeStartupConfig,
) -> MASQRealNode {
Expand Down Expand Up @@ -244,14 +237,7 @@ impl MASQNodeCluster {
}

fn remove_network_if_running() -> Result<(), String> {
let mut command = Command::new("docker", Command::strings(vec!["network", "ls"]));
if command.wait_for_exit() != 0 {
return Err(format!(
"Could not list networks: {}",
command.stderr_as_string()
));
}
let output = command.stdout_as_string();
let output = Self::list_network()?;
if !output.contains("integration_net") {
return Ok(());
}
Expand All @@ -261,13 +247,41 @@ impl MASQNodeCluster {
);
match command.wait_for_exit() {
0 => Ok(()),
_ if command
.stderr_as_string()
.starts_with("Error: No such network: integration_net") =>
{
Ok(())
}
_ => Err(format!(
"Could not remove network integration_net: {}",
command.stderr_as_string()
)),
}
}

fn docker_version() -> Result<String, String> {
let mut command = Command::new("docker", Command::strings(vec!["--version"]));
if command.wait_for_exit() != 0 {
return Err(format!(
"Could not get Docker version: {}",
command.stderr_as_string()
));
}
Ok(command.stdout_as_string())
}

fn list_network() -> Result<String, String> {
let mut command = Command::new("docker", Command::strings(vec!["network", "ls"]));
if command.wait_for_exit() != 0 {
return Err(format!(
"Could not list networks: {}",
command.stderr_as_string()
));
}
Ok(command.stdout_as_string())
}

fn create_network() -> Result<(), String> {
let mut command = Command::new(
"docker",
Expand Down Expand Up @@ -301,3 +315,26 @@ impl MASQNodeCluster {
}
}
}

pub struct DockerHostSocketAddr {
socket_addrs: Vec<SocketAddr>,
}

impl ToSocketAddrs for DockerHostSocketAddr {
type Iter = std::vec::IntoIter<SocketAddr>;

fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
Ok(self.socket_addrs.clone().into_iter())
}
}

impl DockerHostSocketAddr {
pub fn new(port: u16) -> Self {
Self {
socket_addrs: vec![
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(172, 18, 0, 2), port)),
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(172, 18, 0, 1), port)),
],
}
}
}
12 changes: 6 additions & 6 deletions multinode_integration_tests/src/masq_node_server.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use crate::masq_node_cluster::MASQNodeCluster;
use crate::masq_node_cluster::DockerHostSocketAddr;
use crate::utils;
use std::io;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::time::Duration;

pub struct MASQNodeServer {
socket_addr: SocketAddr,
local_addr: SocketAddr,
listener: TcpListener,
stream_opt: Option<TcpStream>,
}

impl MASQNodeServer {
pub fn new(port: u16) -> MASQNodeServer {
let socket_addr = SocketAddr::new(MASQNodeCluster::host_ip_addr(), port);
let socket_addr = DockerHostSocketAddr::new(port);
let listener = TcpListener::bind(socket_addr).unwrap();
MASQNodeServer {
socket_addr,
local_addr: listener.local_addr().unwrap(),
listener,
stream_opt: None,
}
}

pub fn socket_addr(&self) -> SocketAddr {
self.socket_addr
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub fn send_chunk(&mut self, chunk: &[u8]) {
Expand Down
8 changes: 4 additions & 4 deletions multinode_integration_tests/src/masq_node_ui_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ impl MASQNodeUIClient {
}

pub fn wait_for_response(&self, context_id: u64, timeout: Duration) -> MessageBody {
return self.buffered_or_incoming(MessagePath::Conversation(context_id), timeout);
self.buffered_or_incoming(MessagePath::Conversation(context_id), timeout)
}

pub fn wait_for_broadcast(&self, timeout: Duration) -> MessageBody {
return self.buffered_or_incoming(MessagePath::FireAndForget, timeout);
self.buffered_or_incoming(MessagePath::FireAndForget, timeout)
}

fn buffered_or_incoming(&self, path: MessagePath, timeout: Duration) -> MessageBody {
if let Some(target) = self.check_for_buffered_message(path) {
return target;
}
return self.wait_for_message(path, timeout);
self.wait_for_message(path, timeout)
}

fn wait_for_message(&self, path: MessagePath, timeout: Duration) -> MessageBody {
Expand Down Expand Up @@ -106,7 +106,7 @@ impl MASQNodeUIClient {
}
});
inner.buffer = new_buffer;
return target_opt;
target_opt
}

fn check_for_waiting_message(
Expand Down
Loading

0 comments on commit 74a5586

Please sign in to comment.