Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable clock synchronization with federates #49

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion rust/rti/src/federate_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct FederateInfo {
// a federate when handling lf_request_stop().
// TODO: lf_thread_t thread_id;
stream: Option<TcpStream>, // The TCP socket descriptor for communicating with this federate.
// TODO: struct sockaddr_in UDP_addr;
udp_addr: SocketAddr,
clock_synchronization_enabled: bool, // Indicates the status of clock synchronization
// for this federate. Enabled by default.
in_transit_message_tags: InTransitMessageQueue, // Record of in-transit messages to this federate that are not
Expand All @@ -54,6 +54,7 @@ impl FederateInfo {
enclave: SchedulingNode::new(),
requested_stop: false,
stream: None::<TcpStream>,
udp_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
clock_synchronization_enabled: true,
in_transit_message_tags: InTransitMessageQueue::new(),
server_hostname: String::from("localhost"),
Expand All @@ -78,6 +79,14 @@ impl FederateInfo {
&self.stream
}

pub fn stream_mut(&mut self) -> &mut Option<TcpStream> {
&mut self.stream
}

pub fn udp_addr(&self) -> SocketAddr {
self.udp_addr.clone()
}

pub fn clock_synchronization_enabled(&self) -> bool {
self.clock_synchronization_enabled
}
Expand Down Expand Up @@ -110,6 +119,10 @@ impl FederateInfo {
self.stream = Some(stream);
}

pub fn set_udp_addr(&mut self, udp_addr: SocketAddr) {
self.udp_addr = udp_addr;
}

pub fn set_clock_synchronization_enabled(&mut self, clock_synchronization_enabled: bool) {
self.clock_synchronization_enabled = clock_synchronization_enabled;
}
Expand Down
111 changes: 88 additions & 23 deletions rust/rti/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::error::Error;

use crate::constants::*;
use crate::federate_info::*;
use crate::net_common::SocketType;
use crate::rti_common::*;
use crate::rti_remote::*;
use crate::trace::Trace;
Expand All @@ -29,7 +30,7 @@ use server::Server;

const RTI_TRACE_FILE_NAME: &str = "rti.lft";

#[derive(PartialEq, PartialOrd, Clone)]
#[derive(PartialEq, PartialOrd, Clone, Debug)]
pub enum ClockSyncStat {
ClockSyncOff,
ClockSyncInit,
Expand All @@ -49,18 +50,15 @@ impl ClockSyncStat {
pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static str> {
let mut idx = 1;
let argc = argv.len();
// println!("argv = {:?}", argv);
while idx < argc {
let arg = argv[idx].as_str();
// println!("arg = {}", arg); // TODO: Remove this debugging code
if arg == "-i" || arg == "--id" {
if argc < idx + 2 {
println!("--id needs a string argument.");
usage(argc, argv);
return Err("Fail to handle id option");
}
idx += 1;
// println!("idx = {}", idx); // TODO: Remove this debugging code
println!("RTI: Federation ID: {}", arg);
rti.set_federation_id(argv[idx].clone());
} else if arg == "-n" || arg == "--number_of_federates" {
Expand Down Expand Up @@ -125,7 +123,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static
return Err("Fail to handle clock_sync option");
}
idx += 1;
// TODO: idx += process_clock_sync_args();
idx += process_clock_sync_args(rti, argc - idx, &argv[idx..]);
} else if arg == "-t" || arg == "--tracing" {
rti.base_mut().set_tracing_enabled(true);
} else if arg == " " {
Expand Down Expand Up @@ -175,6 +173,84 @@ fn usage(argc: usize, argv: &[String]) {
}
}

/**
* Process command-line arguments related to clock synchronization. Will return
* the last read position of argv if all related arguments are parsed or an
* invalid argument is read.
*/
fn process_clock_sync_args(rti: &mut RTIRemote, argc: usize, argv: &[String]) -> usize {
for mut i in 0..argc {
let arg = argv[i].as_str();
if arg == "off" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOff);
println!("RTI: Clock sync: off");
} else if arg == "init" || arg == "initial" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncInit);
println!("RTI: Clock sync: init");
} else if arg == "on" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOn);
println!("RTI: Clock sync: on");
} else if arg == "period" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn {
println!("[ERROR] clock sync period can only be set if --clock-sync is set to on.");
usage(argc, argv);
i += 1;
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync period needs a time (in nanoseconds) argument.");
usage(argc, argv);
continue;
}
i += 1;
match argv[i].as_str().parse::<u64>() {
Ok(period_ns) => {
if period_ns == 0 || period_ns == u64::MAX {
println!("[ERROR] clock sync period value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_period_ns(period_ns);
println!("RTI: Clock sync period: {}", rti.clock_sync_period_ns());
}
Err(_) => {
println!("Failed to parse clock sync period.");
}
}
} else if argv[i] == "exchanges-per-interval" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn
&& rti.clock_sync_global_status() != ClockSyncStat::ClockSyncInit
{
println!("[ERROR] clock sync exchanges-per-interval can only be set if\n--clock-sync is set to on or init.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync exchanges-per-interval needs an integer argument.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
}
i += 1;
let exchanges: u32 = 10;
if exchanges == 0 || exchanges == u32::MAX || exchanges == u32::MIN {
println!("[ERROR] clock sync exchanges-per-interval value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_exchanges_per_interval(exchanges); // FIXME: Loses numbers on 64-bit machines
println!(
"RTI: Clock sync exchanges per interval: {}",
rti.clock_sync_exchanges_per_interval()
);
} else if arg == " " {
// Tolerate spaces
continue;
} else {
// Either done with the clock sync args or there is an invalid
// character. In either case, let the parent function deal with
// the rest of the characters;
return i;
}
}
argc
}

pub fn initialize_federates(rti: &mut RTIRemote) {
if rti.base().tracing_enabled() {
let _lf_number_of_workers = rti.base().number_of_scheduling_nodes();
Expand Down Expand Up @@ -205,27 +281,16 @@ fn initialize_federate(fed: &mut FederateInfo, id: u16) {

pub fn start_rti_server(_f_rti: &mut RTIRemote) -> Result<Server, Box<dyn Error>> {
// TODO: _lf_initialize_clock();
Ok(Server::create_server(
_f_rti.user_specified_port().to_string(),
))
let server = Server::create_rti_server(_f_rti, _f_rti.user_specified_port(), SocketType::TCP);
if _f_rti.clock_sync_global_status() >= ClockSyncStat::ClockSyncOn {
let final_tcp_port = u16::from(_f_rti.final_port_tcp());
Server::create_rti_server(_f_rti, final_tcp_port + 1, SocketType::UDP);
}
Ok(server)
}

/**
* Process command-line arguments related to clock synchronization. Will return
* the last read position of argv if all related arguments are parsed or an
* invalid argument is read.
*
* @param argc: Number of arguments in the list
* @param argv: The list of arguments as a string
* @return Current position (head) of argv;
*/
// TODO: implement this function
// fn process_clock_sync_args(_argc: i32, _argv: &[String]) -> i32 {
// 0
// }

/**
* Initialize the _RTI instance.
* Initialize the RTI instance.
*/
pub fn initialize_rti() -> RTIRemote {
RTIRemote::new()
Expand Down
18 changes: 18 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ pub enum MsgType {
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
ClockSyncT1,
ClockSyncT3,
ClockSyncT4,
ClockSyncCodedProbe,
PortAbsent,
NeighborStructure,
Ignore,
Expand All @@ -129,6 +133,10 @@ impl MsgType {
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::ClockSyncT1 => 19,
MsgType::ClockSyncT3 => 20,
MsgType::ClockSyncT4 => 21,
MsgType::ClockSyncCodedProbe => 22,
MsgType::PortAbsent => 23,
MsgType::NeighborStructure => 24,
MsgType::Ignore => 250,
Expand All @@ -150,6 +158,10 @@ impl MsgType {
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
19 => MsgType::ClockSyncT1,
20 => MsgType::ClockSyncT3,
21 => MsgType::ClockSyncT4,
22 => MsgType::ClockSyncCodedProbe,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down Expand Up @@ -183,6 +195,12 @@ impl ErrType {
}
}

#[derive(PartialEq, Clone)]
pub enum SocketType {
TCP,
UDP,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 3 additions & 3 deletions rust/rti/src/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl NetUtil {
} {}
}

pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
let mut bytes_read = 0;
while match stream.read(buffer) {
Ok(msg_size) => {
Expand Down Expand Up @@ -209,7 +209,7 @@ mod tests {
}

#[test]
fn test_read_from_stream_positive() {
fn test_read_from_socket_positive() {
let port_num = 35642;
let tcp_server_mocker = TcpServerMocker::new(port_num).unwrap();
let mut ip_address = LOCAL_HOST.to_owned();
Expand All @@ -225,7 +225,7 @@ mod tests {
),
);
let mut buffer = vec![0 as u8; buffer_size];
let read_size = NetUtil::read_from_stream(&mut stream, &mut buffer, 0);
let read_size = NetUtil::read_from_socket(&mut stream, &mut buffer, 0);
assert!(buffer == msg);
assert!(buffer_size == read_size);
}
Expand Down
51 changes: 48 additions & 3 deletions rust/rti/src/rti_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::constants::*;
use crate::ClockSyncStat;
use crate::RTICommon;

use std::net::UdpSocket;

/**
* Structure that an RTI instance uses to keep track of its own and its
* corresponding federates' state.
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct RTIRemote {
final_port_udp: u16,

/** The UDP socket descriptor for the socket server. */
socket_descriptor_udp: i32,
socket_descriptor_udp: Option<UdpSocket>,

/************* Clock synchronization information *************/
/* Thread performing PTP clock sync sessions periodically. */
Expand All @@ -80,7 +82,7 @@ pub struct RTIRemote {
/**
* Number of messages exchanged for each clock sync attempt.
*/
clock_sync_exchanges_per_interval: i32,
clock_sync_exchanges_per_interval: u32,

/**
* Boolean indicating that authentication is enabled.
Expand All @@ -105,7 +107,7 @@ impl RTIRemote {
final_port_tcp: 0,
socket_descriptor_tcp: -1,
final_port_udp: u16::MAX,
socket_descriptor_udp: -1,
socket_descriptor_udp: None,
clock_sync_global_status: ClockSyncStat::ClockSyncInit,
clock_sync_period_ns: 10 * 1000000,
clock_sync_exchanges_per_interval: 10,
Expand Down Expand Up @@ -138,6 +140,14 @@ impl RTIRemote {
self.user_specified_port
}

pub fn final_port_tcp(&self) -> u16 {
self.final_port_tcp
}

pub fn socket_descriptor_udp(&mut self) -> &mut Option<UdpSocket> {
&mut self.socket_descriptor_udp
}

pub fn final_port_udp(&self) -> u16 {
self.final_port_udp
}
Expand All @@ -146,6 +156,14 @@ impl RTIRemote {
self.clock_sync_global_status.clone()
}

pub fn clock_sync_period_ns(&self) -> u64 {
self.clock_sync_period_ns
}

pub fn clock_sync_exchanges_per_interval(&self) -> u32 {
self.clock_sync_exchanges_per_interval
}

pub fn stop_in_progress(&self) -> bool {
self.stop_in_progress
}
Expand All @@ -167,6 +185,33 @@ impl RTIRemote {
self.user_specified_port = user_specified_port;
}

pub fn set_final_port_tcp(&mut self, final_port_tcp: u16) {
self.final_port_tcp = final_port_tcp;
}

pub fn set_socket_descriptor_udp(&mut self, socket_descriptor_udp: Option<UdpSocket>) {
self.socket_descriptor_udp = socket_descriptor_udp;
}

pub fn set_final_port_udp(&mut self, final_port_udp: u16) {
self.final_port_udp = final_port_udp;
}

pub fn set_clock_sync_global_status(&mut self, clock_sync_global_status: ClockSyncStat) {
self.clock_sync_global_status = clock_sync_global_status;
}

pub fn set_clock_sync_period_ns(&mut self, clock_sync_period_ns: u64) {
self.clock_sync_period_ns = clock_sync_period_ns;
}

pub fn set_clock_sync_exchanges_per_interval(
&mut self,
clock_sync_exchanges_per_interval: u32,
) {
self.clock_sync_exchanges_per_interval = clock_sync_exchanges_per_interval;
}

pub fn set_stop_in_progress(&mut self, stop_in_progress: bool) {
self.stop_in_progress = stop_in_progress;
}
Expand Down
Loading
Loading