From 8c186f3f564a67b32db84cdd8bb1439af4047351 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 12 Apr 2024 10:28:03 +0700 Subject: [PATCH] Run blocking stream code on a seperate thread --- src/rtsp/stream.rs | 119 +++++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/src/rtsp/stream.rs b/src/rtsp/stream.rs index 1829da95..1350b293 100644 --- a/src/rtsp/stream.rs +++ b/src/rtsp/stream.rs @@ -725,63 +725,76 @@ async fn send_to_appsrc> + Unpin>( log::info!("Setting stream to pause"); appsrc.set_state(gstreamer::State::Paused).unwrap(); - while let Some(Ok(data)) = stream.next().await { - check_live(appsrc)?; // Stop if appsrc is dropped - - // Start on iframes - if wait_for_iframe && !data.keyframe { - continue; - } else if wait_for_iframe { - wait_for_iframe = false; - } + let (tx, mut rx) = tokio::sync::mpsc::channel::(2000); + + // Run blocking code on a seperate thread + let appsrc = appsrc.clone(); + std::thread::spawn(move || { + while let Some(data) = rx.blocking_recv() { + check_live(&appsrc)?; // Stop if appsrc is dropped + if wait_for_iframe && !data.keyframe { + continue; + } else if wait_for_iframe { + wait_for_iframe = false; + } - if let Some(rt_i) = get_runtime(appsrc) { - rt = rt_i; - } - let buf = { - // let mut gst_buf = pool.acquire_buffer(None).unwrap(); - let msg_size = data.data.len(); - let pool = pools.entry(msg_size).or_insert_with_key(|size| { - log::info!("new pool: {}", size); - let pool = gstreamer::BufferPool::new(); - let mut pool_config = pool.config(); - pool_config.set_params(None, (*size) as u32, 8, 32); - pool.set_config(pool_config).unwrap(); - // let (allocator, alloc_parms) = pool.allocator().unwrap(); - pool.set_active(true).unwrap(); - log::info!("Options: {:?}", pool.options()); - pool - }); - log::info!("buffer size: {}", data.data.len()); - let mut gst_buf = pool.acquire_buffer(None).unwrap(); - // let mut gst_buf = gstreamer::Buffer::with_size(data.data.len()).unwrap(); - { - let gst_buf_mut = gst_buf.get_mut().unwrap(); - let time = ClockTime::from_useconds(rt.as_micros() as u64); - gst_buf_mut.set_dts(ClockTime::from_useconds(dts)); - dts += 1; - gst_buf_mut.set_pts(time); - let mut gst_buf_data = gst_buf_mut.map_writable().unwrap(); - gst_buf_data.copy_from_slice(data.data.as_slice()); + if let Some(rt_i) = get_runtime(&appsrc) { + rt = rt_i; } - gst_buf - }; + let buf = { + // let mut gst_buf = pool.acquire_buffer(None).unwrap(); + let msg_size = data.data.len(); + let pool = pools.entry(msg_size).or_insert_with_key(|size| { + log::info!("new pool: {}", size); + let pool = gstreamer::BufferPool::new(); + let mut pool_config = pool.config(); + pool_config.set_params(None, (*size) as u32, 8, 32); + pool.set_config(pool_config).unwrap(); + // let (allocator, alloc_parms) = pool.allocator().unwrap(); + pool.set_active(true).unwrap(); + log::info!("Options: {:?}", pool.options()); + pool + }); + log::info!("buffer size: {}", data.data.len()); + let mut gst_buf = pool.acquire_buffer(None).unwrap(); + // let mut gst_buf = gstreamer::Buffer::with_size(data.data.len()).unwrap(); + { + let gst_buf_mut = gst_buf.get_mut().unwrap(); + let time = ClockTime::from_useconds(rt.as_micros() as u64); + gst_buf_mut.set_dts(ClockTime::from_useconds(dts)); + dts += 1; + gst_buf_mut.set_pts(time); + let mut gst_buf_data = gst_buf_mut.map_writable().unwrap(); + gst_buf_data.copy_from_slice(data.data.as_slice()); + } + gst_buf + }; - match appsrc.push_buffer(buf) { - Ok(_) => Ok(()), - Err(FlowError::Flushing) => { - // Buffer is full just skip - // - // But ensure we start with an iframe to reduce gray screens - wait_for_iframe = true; - Ok(()) + match appsrc.push_buffer(buf) { + Ok(_) => Ok(()), + Err(FlowError::Flushing) => { + // Buffer is full just skip + // + // But ensure we start with an iframe to reduce gray screens + wait_for_iframe = true; + Ok(()) + } + Err(e) => Err(anyhow!("Error in streaming: {e:?}")), + }?; + if !buffer_inited && appsrc.current_level_bytes() >= appsrc.max_bytes() { + log::info!("Setting stream to play"); + appsrc.set_state(gstreamer::State::Playing).unwrap(); + buffer_inited = true; } - Err(e) => Err(anyhow!("Error in streaming: {e:?}")), - }?; - if !buffer_inited && appsrc.current_level_bytes() >= appsrc.max_bytes() { - log::info!("Setting stream to play"); - appsrc.set_state(gstreamer::State::Playing).unwrap(); - buffer_inited = true; + } + AnyResult::Ok(()) + }); + + // Send to the blocking thread + while let Some(Ok(data)) = stream.next().await { + // Start on iframes + if tx.send(data).await.is_err() { + break; } } Ok(())