From ed587d2e6111397b5f66abc025330b82b659e62a Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sat, 23 Mar 2024 11:54:58 -0700 Subject: [PATCH] Add unit test cases for server.rs Signed-off-by: Chanhee Lee --- rust/rti/src/net_common.rs | 2 +- rust/rti/src/server.rs | 723 ++++++++++++++++++++++++++++++++++++- rust/rti/src/tag.rs | 8 +- 3 files changed, 719 insertions(+), 14 deletions(-) diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index d33c8d1..b45949c 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -195,7 +195,7 @@ impl ErrType { } } -#[derive(PartialEq, Clone)] +#[derive(PartialEq, Clone, Debug)] pub enum SocketType { TCP, UDP, diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 8938acb..22244c4 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -58,6 +58,10 @@ pub struct Server { } impl Server { + pub fn port(&self) -> String { + self.port.clone() + } + pub fn create_rti_server( rti_remote: &mut RTIRemote, port: u16, @@ -98,9 +102,9 @@ impl Server { } pub fn wait_for_federates(&mut self, _f_rti: RTIRemote) { - println!("Server listening on port {}", self.port); + println!("Server listening on port {}", self.port()); let mut address = String::from("0.0.0.0:"); - address.push_str(self.port.as_str()); + address.push_str(self.port().as_str()); // TODO: Handle unwrap() properly. let socket = TcpListener::bind(address).unwrap(); let start_time = Arc::new(Mutex::new(StartTime::new())); @@ -247,7 +251,6 @@ impl Server { ); match MsgType::to_msg_type(buffer[0]) { MsgType::Timestamp => Self::handle_timestamp( - // &buffer, fed_id.try_into().unwrap(), &mut stream, cloned_rti.clone(), @@ -777,10 +780,7 @@ impl Server { std::process::exit(1); } } - // Close the socket. - stream - .shutdown(Shutdown::Both) - .expect("shutdown call failed"); + // TODO: Close the stream or handle process termination. } fn receive_connection_information( @@ -1032,6 +1032,7 @@ impl Server { let bytes_read = NetUtil::read_from_socket(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { println!("ERROR reading timestamp from federate_info {}.", fed_id); + std::process::exit(1); } // FIXME: Check whether swap_bytes_if_big_endian_int64() is implemented correctly @@ -2384,3 +2385,711 @@ impl Server { ); } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::initialize_federates; + use crate::initialize_rti; + use crate::process_args; + use crate::start_rti_server; + use crate::STARTING_PORT; + + use socket_server_mocker::server_mocker::ServerMocker; + use socket_server_mocker::server_mocker_instruction::{ + ServerMockerInstruction, ServerMockerInstructionsList, + }; + use socket_server_mocker::tcp_server_mocker::TcpServerMocker; + + use rand::{rngs::StdRng, RngCore, SeedableRng}; + + const FED_ID: u16 = 0; + const I32_SIZE: usize = mem::size_of::(); + const I64_SIZE: usize = mem::size_of::(); + const U16_SIZE: usize = mem::size_of::(); + const U32_SIZE: usize = mem::size_of::(); + const LOCAL_HOST: &str = "127.0.0.1"; + + #[test] + fn test_port_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + let server = start_rti_server(&mut rti); + assert_eq!( + server.expect("Failed to start a server.").port(), + STARTING_PORT.to_string() + ); + } + + #[test] + fn test_create_rti_server_tcp_type_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + let _ = Server::create_rti_server(&mut rti, STARTING_PORT, SocketType::TCP); + } + + #[test] + fn test_create_rti_server_udp_type_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + let _ = Server::create_rti_server(&mut rti, STARTING_PORT, SocketType::UDP); + } + + #[test] + fn test_socket_type_tcp_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + let server = Server::create_rti_server(&mut rti, STARTING_PORT, SocketType::TCP); + assert_eq!(server.socket_type(), SocketType::TCP); + } + + #[test] + fn test_socket_type_udp_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + let server = Server::create_rti_server(&mut rti, STARTING_PORT, SocketType::UDP); + assert_eq!(server.socket_type(), SocketType::UDP); + } + + #[test] + fn test_receive_and_check_fed_id_message_positive() { + let port_num = 7000; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let mut msg = generate_random_bytes(1 + U16_SIZE + 1); + msg[0] = MsgType::FedIds.to_byte(); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let server = start_rti_server(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + server + .expect("Failed to receive_and_check_fed_id_message().") + .receive_and_check_fed_id_message(&mut stream, cloned_rti.clone()); + } + + #[test] + fn test_receive_and_check_fed_id_message_wrong_msg_type_negative() { + let port_num = 7001; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let mut msg = generate_random_bytes(1 + U16_SIZE + 1); + msg[0] = MsgType::FedIds.to_byte() + 1; + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let server = start_rti_server(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + server + .expect("Failed to receive_and_check_fed_id_message().") + .receive_and_check_fed_id_message(&mut stream, cloned_rti.clone()); + } + + #[test] + fn test_receive_connection_information_positive() { + let port_num = 8000; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let connection_info_header = + generate_random_bytes(MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(connection_info_header)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let server = start_rti_server(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + server + .expect("Failed to receive_connection_information().") + .receive_connection_information(FED_ID, &mut stream, cloned_rti.clone()); + } + + #[test] + fn test_receive_udp_message_and_set_up_clock_sync_positive() { + let port_num = 9000; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let response = generate_random_bytes(U16_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(response)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let server = start_rti_server(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + server + .expect("Failed to receive_udp_message_and_set_up_clock_sync().") + .receive_udp_message_and_set_up_clock_sync(FED_ID, &mut stream, cloned_rti.clone()); + } + + #[test] + fn test_handle_timestamp_positive() { + let port_num = 10000; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let timestamp = generate_random_bytes(I64_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(timestamp)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + rti.base_mut().set_number_of_scheduling_nodes(0); // To pass the logic to wait all federates. + let start_time = Arc::new(Mutex::new(StartTime::new())); + let received_start_times = Arc::new((Mutex::new(false), Condvar::new())); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_received_start_times = Arc::clone(&received_start_times); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_timestamp( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_received_start_times.clone(), + cloned_sent_start_time.clone(), + ); + } + + fn generate_random_bytes(buffer_size: usize) -> Vec { + let seed = [0u8; 32]; + let mut rng: StdRng = SeedableRng::from_seed(seed); + let mut bytes = vec![0 as u8; buffer_size]; + rng.fill_bytes(&mut bytes); + bytes.to_vec() + } + + fn get_rti_arguments() -> Vec { + let mut args: Vec = Vec::new(); + args.push(String::from("target/debug/rti")); + args.push(String::from("-n")); + args.push(String::from("2")); + args + } + + #[test] + fn test_handle_federate_resign_positive() { + let port_num = 10010; + let _tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let stream = TcpStream::connect(ip_address).unwrap(); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let start_time = Arc::new(Mutex::new(StartTime::new())); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_federate_resign( + FED_ID, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_sent_start_time.clone(), + ); + } + + #[test] + fn test_handle_timed_message_positive() { + let port_num = 10020; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let mut timed_header = + generate_random_bytes(U16_SIZE + U16_SIZE + I32_SIZE + I64_SIZE + U32_SIZE); + // Set federate_id as 0 to avoid out of bound exception. + timed_header[2] = 0; + timed_header[3] = 0; + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(timed_header)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let start_time = Arc::new(Mutex::new(StartTime::new())); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_timed_message( + MsgType::TaggedMessage.to_byte(), + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_sent_start_time.clone(), + ); + } + + #[test] + fn test_handle_next_event_tag_positive() { + let port_num = 10030; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let msg = generate_random_bytes(I64_SIZE + U32_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_next_event_tag( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_sent_start_time.clone(), + ); + } + + #[test] + fn test_handle_latest_tag_complete_positive() { + let port_num = 10040; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let msg = generate_random_bytes(I64_SIZE + U32_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_latest_tag_complete( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_sent_start_time.clone(), + ); + } + + #[test] + fn test_handle_stop_request_message_positive() { + let port_num = 10050; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let msg = generate_random_bytes(I64_SIZE + U32_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let stop_granted = Arc::new(Mutex::new(StopGranted::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_stop_granted = Arc::clone(&stop_granted); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_stop_request_message( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_stop_granted.clone(), + ); + } + + #[test] + fn test_handle_stop_request_reply_positive() { + let port_num = 10060; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let msg = generate_random_bytes(I64_SIZE + U32_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let stop_granted = Arc::new(Mutex::new(StopGranted::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_stop_granted = Arc::clone(&stop_granted); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_stop_request_reply( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_stop_granted.clone(), + ); + } + + #[test] + fn test_handle_address_query_positive() { + let port_num = 10070; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let mut msg = vec![0 as u8; U16_SIZE]; + msg[0] = 0; + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg.to_vec())].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_address_query( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + ); + } + + #[test] + fn test_handle_address_ad_positive() { + let port_num = 10080; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + let msg = generate_random_bytes(U16_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + Server::handle_address_ad( + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + ); + } + + #[test] + fn test_handle_port_absent_message_positive() { + let port_num = 10090; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + // TODO: Generate tag information also. + let msg = generate_random_bytes(U16_SIZE); + let _ = tcp_server_mocker.add_mock_instructions_list( + ServerMockerInstructionsList::new_with_instructions( + [ServerMockerInstruction::SendMessage(msg)].as_slice(), + ), + ); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let mut zero_start_time = StartTime::new(); + zero_start_time.set_start_time(0); + let start_time = Arc::new(Mutex::new(zero_start_time)); + let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + let cloned_start_time = Arc::clone(&start_time); + let cloned_sent_start_time = Arc::clone(&sent_start_time); + { + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[0]; + fed.set_stream( + stream + .try_clone() + .expect("Fail to clone a mocked TCP stream."), + ); + } + + let mut buffer = vec![0 as u8; 1]; + buffer[0] = MsgType::PortAbsent.to_byte(); + Server::handle_port_absent_message( + &buffer, + FED_ID, + &mut stream, + cloned_rti.clone(), + cloned_start_time.clone(), + cloned_sent_start_time.clone(), + ); + } + + #[test] + fn test_handle_physical_clock_sync_message_with_udp_positive() { + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + + Server::handle_physical_clock_sync_message_with_udp(FED_ID, cloned_rti.clone()); + } + + #[test] + fn test_handle_physical_clock_sync_message_with_tcp_positive() { + let port_num = 10092; + let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap(); + let mut ip_address = LOCAL_HOST.to_owned(); + ip_address.push_str(":"); + ip_address.push_str(&port_num.to_string()); + let mut stream = TcpStream::connect(ip_address).unwrap(); + + let mut rti = initialize_rti(); + let _ = process_args(&mut rti, &get_rti_arguments()); + initialize_federates(&mut rti); + let arc_rti = Arc::new(RwLock::new(rti)); + let cloned_rti = Arc::clone(&arc_rti); + + Server::handle_physical_clock_sync_message_with_tcp( + FED_ID, + cloned_rti.clone(), + &mut stream, + ); + } +} diff --git a/rust/rti/src/tag.rs b/rust/rti/src/tag.rs index 5939d74..0b17bfd 100644 --- a/rust/rti/src/tag.rs +++ b/rust/rti/src/tag.rs @@ -326,10 +326,8 @@ mod tests { let mut r1 = Tag::new(0, 0); let mut r2 = Tag::new(0, 0); let mut r3 = Tag::new(0, 0); - let mut r4 = Tag::new(0, 0); let fv_tag = Tag::forever_tag(); - let nv_tag = Tag::never_tag(); let zr_tag = Tag::zero_tag(); assert_eq!(t1, Tag::lf_delay_tag(&t1, int_1)); @@ -380,20 +378,18 @@ mod tests { #[test] fn test_time() { - let mut time1: i64 = 67; let tag1 = Tag::new(18, 3); - time1 = tag1.time(); + let time1 = tag1.time(); assert_eq!(time1, tag1.time()); } #[test] fn test_microstep() { - let mut step1: u32 = 5; let tag1 = Tag::new(18, 3); - step1 = tag1.microstep(); + let step1 = tag1.microstep(); assert_eq!(step1, tag1.microstep()); }