Skip to content

Commit

Permalink
start on sigint in srt-transmit
Browse files Browse the repository at this point in the history
  • Loading branch information
russelltg committed Oct 19, 2023
1 parent 36f92e6 commit e313f57
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
2 changes: 2 additions & 0 deletions srt-transmit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bytes = "1.0"
anyhow = "1"
pretty_env_logger = { version = "0.5", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
ctrlc = "3"
console-subscriber = { version = "0.1", optional = true }

[dependencies.tokio]
Expand All @@ -35,6 +36,7 @@ features = ["full"]
pretty_assertions = "1.3.0"
rand = "0.8"
tokio-stream = "0.1"
nix = "0.23"

[features]
default = []
Expand Down
37 changes: 37 additions & 0 deletions srt-transmit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio::{
net::TcpListener,
net::TcpStream,
net::UdpSocket,
sync::mpsc::channel,
spawn,
};
use tokio_util::{codec::BytesCodec, codec::Framed, codec::FramedWrite, udp::UdpFramed};
Expand Down Expand Up @@ -620,6 +621,9 @@ async fn run() -> Result<(), Error> {
.format_timestamp_micros()
.init();

let (ctrlc_tx, ctrlc_rx) = channel(1);
ctrlc::set_handler(move || ctrlc_tx.blocking_send(()).unwrap());

let app = Command::new("srt-transmit")
.version("1.0")
.author("Russell Greene")
Expand Down Expand Up @@ -680,5 +684,38 @@ async fn run() -> Result<(), Error> {
}

sinks.close().await?;

// let mut connect_send = async move {
// // poll sink and stream in parallel, only yielding when there is something ready for the sink and the stream is good.
// while let (_, Some(stream)) = try_join!(
// future::poll_fn(|cx| Pin::new(&mut sinks).poll_ready(cx)),
// stream_stream.try_next()
// )? {
// // let a: () = &mut *stream;
// sinks.send_all(&mut stream.map(Ok)).await?;
// }

// Ok::<_, Error>(sinks)
// }
// .boxed()
// .fuse();

// let ctrlc_count = select! {
// res = connect_send => res.map(|_| 0)?,
// // _ = ctrlc_rx.next() => 1,
// };

// // cancel close on second ctrl+c
// // let 2x_ctrlc = async move {
// // for _ in ctrlc_count..2 {
// // ctrlc_rx.next().await;
// // }
// // };

// select! {
// e = sinks.close() => e?,
// // _ = 2x_ctrlc => (),
// };

Ok(())
}
54 changes: 54 additions & 0 deletions srt-transmit/tests/timeout.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
use bytes::Bytes;

use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};

use srt_tokio::SrtSocketBuilder;
use std::{env, path::PathBuf, process::Stdio, time::Instant};

use bytes::Bytes;
Expand Down Expand Up @@ -108,3 +116,49 @@ async fn sender_timeout() {
};
futures::join!(sender, recvr);
}

// I could not find an easy way to send Ctrl+C from code on windows
#[cfg(not(windows))]
#[tokio::test]
async fn ctrlc() {
let _ = pretty_env_logger::try_init();

let b = SrtSocketBuilder::new_listen().local_port(1880).connect();

let mut a = Command::new(&find_stransmit_rs())
.args(&["srt://localhost:1880", "-"])
.stdout(Stdio::piped())
.spawn()
.unwrap();

let sender = async move {
let mut b = b.await.unwrap();

for _ in 0..100 {
if b.send((Instant::now(), Bytes::from_static(b"asdf\n")))
.await
.is_err()
{
return;
}
sleep(Duration::from_millis(10)).await;
}
assert!(false);
};

let recvr = async move {
let mut out = BufReader::new(a.stdout.as_mut().unwrap());
let mut line = String::new();
for _ in 0..10 {
out.read_line(&mut line).await.unwrap();
}

let pid = Pid::from_raw(a.id().unwrap() as i32);
kill(pid, Signal::SIGINT).unwrap();

sleep(Duration::from_millis(500)).await;
a.try_wait().unwrap().unwrap();
};

futures::join!(sender, recvr);
}

0 comments on commit e313f57

Please sign in to comment.