Skip to content

Commit

Permalink
Run blocking stream code on a seperate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
QuantumEntangledAndy committed Apr 12, 2024
1 parent 87e5bcb commit 8c186f3
Showing 1 changed file with 66 additions and 53 deletions.
119 changes: 66 additions & 53 deletions src/rtsp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,63 +725,76 @@ async fn send_to_appsrc<E, T: Stream<Item = Result<StampedData, E>> + Unpin>(
log::info!("Setting stream to pause");
appsrc.set_state(gstreamer::State::Paused).unwrap();

while let Some(Ok(data)) = stream.next().await {
check_live(appsrc)?; // Stop if appsrc is dropped

// Start on iframes
if wait_for_iframe && !data.keyframe {
continue;
} else if wait_for_iframe {
wait_for_iframe = false;
}
let (tx, mut rx) = tokio::sync::mpsc::channel::<StampedData>(2000);

// Run blocking code on a seperate thread
let appsrc = appsrc.clone();
std::thread::spawn(move || {
while let Some(data) = rx.blocking_recv() {
check_live(&appsrc)?; // Stop if appsrc is dropped
if wait_for_iframe && !data.keyframe {
continue;
} else if wait_for_iframe {
wait_for_iframe = false;
}

if let Some(rt_i) = get_runtime(appsrc) {
rt = rt_i;
}
let buf = {
// let mut gst_buf = pool.acquire_buffer(None).unwrap();
let msg_size = data.data.len();
let pool = pools.entry(msg_size).or_insert_with_key(|size| {
log::info!("new pool: {}", size);
let pool = gstreamer::BufferPool::new();
let mut pool_config = pool.config();
pool_config.set_params(None, (*size) as u32, 8, 32);
pool.set_config(pool_config).unwrap();
// let (allocator, alloc_parms) = pool.allocator().unwrap();
pool.set_active(true).unwrap();
log::info!("Options: {:?}", pool.options());
pool
});
log::info!("buffer size: {}", data.data.len());
let mut gst_buf = pool.acquire_buffer(None).unwrap();
// let mut gst_buf = gstreamer::Buffer::with_size(data.data.len()).unwrap();
{
let gst_buf_mut = gst_buf.get_mut().unwrap();
let time = ClockTime::from_useconds(rt.as_micros() as u64);
gst_buf_mut.set_dts(ClockTime::from_useconds(dts));
dts += 1;
gst_buf_mut.set_pts(time);
let mut gst_buf_data = gst_buf_mut.map_writable().unwrap();
gst_buf_data.copy_from_slice(data.data.as_slice());
if let Some(rt_i) = get_runtime(&appsrc) {
rt = rt_i;
}
gst_buf
};
let buf = {
// let mut gst_buf = pool.acquire_buffer(None).unwrap();
let msg_size = data.data.len();
let pool = pools.entry(msg_size).or_insert_with_key(|size| {
log::info!("new pool: {}", size);
let pool = gstreamer::BufferPool::new();
let mut pool_config = pool.config();
pool_config.set_params(None, (*size) as u32, 8, 32);
pool.set_config(pool_config).unwrap();
// let (allocator, alloc_parms) = pool.allocator().unwrap();
pool.set_active(true).unwrap();
log::info!("Options: {:?}", pool.options());
pool
});
log::info!("buffer size: {}", data.data.len());
let mut gst_buf = pool.acquire_buffer(None).unwrap();
// let mut gst_buf = gstreamer::Buffer::with_size(data.data.len()).unwrap();
{
let gst_buf_mut = gst_buf.get_mut().unwrap();
let time = ClockTime::from_useconds(rt.as_micros() as u64);
gst_buf_mut.set_dts(ClockTime::from_useconds(dts));
dts += 1;
gst_buf_mut.set_pts(time);
let mut gst_buf_data = gst_buf_mut.map_writable().unwrap();
gst_buf_data.copy_from_slice(data.data.as_slice());
}
gst_buf
};

match appsrc.push_buffer(buf) {
Ok(_) => Ok(()),
Err(FlowError::Flushing) => {
// Buffer is full just skip
//
// But ensure we start with an iframe to reduce gray screens
wait_for_iframe = true;
Ok(())
match appsrc.push_buffer(buf) {
Ok(_) => Ok(()),
Err(FlowError::Flushing) => {
// Buffer is full just skip
//
// But ensure we start with an iframe to reduce gray screens
wait_for_iframe = true;
Ok(())
}
Err(e) => Err(anyhow!("Error in streaming: {e:?}")),
}?;
if !buffer_inited && appsrc.current_level_bytes() >= appsrc.max_bytes() {
log::info!("Setting stream to play");
appsrc.set_state(gstreamer::State::Playing).unwrap();
buffer_inited = true;
}
Err(e) => Err(anyhow!("Error in streaming: {e:?}")),
}?;
if !buffer_inited && appsrc.current_level_bytes() >= appsrc.max_bytes() {
log::info!("Setting stream to play");
appsrc.set_state(gstreamer::State::Playing).unwrap();
buffer_inited = true;
}
AnyResult::Ok(())
});

// Send to the blocking thread
while let Some(Ok(data)) = stream.next().await {
// Start on iframes
if tx.send(data).await.is_err() {
break;
}
}
Ok(())
Expand Down

0 comments on commit 8c186f3

Please sign in to comment.