From 6c401f5501d72255787510bcafadb2e1b3337594 Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 12 Jul 2022 09:48:49 -0700 Subject: [PATCH 1/4] Add interval argument to emitter --- chrono_examples/src/bin/emitter.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/chrono_examples/src/bin/emitter.rs b/chrono_examples/src/bin/emitter.rs index 089ddc9..c778d0c 100644 --- a/chrono_examples/src/bin/emitter.rs +++ b/chrono_examples/src/bin/emitter.rs @@ -1,4 +1,6 @@ use std::net::{IpAddr, UdpSocket, SocketAddr}; +use std::thread::sleep; +use std::time::Duration; use std::pin::Pin; use clap::Parser; @@ -15,6 +17,8 @@ struct Args { port: u16, #[clap(short, long, env, value_parser = clap::value_parser!(u16).range(1..1500))] size: u16, + #[clap(long, env, default_value_t = 0)] + interval: u64 } pub fn main() -> std::io::Result<()> { @@ -43,6 +47,8 @@ pub fn main() -> std::io::Result<()> { archived_message.modify_app_sequence_number(seqno + 1); socket.send(&message_bytes)?; + + sleep(Duration::from_millis(args.interval)); // on Unix, doesn't even do a syscall with arg of 0 } } From e30a48a698cbaf953f487cc051c99cb35ea3bf19 Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 12 Jul 2022 09:50:14 -0700 Subject: [PATCH 2/4] Run cargo fmt and cargo clippy --- chrono_examples/src/bin/emitter.rs | 36 ++++++++++++++---------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/chrono_examples/src/bin/emitter.rs b/chrono_examples/src/bin/emitter.rs index c778d0c..ad53562 100644 --- a/chrono_examples/src/bin/emitter.rs +++ b/chrono_examples/src/bin/emitter.rs @@ -1,12 +1,14 @@ -use std::net::{IpAddr, UdpSocket, SocketAddr}; -use std::thread::sleep; -use std::time::Duration; -use std::pin::Pin; +use std::{ + net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, + thread::sleep, + time::Duration, +}; use clap::Parser; use rand::random; -use sequencer_common::{ArchivedSequencerMessage, LengthTag, SequencerMessage}; +use sequencer_common::SequencerMessage; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -18,37 +20,33 @@ struct Args { #[clap(short, long, env, value_parser = clap::value_parser!(u16).range(1..1500))] size: u16, #[clap(long, env, default_value_t = 0)] - interval: u64 + interval: u64, } pub fn main() -> std::io::Result<()> { let args = Args::parse(); - + // prepare port to send to sequencer let socket = UdpSocket::bind("127.0.0.1:0")?; socket.connect(SocketAddr::new(args.ip_address, args.port))?; - + // generate a random message payload - let payload :Vec = (0..args.size).map(|_| random()).collect(); - let message = SequencerMessage::new( - random(), - random(), - random(), - 0, - payload, - ); + let payload: Vec = (0..args.size).map(|_| random()).collect(); + let message = SequencerMessage::new(random(), random(), random(), 0, payload); let mut message_bytes = rkyv::util::to_bytes::<_, 256>(&message).unwrap(); rkyv::check_archived_root::(&message_bytes).unwrap(); loop { - let archived_message = unsafe { rkyv::archived_root_mut::(Pin::new(&mut message_bytes)) }; + let archived_message = + unsafe { rkyv::archived_root_mut::(Pin::new(&mut message_bytes)) }; let seqno = archived_message.app_sequence_number; archived_message.modify_app_sequence_number(seqno + 1); - + socket.send(&message_bytes)?; - sleep(Duration::from_millis(args.interval)); // on Unix, doesn't even do a syscall with arg of 0 + sleep(Duration::from_millis(args.interval)); // on Unix, doesn't even do + // a syscall with arg of 0 } } From 478cf7d23c9d0a599ef21068cc50400fc1916e96 Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 12 Jul 2022 13:05:46 -0700 Subject: [PATCH 3/4] Implement flow-control example --- Cargo.lock | 1 + chrono_examples/Cargo.toml | 1 + chrono_examples/src/bin/emitter.rs | 12 +-- chrono_examples/src/bin/flowcontrol.rs | 122 +++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 11 deletions(-) create mode 100644 chrono_examples/src/bin/flowcontrol.rs diff --git a/Cargo.lock b/Cargo.lock index 744171e..4e5077e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,7 @@ name = "chrono_examples" version = "0.1.0" dependencies = [ "clap 3.2.7", + "iodaemon", "rand", "rkyv", "sequencer_common", diff --git a/chrono_examples/Cargo.toml b/chrono_examples/Cargo.toml index 4b6ec08..1e177f0 100644 --- a/chrono_examples/Cargo.toml +++ b/chrono_examples/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] sequencer_common = { path = "../sequencer_common" } +iodaemon = { path = "../iodaemon" } rkyv = { version = "0.7", features = ["validation", "size_32", "alloc"] } diff --git a/chrono_examples/src/bin/emitter.rs b/chrono_examples/src/bin/emitter.rs index ad53562..3b3decf 100644 --- a/chrono_examples/src/bin/emitter.rs +++ b/chrono_examples/src/bin/emitter.rs @@ -17,7 +17,7 @@ struct Args { ip_address: IpAddr, #[clap(short, long, env)] port: u16, - #[clap(short, long, env, value_parser = clap::value_parser!(u16).range(1..1500))] + #[clap(short, long, env, value_parser = clap::value_parser!(u16).range(1..=1500))] size: u16, #[clap(long, env, default_value_t = 0)] interval: u64, @@ -50,13 +50,3 @@ pub fn main() -> std::io::Result<()> { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/chrono_examples/src/bin/flowcontrol.rs b/chrono_examples/src/bin/flowcontrol.rs new file mode 100644 index 0000000..4344a32 --- /dev/null +++ b/chrono_examples/src/bin/flowcontrol.rs @@ -0,0 +1,122 @@ +use std::{ + net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, + thread::{sleep, spawn}, + time::{Duration, Instant}, + sync::{Arc, atomic::{AtomicUsize, Ordering}}, + path::PathBuf, +}; + +use clap::Parser; +use rand::random; + +use sequencer_common::SequencerMessage; +use iodaemon::BlockingConsumer; + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + #[clap(short, long, env)] + ip_address: IpAddr, + #[clap(short, long, env)] + port: u16, + #[clap(short, long, env, value_parser = clap::value_parser!(u16).range(1..=1500))] + size: u16, + #[clap(long, env, default_value_t = 0)] + interval: u64, + #[clap(long, env, default_value_t = 0)] + timeout: u64, + #[clap(short, long, env, default_value_t = 1)] + window: usize, + #[clap(long, env)] + path: Option, +} + +pub fn main() -> std::io::Result<()> { + let args = Args::parse(); + + // prepare port to send to sequencer + let socket = UdpSocket::bind("127.0.0.1:0")?; + socket.connect(SocketAddr::new(args.ip_address, args.port))?; + + // generate a random message payload + let payload: Vec = (0..args.size).map(|_| random()).collect(); + let application_id = random(); + let message = SequencerMessage::new(application_id, random(), random(), 0, payload); + let mut message_bytes = rkyv::util::to_bytes::<_, 256>(&message).unwrap(); + rkyv::check_archived_root::(&message_bytes).unwrap(); + + let last_received_arc = Arc::new(AtomicUsize::new(0)); + let mut sequence_number = 0; + + let _sending = spawn({ + let last_received = last_received_arc.clone(); + move || { + loop { + let archived_message = + unsafe { rkyv::archived_root_mut::(Pin::new(&mut message_bytes)) }; + + archived_message.modify_app_sequence_number(sequence_number); + let timer = Instant::now(); + while last_received.load(Ordering::Relaxed) + args.window < sequence_number as usize { + // spinlock is fine for now + if timer.elapsed() > Duration::from_millis(args.timeout) { + // jump back on sequence numbers to last received + sequence_number = last_received.load(Ordering::Relaxed) as u64; + continue; + } + } + + if socket.send(&message_bytes).is_err() { + break; + } + + sequence_number += 1; + + sleep(Duration::from_millis(args.interval)); // on Unix, doesn't even do + // a syscall with arg of 0 + } + } + }); + + let receiving = spawn({ + let last_received = last_received_arc.clone(); + move || { + let input_file = if args.path.is_some() { + args.path.unwrap() + } else { + PathBuf::from("/var/run/iodaemon/output") + }; + + let mut reader = BlockingConsumer::new(input_file); + + while let Ok(message) = reader.read_message() { + let appid = message.app_id; + let seqno = message.app_sequence_number; + let expected_seqno = last_received.load(Ordering::Acquire) + 1 ; + if appid == application_id { + if seqno == expected_seqno as u64 { + last_received.store(seqno as usize, Ordering::Release); + } + else { + eprintln!("WARNING: Expected sequence number {}, got {}", expected_seqno, seqno); + } + } + }} + }); + + receiving.join().expect("Couldn't join receiver thread."); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} From e3303049b87005abd6bc16439802c8749bc210fd Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Wed, 13 Jul 2022 15:21:12 -0700 Subject: [PATCH 4/4] Run format and clippy --- chrono_examples/src/bin/emitter.rs | 1 - chrono_examples/src/bin/flowcontrol.rs | 110 ++++++++++++++----------- 2 files changed, 60 insertions(+), 51 deletions(-) diff --git a/chrono_examples/src/bin/emitter.rs b/chrono_examples/src/bin/emitter.rs index 3b3decf..e8fd341 100644 --- a/chrono_examples/src/bin/emitter.rs +++ b/chrono_examples/src/bin/emitter.rs @@ -49,4 +49,3 @@ pub fn main() -> std::io::Result<()> { // a syscall with arg of 0 } } - diff --git a/chrono_examples/src/bin/flowcontrol.rs b/chrono_examples/src/bin/flowcontrol.rs index 4344a32..42ae3bb 100644 --- a/chrono_examples/src/bin/flowcontrol.rs +++ b/chrono_examples/src/bin/flowcontrol.rs @@ -1,17 +1,20 @@ use std::{ net::{IpAddr, SocketAddr, UdpSocket}, + path::PathBuf, pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, thread::{sleep, spawn}, time::{Duration, Instant}, - sync::{Arc, atomic::{AtomicUsize, Ordering}}, - path::PathBuf, }; use clap::Parser; use rand::random; -use sequencer_common::SequencerMessage; use iodaemon::BlockingConsumer; +use sequencer_common::SequencerMessage; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -48,65 +51,72 @@ pub fn main() -> std::io::Result<()> { let last_received_arc = Arc::new(AtomicUsize::new(0)); let mut sequence_number = 0; - - let _sending = spawn({ + + let _sending = spawn({ let last_received = last_received_arc.clone(); move || { - loop { - let archived_message = - unsafe { rkyv::archived_root_mut::(Pin::new(&mut message_bytes)) }; - - archived_message.modify_app_sequence_number(sequence_number); - let timer = Instant::now(); - while last_received.load(Ordering::Relaxed) + args.window < sequence_number as usize { - // spinlock is fine for now - if timer.elapsed() > Duration::from_millis(args.timeout) { - // jump back on sequence numbers to last received - sequence_number = last_received.load(Ordering::Relaxed) as u64; - continue; + loop { + let archived_message = unsafe { + rkyv::archived_root_mut::(Pin::new(&mut message_bytes)) + }; + + archived_message.modify_app_sequence_number(sequence_number); + let timer = Instant::now(); + while last_received.load(Ordering::Relaxed) + args.window < sequence_number as usize + { + // spinlock is fine for now + if timer.elapsed() > Duration::from_millis(args.timeout) { + // jump back on sequence numbers to last received + sequence_number = last_received.load(Ordering::Relaxed) as u64; + continue; + } } - } - if socket.send(&message_bytes).is_err() { - break; - } - - sequence_number += 1; + if socket.send(&message_bytes).is_err() { + break; + } - sleep(Duration::from_millis(args.interval)); // on Unix, doesn't even do - // a syscall with arg of 0 - } + sequence_number += 1; + + sleep(Duration::from_millis(args.interval)); // on Unix, doesn't + // even do + // a syscall with + // arg of 0 + } } }); - - let receiving = spawn({ - let last_received = last_received_arc.clone(); + + let receiving = spawn({ + let last_received = last_received_arc; move || { - let input_file = if args.path.is_some() { - args.path.unwrap() - } else { - PathBuf::from("/var/run/iodaemon/output") - }; - - let mut reader = BlockingConsumer::new(input_file); - - while let Ok(message) = reader.read_message() { - let appid = message.app_id; - let seqno = message.app_sequence_number; - let expected_seqno = last_received.load(Ordering::Acquire) + 1 ; - if appid == application_id { - if seqno == expected_seqno as u64 { - last_received.store(seqno as usize, Ordering::Release); - } - else { - eprintln!("WARNING: Expected sequence number {}, got {}", expected_seqno, seqno); + let input_file = if args.path.is_some() { + args.path.unwrap() + } else { + PathBuf::from("/var/run/iodaemon/output") + }; + + let mut reader = BlockingConsumer::new(input_file); + + while let Ok(message) = reader.read_message() { + let appid = message.app_id; + let seqno = message.app_sequence_number; + let expected_seqno = last_received.load(Ordering::Acquire) + 1; + if appid == application_id { + if seqno == expected_seqno as u64 { + last_received.store(seqno as usize, Ordering::Release); + } else { + eprintln!( + "WARNING: Expected sequence number {}, got {}", + expected_seqno, seqno + ); + } } } - }} + } }); - + receiving.join().expect("Couldn't join receiver thread."); - + Ok(()) }