From d09874eca5000edf59a7ed70a50f6e6676f79ced Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 25 Oct 2024 13:45:18 +0700 Subject: [PATCH 01/12] Test the windows path on the runner --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 13d3de1a..82bb6fb2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -73,6 +73,7 @@ jobs: # Set up pkgconfig for gstreamer $env:PKG_CONFIG_PATH += ';' + $env:GSTREAMER_1_0_ROOT_MSVC_X86_64 + '\lib\pkgconfig' + dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\lib\pkgconfig" # Set github vars Add-Content -Path $env:GITHUB_ENV -Value "GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" From 5279a34bcb5f5fe878776dfd22addc56d07bb34b Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 25 Oct 2024 13:57:36 +0700 Subject: [PATCH 02/12] More investigation of windows build error on the runner --- .github/workflows/build.yml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 82bb6fb2..3a147783 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,13 +33,13 @@ jobs: - uses: actions/checkout@v4 name: Checkout onto ${{ runner.os }} - if: runner.os == 'Linux' - name: apt install gstreamer + name: apt install linux deps run: | sudo apt update sudo apt install -y aptitude sudo aptitude install -y libgstrtspserver-1.0-dev libgstreamer1.0-dev libgtk2.0-dev protobuf-compiler libssl-dev - if: runner.os == 'Windows' - name: Install Gstreamer + name: Install Windows deps run: | # Gstreamer choco install -y --no-progress gstreamer --version=1.20.0 @@ -73,7 +73,12 @@ jobs: # Set up pkgconfig for gstreamer $env:PKG_CONFIG_PATH += ';' + $env:GSTREAMER_1_0_ROOT_MSVC_X86_64 + '\lib\pkgconfig' - dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\lib\pkgconfig" + echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" + dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" + echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" + dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" + echo $env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" + dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" # Set github vars Add-Content -Path $env:GITHUB_ENV -Value "GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" @@ -86,7 +91,7 @@ jobs: dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" dir "$env:OPENSSL_DIR" - if: runner.os == 'macOS' - name: Install Gstreamer on macOS + name: Install macOS deps run: | curl -L 'https://gstreamer.freedesktop.org/data/pkg/osx/1.20.4/gstreamer-1.0-devel-1.20.4-universal.pkg' -o "$(pwd)/gstreamer-devel.pkg" sudo installer -verbose -pkg "$(pwd)/gstreamer-devel.pkg" -target / From 81f77f461c426ddd9f555c3db5181039b4284edc Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 25 Oct 2024 13:59:46 +0700 Subject: [PATCH 03/12] Fix quotes --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3a147783..58e7e7f8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,7 +77,7 @@ jobs: dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" - echo $env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" + echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" # Set github vars From fb956c399e517cde4a1125a961f74f67748d1a05 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 25 Oct 2024 15:29:44 +0700 Subject: [PATCH 04/12] Bump gstreamer for windows build --- .github/workflows/build.yml | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 58e7e7f8..6fa7ffe9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,8 +42,8 @@ jobs: name: Install Windows deps run: | # Gstreamer - choco install -y --no-progress gstreamer --version=1.20.0 - choco install -y --no-progress gstreamer-devel --version=1.20.0 + choco install -y --no-progress gstreamer --version=1.24.2 + choco install -y --no-progress gstreamer-devel --version=1.24.2 $env:GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:SYSTEMDRIVE + '\gstreamer\1.0\msvc_x86_64\' # Github runners work on both C or D drive and figuring out which was used is difficult if (-not (Test-Path -Path "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" -PathType Container)) { @@ -71,15 +71,6 @@ jobs: $env:OPENSSL_DIR='D:\\Program Files\OpenSSL\' } - # Set up pkgconfig for gstreamer - $env:PKG_CONFIG_PATH += ';' + $env:GSTREAMER_1_0_ROOT_MSVC_X86_64 + '\lib\pkgconfig' - echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" - dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" - echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" - dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib" - echo "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" - dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\\lib\\pkgconfig" - # Set github vars Add-Content -Path $env:GITHUB_ENV -Value "GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" Add-Content -Path $env:GITHUB_PATH -Value "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\bin" From 4f8fb6c7b5edceaee7fbc21dbaae9ad2c39e6a40 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 25 Oct 2024 15:30:46 +0700 Subject: [PATCH 05/12] Remove the extra path var in windows build --- .github/workflows/build.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6fa7ffe9..f9364755 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -76,7 +76,6 @@ jobs: Add-Content -Path $env:GITHUB_PATH -Value "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\bin" Add-Content -Path $env:GITHUB_PATH -Value "%GSTREAMER_1_0_ROOT_MSVC_X86_64%\bin" Add-Content -Path $env:GITHUB_ENV -Value "OPENSSL_DIR=$env:OPENSSL_DIR" - Add-Content -Path $env:GITHUB_ENV -Value "PKG_CONFIG_PATH=$env:PKG_CONFIG_PATH" # One last check on directories dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" From ed4a5ea001e31ebd1b06215effb083f673f40a67 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 14:33:19 +0700 Subject: [PATCH 06/12] Style fixes --- Cargo.lock | 25 +++---------------- crates/core/src/bc/de.rs | 6 ++--- crates/core/src/bc/model.rs | 11 +++----- .../core/src/bc_protocol/connection/bcsub.rs | 4 +-- crates/core/src/bc_protocol/resolution.rs | 2 +- crates/core/src/bcudp/codex.rs | 9 ------- crates/mailnoti/src/main.rs | 8 ------ src/common/instance.rs | 3 +-- src/mqtt/mqttc.rs | 2 +- 9 files changed, 13 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 567f61f1..c0e32a35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -839,25 +839,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "fcm-push-listener" -version = "3.0.0" -dependencies = [ - "base64 0.21.7", - "ece", - "log", - "prost", - "prost-build", - "rand", - "reqwest", - "serde", - "serde_with", - "tokio", - "tokio-rustls 0.23.4", - "uuid", - "webpki-roots", -] - [[package]] name = "fixedbitset" version = "0.4.2" @@ -1895,7 +1876,7 @@ dependencies = [ "crossbeam-channel", "dirs", "env_logger 0.11.3", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "futures", "gstreamer", "gstreamer-app", @@ -2339,7 +2320,7 @@ dependencies = [ "anyhow", "clap", "env_logger 0.10.2", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "lazy_static", "log", "neolink_core", diff --git a/crates/core/src/bc/de.rs b/crates/core/src/bc/de.rs index d8da749c..182bcc7c 100644 --- a/crates/core/src/bc/de.rs +++ b/crates/core/src/bc/de.rs @@ -90,10 +90,8 @@ fn bc_modern_msg<'a>( E::add_context(input, ctx, E::from_error_kind(input, kind)) } - let ext_len = match header.payload_offset { - Some(off) => off, - _ => 0, // If missing payload_offset treat all as payload - }; + // If missing payload_offset treat all as payload + let ext_len = header.payload_offset.unwrap_or_default(); let (buf, ext_buf) = take(ext_len)(buf)?; let payload_len = header.body_len - ext_len; diff --git a/crates/core/src/bc/model.rs b/crates/core/src/bc/model.rs index 2e8dd052..6449fb3c 100644 --- a/crates/core/src/bc/model.rs +++ b/crates/core/src/bc/model.rs @@ -192,8 +192,11 @@ pub struct BcMeta { pub stream_type: u8, /// On modern messages this is the response code /// When sending a command it is set to `0`. The reply from the camera can be + /// /// - `200` for OK + /// /// - `400` for bad request + /// /// A malformed packet will return a `400` code pub response_code: u16, /// A message ID is used to match replies with requests. The camera will parrot back @@ -213,14 +216,6 @@ pub struct BcMeta { pub class: u16, } -/// The components of the Baichuan header that must be filled out after the body is serialized, or -/// is needed for the deserialization of the body (strictly part of the wire format of the message) -#[derive(Debug, PartialEq, Eq)] -pub(super) struct BcSendInfo { - pub body_len: u32, - pub payload_offset: Option, -} - #[derive(Debug)] pub(crate) struct BcContext { pub(crate) credentials: Credentials, diff --git a/crates/core/src/bc_protocol/connection/bcsub.rs b/crates/core/src/bc_protocol/connection/bcsub.rs index 89335352..6f8d21c3 100644 --- a/crates/core/src/bc_protocol/connection/bcsub.rs +++ b/crates/core/src/bc_protocol/connection/bcsub.rs @@ -20,9 +20,9 @@ pub struct BcStream<'a> { rx: &'a mut ReceiverStream>, } -impl<'a> Unpin for BcStream<'a> {} +impl Unpin for BcStream<'_> {} -impl<'a> Stream for BcStream<'a> { +impl Stream for BcStream<'_> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { diff --git a/crates/core/src/bc_protocol/resolution.rs b/crates/core/src/bc_protocol/resolution.rs index 17e99b2b..1fb308bc 100644 --- a/crates/core/src/bc_protocol/resolution.rs +++ b/crates/core/src/bc_protocol/resolution.rs @@ -214,7 +214,7 @@ impl ToSocketAddrsOrUid for SocketAddrV6 { } } -impl<'a> ToSocketAddrsOrUid for &'a [SocketAddr] { +impl ToSocketAddrsOrUid for &'_ [SocketAddr] { type UidIter = std::vec::IntoIter; fn to_socket_addrs_or_uid(&self) -> Result { diff --git a/crates/core/src/bcudp/codex.rs b/crates/core/src/bcudp/codex.rs index fb54e6ec..d612dc43 100644 --- a/crates/core/src/bcudp/codex.rs +++ b/crates/core/src/bcudp/codex.rs @@ -34,15 +34,6 @@ impl Decoder for BcUdpCodex { type Item = BcUdp; type Error = Error; - /// Since frames can cross EOF boundaries we overload this so it doesn't error if - /// there are bytes left on the stream - // fn decode_eof(&mut self, buf: &mut BytesMut) -> Result> { - // match self.decode(buf)? { - // Some(frame) => Ok(Some(frame)), - // None => Ok(None), - // } - // } - fn decode(&mut self, src: &mut BytesMut) -> Result> { log::trace!("Decoding:"); if src.is_empty() { diff --git a/crates/mailnoti/src/main.rs b/crates/mailnoti/src/main.rs index 4ffc54e2..a223a733 100644 --- a/crates/mailnoti/src/main.rs +++ b/crates/mailnoti/src/main.rs @@ -53,14 +53,6 @@ async fn main() -> Result<()> { Ok(()) } -fn get_local_ip() -> Result { - get_if_addrs::get_if_addrs()? - .iter() - .find(|i| !i.is_loopback() && matches!(i.addr, get_if_addrs::IfAddr::V4(_))) - .map(|iface| Ok(iface.ip())) - .unwrap_or_else(|| Err(anyhow!("No Local Ip Address Found"))) -} - async fn cam_tasks(name: &str, camera: BcCamera, addr: SocketAddr) -> Result<()> { let support = camera.get_support().await?; if support.email.is_some_and(|v| v > 0) { diff --git a/src/common/instance.rs b/src/common/instance.rs index f3ecfc6c..4b5811da 100644 --- a/src/common/instance.rs +++ b/src/common/instance.rs @@ -26,7 +26,6 @@ use neolink_core::{ bc_protocol::{BcCamera, StreamKind}, bcmedia::model::BcMedia, }; -#[cfg(feature = "gstreamer")] /// This instance is the primary interface used throughout the app /// @@ -423,7 +422,7 @@ impl NeoInstance { }), )); - #[cfg(pushnoti)] + #[cfg(feature = "pushnoti")] { // Creates a permit for controlling based on the PN let pn_permit = counter.create_deactivated().await?; diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index c7f8d06f..f82ec709 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -339,7 +339,7 @@ impl<'a> MqttBackend<'a> { } } -impl<'a> Drop for MqttBackend<'a> { +impl Drop for MqttBackend<'_> { fn drop(&mut self) { self.cancel.cancel(); } From e6c6f5ca62654ed36d7cb31f9a7025f04febc2f5 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 14:39:30 +0700 Subject: [PATCH 07/12] Try and fix the gstreamer only cfg --- src/common/neocam.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/neocam.rs b/src/common/neocam.rs index 22a22db7..55d18bee 100644 --- a/src/common/neocam.rs +++ b/src/common/neocam.rs @@ -27,7 +27,7 @@ use super::{ use super::{PnRequest, PushNoti}; use crate::{config::CameraConfig, AnyResult, Result}; use neolink_core::bc_protocol::BcCamera; -#[cfg(feature = "gstreamer")] + #[allow(dead_code)] pub(crate) enum NeoCamCommand { HangUp, From 15170775649578c204f6c3faaee80c48c5049f17 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 15:13:48 +0700 Subject: [PATCH 08/12] More fixes to features --- .github/workflows/style_checks.yml | 4 + src/common/instance.rs | 262 ++--------------------------- src/common/neocam.rs | 1 - 3 files changed, 14 insertions(+), 253 deletions(-) diff --git a/.github/workflows/style_checks.yml b/.github/workflows/style_checks.yml index 0200d1e7..4d068d52 100644 --- a/.github/workflows/style_checks.yml +++ b/.github/workflows/style_checks.yml @@ -35,9 +35,13 @@ jobs: rustup override set nightly - name: Run clippy manually run: | + echo "All Features" cargo +nightly clippy --workspace --all-targets --all-features || exit 1 + echo "No Features" cargo +nightly clippy --workspace --all-targets --no-default-features || exit 1 + echo "Gstreamer Only" cargo +nightly clippy --workspace --all-targets --no-default-features --features=gstreamer || exit 1 + echo "Pushnoti Only" cargo +nightly clippy --workspace --all-targets --no-default-features --features=pushnoti || exit 1 check_fmt: diff --git a/src/common/instance.rs b/src/common/instance.rs index 4b5811da..25db1a72 100644 --- a/src/common/instance.rs +++ b/src/common/instance.rs @@ -5,27 +5,25 @@ //! The camera watch is used as an event to be triggered //! whenever the camera is lost/updated use anyhow::{anyhow, Context}; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; +use futures::TryFutureExt; use std::sync::{Arc, Weak}; -#[cfg(feature = "pushnoti")] -use tokio::sync::watch::channel as watch; use tokio::{ sync::{ - mpsc::Receiver as MpscReceiver, mpsc::Sender as MpscSender, oneshot::channel as oneshot, - watch::Receiver as WatchReceiver, + mpsc::Sender as MpscSender, oneshot::channel as oneshot, watch::Receiver as WatchReceiver, }, time::{sleep, Duration}, }; use tokio_util::sync::CancellationToken; -#[cfg(feature = "pushnoti")] -use super::PushNoti; -use super::{MdState, NeoCamCommand, NeoCamThreadState, Permit, UseCounter}; +use super::{MdState, NeoCamCommand, NeoCamThreadState, Permit}; use crate::{config::CameraConfig, AnyResult, Result}; -use neolink_core::{ - bc_protocol::{BcCamera, StreamKind}, - bcmedia::model::BcMedia, -}; +use neolink_core::bc_protocol::BcCamera; + +#[cfg(feature = "gstreamer")] +mod gst; + +#[cfg(feature = "pushnoti")] +mod pushnoti; /// This instance is the primary interface used throughout the app /// @@ -232,50 +230,6 @@ impl NeoInstance { } } - #[cfg(feature = "pushnoti")] - pub(crate) async fn uid(&self) -> Result { - let (reply_tx, reply_rx) = oneshot(); - self.camera_control - .send(NeoCamCommand::GetUid(reply_tx)) - .await?; - Ok(reply_rx.await?) - } - - #[cfg(feature = "pushnoti")] - pub(crate) async fn push_notifications(&self) -> Result>> { - let uid = self.uid().await?; - let (instance_tx, instance_rx) = oneshot(); - self.camera_control - .send(NeoCamCommand::PushNoti(instance_tx)) - .await?; - let mut source_watch = instance_rx.await?; - - let (fwatch_tx, fwatch_rx) = watch(None); - tokio::task::spawn(async move { - loop { - match source_watch - .wait_for(|i| { - fwatch_tx.borrow().as_ref() != i.as_ref() - && i.as_ref() - .is_some_and(|i| i.message.contains(&format!("\"{uid}\""))) - }) - .await - { - Ok(pn) => { - log::trace!("Forwarding push notification about {}", uid); - let _ = fwatch_tx.send_replace(pn.clone()); - } - Err(e) => { - break Err(e); - } - } - }?; - AnyResult::Ok(()) - }); - - Ok(fwatch_rx) - } - pub(crate) async fn motion(&self) -> Result> { let (instance_tx, instance_rx) = oneshot(); self.camera_control @@ -345,202 +299,6 @@ impl NeoInstance { timeout, } } - - /// Streams a camera source while not paused - pub(crate) async fn stream_while_live( - &self, - stream: StreamKind, - ) -> AnyResult> { - let config = self.config().await?.borrow().clone(); - let name = config.name.clone(); - - let media_rx = if config.pause.on_motion { - let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); - let counter = UseCounter::new().await; - - let mut md = self.motion().await?; - let mut tasks = FuturesUnordered::new(); - // Stream for 5s on a new client always - // This lets us negotiate the camera stream type - let init_permit = counter.create_activated().await?; - tokio::spawn( - async { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - drop(init_permit); - } - .map(|e| { - log::debug!("Init permit thread stopped {e:?}"); - e - }), - ); - - // Create the permit for controlling the motion - let mut md_permit = { - let md_state = md.borrow_and_update().clone(); - match md_state { - MdState::Start(_) => { - log::info!("{name}::{stream:?}: Starting with Motion"); - counter.create_activated().await? - } - MdState::Stop(_) | MdState::Unknown => { - log::info!("{name}::{stream:?}: Waiting with Motion"); - counter.create_deactivated().await? - } - } - }; - // Now listen to the motion - let thread_name = name.clone(); - tasks.push(tokio::spawn( - async move { - loop { - match md.changed().await { - Ok(_) => { - let md_state: MdState = md.borrow_and_update().clone(); - match md_state { - MdState::Start(_) => { - log::info!("{thread_name}::{stream:?}: Motion Started"); - md_permit.activate().await?; - } - MdState::Stop(_) => { - log::info!("{thread_name}::{stream:?}: Motion Stopped"); - md_permit.deactivate().await?; - } - MdState::Unknown => {} - } - } - Err(e) => { - // Use break here so we can define the full type on the async closure - break AnyResult::Err(e.into()); - } - } - }?; - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("Motion thread stopped {e:?}"); - e - }), - )); - - #[cfg(feature = "pushnoti")] - { - // Creates a permit for controlling based on the PN - let pn_permit = counter.create_deactivated().await?; - let mut pn = self.push_notifications().await?; - pn.borrow_and_update(); // Ignore any PNs that have already been sent before this - let thread_name = name.clone(); - tasks.push(tokio::spawn( - async move { - loop { - let noti: Option = pn.borrow_and_update().clone(); - if let Some(noti) = noti { - if noti.message.contains("Motion Alert from") { - log::info!( - "{thread_name}::{stream:?}: Push Notification Recieved" - ); - let mut new_pn_permit = pn_permit.subscribe(); - new_pn_permit.activate().await?; - tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(30)) - .await; - drop(new_pn_permit); - }); - } - } - if let Err(e) = pn.changed().await { - break Err(e); - } - }?; - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("PN thread stopped {e:?}"); - e - }), - )); - } - - // Send the camera when the pemit is active - let camera_permit = counter.create_deactivated().await?; - let thread_camera = self.clone(); - tokio::spawn( - async move { - loop { - if let Err(e) = camera_permit.aquired_users().await { - break AnyResult::Err(e); - } - log::debug!("Starting stream"); - tokio::select! { - v = camera_permit.dropped_users() => { - log::debug!("Dropped users: {v:?}"); - v - }, - v = async { - log::debug!("Getting stream"); - let mut stream = thread_camera.stream(stream).await?; - log::debug!("Got stream"); - while let Some(media) = stream.recv().await { - media_tx.send(media).await?; - } - AnyResult::Ok(()) - } => { - log::debug!("Stopped stream: {v:?}"); - v - }, - v = tasks.next() => { - log::debug!("Task failed: {v:?}"); - Err(anyhow!("Task ended prematurly: {v:?}")) - } - }?; - log::debug!("Pausing stream"); - }?; - drop(counter); // Make sure counter is owned by this thread - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("Stream thread stopped {e:?}"); - e - }), - ); - - Ok(media_rx) - } else { - self.stream(stream).await - }?; - - Ok(media_rx) - } - - /// Streams a camera source - pub(crate) async fn stream(&self, stream: StreamKind) -> AnyResult> { - let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); - let config = self.config().await?.borrow().clone(); - let strict = config.strict; - let thread_camera = self.clone(); - tokio::task::spawn( - tokio::task::spawn(async move { - thread_camera - .run_task(move |cam| { - let media_tx = media_tx.clone(); - Box::pin(async move { - let mut media_stream = cam.start_video(stream, 0, strict).await?; - log::trace!("Camera started"); - while let Ok(media) = media_stream.get_data().await? { - media_tx.send(media).await?; - } - AnyResult::Ok(()) - }) - }) - .await - }) - .and_then(|res| async move { - log::debug!("Camera finished streaming: {res:?}"); - Ok(()) - }), - ); - - Ok(media_rx) - } } // A task that is run on a camera when the structure is dropped diff --git a/src/common/neocam.rs b/src/common/neocam.rs index 55d18bee..8633eedd 100644 --- a/src/common/neocam.rs +++ b/src/common/neocam.rs @@ -81,7 +81,6 @@ impl NeoCam { // other threads let sender_cancel = me.cancel.clone(); let mut commander_rx = ReceiverStream::new(commander_rx); - #[cfg(feature = "gstreamer")] let thread_commander_tx = commander_tx.clone(); let thread_watch_config_rx = watch_config_rx.clone(); #[cfg(feature = "pushnoti")] From fb9df1d58671d188ff57c79996202f4a76a70a3f Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 15:14:15 +0700 Subject: [PATCH 09/12] Use explicit rust version for proper control in scripts --- .github/workflows/build.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f9364755..686a26a3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -178,6 +178,7 @@ jobs: echo "${HOME}/.cargo/bin" >> "${GITHUB_PATH}" - name: Install ${{ matrix.arch }} Rust toolchain run: | + rustup default 1.84.0 rustup target add ${TARGET} env: TARGET: ${{ matrix.target }} @@ -282,8 +283,9 @@ jobs: echo "TAGS=${tagstr}" >> "${GITHUB_OUTPUT}" env: REPO_NAME: ${{ steps.docker_repo.outputs.DOCKER_NWO }} - - name: Install latest rust + - name: Install rust run: | + rustup default 1.84.0 rustup toolchain install stable - name: Install toml-cli run: | @@ -341,8 +343,9 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - - name: Install latest rust + - name: Install rust run: | + rustup default 1.84.0 rustup toolchain install stable - name: Install toml-cli run: | From 1499503789ef5175ab7e2c6a5cfbfa454d5976c4 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 15:16:07 +0700 Subject: [PATCH 10/12] Single point of control on rust version --- .github/workflows/build.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 686a26a3..ddeb8303 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,6 +5,9 @@ on: pull_request: workflow_dispatch: +env: + rust_version: "1.84.0" + jobs: pre_job: # continue-on-error: true # Uncomment once integration is finished @@ -178,7 +181,7 @@ jobs: echo "${HOME}/.cargo/bin" >> "${GITHUB_PATH}" - name: Install ${{ matrix.arch }} Rust toolchain run: | - rustup default 1.84.0 + rustup default "${{ env.rust_version }}" rustup target add ${TARGET} env: TARGET: ${{ matrix.target }} @@ -285,7 +288,7 @@ jobs: REPO_NAME: ${{ steps.docker_repo.outputs.DOCKER_NWO }} - name: Install rust run: | - rustup default 1.84.0 + rustup default "${{ env.rust_version }}" rustup toolchain install stable - name: Install toml-cli run: | @@ -345,7 +348,7 @@ jobs: uses: actions/checkout@v4 - name: Install rust run: | - rustup default 1.84.0 + rustup default "${{ env.rust_version }}" rustup toolchain install stable - name: Install toml-cli run: | From e9e66b1a440f296b5a63ea2fbe2db874ae05f395 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 16:41:58 +0700 Subject: [PATCH 11/12] Rustc 1.82.0 --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ddeb8303..e20f6b71 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,7 +6,7 @@ on: workflow_dispatch: env: - rust_version: "1.84.0" + rust_version: "1.82.0" jobs: pre_job: From 18c4d568262677952a0d42067f5a9f02fd62ee28 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Sat, 26 Oct 2024 16:53:28 +0700 Subject: [PATCH 12/12] Don't forget the two includes --- src/common/instance/gst.rs | 207 ++++++++++++++++++++++++++++++++ src/common/instance/pushnoti.rs | 48 ++++++++ 2 files changed, 255 insertions(+) create mode 100644 src/common/instance/gst.rs create mode 100644 src/common/instance/pushnoti.rs diff --git a/src/common/instance/gst.rs b/src/common/instance/gst.rs new file mode 100644 index 00000000..7dccd3bf --- /dev/null +++ b/src/common/instance/gst.rs @@ -0,0 +1,207 @@ +use super::*; + +use crate::common::UseCounter; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use neolink_core::{bc_protocol::StreamKind, bcmedia::model::BcMedia}; +use tokio::sync::mpsc::Receiver as MpscReceiver; + +#[cfg(feature = "pushnoti")] +use crate::common::PushNoti; + +impl NeoInstance { + /// Streams a camera source while not paused + pub(crate) async fn stream_while_live( + &self, + stream: StreamKind, + ) -> AnyResult> { + let config = self.config().await?.borrow().clone(); + let name = config.name.clone(); + + let media_rx = if config.pause.on_motion { + let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); + let counter = UseCounter::new().await; + + let mut md = self.motion().await?; + let mut tasks = FuturesUnordered::new(); + // Stream for 5s on a new client always + // This lets us negotiate the camera stream type + let init_permit = counter.create_activated().await?; + tokio::spawn( + async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + drop(init_permit); + } + .map(|e| { + log::debug!("Init permit thread stopped {e:?}"); + e + }), + ); + + // Create the permit for controlling the motion + let mut md_permit = { + let md_state = md.borrow_and_update().clone(); + match md_state { + MdState::Start(_) => { + log::info!("{name}::{stream:?}: Starting with Motion"); + counter.create_activated().await? + } + MdState::Stop(_) | MdState::Unknown => { + log::info!("{name}::{stream:?}: Waiting with Motion"); + counter.create_deactivated().await? + } + } + }; + // Now listen to the motion + let thread_name = name.clone(); + tasks.push(tokio::spawn( + async move { + loop { + match md.changed().await { + Ok(_) => { + let md_state: MdState = md.borrow_and_update().clone(); + match md_state { + MdState::Start(_) => { + log::info!("{thread_name}::{stream:?}: Motion Started"); + md_permit.activate().await?; + } + MdState::Stop(_) => { + log::info!("{thread_name}::{stream:?}: Motion Stopped"); + md_permit.deactivate().await?; + } + MdState::Unknown => {} + } + } + Err(e) => { + // Use break here so we can define the full type on the async closure + break AnyResult::Err(e.into()); + } + } + }?; + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("Motion thread stopped {e:?}"); + e + }), + )); + + #[cfg(feature = "pushnoti")] + { + // Creates a permit for controlling based on the PN + let pn_permit = counter.create_deactivated().await?; + let mut pn = self.push_notifications().await?; + pn.borrow_and_update(); // Ignore any PNs that have already been sent before this + let thread_name = name.clone(); + tasks.push(tokio::spawn( + async move { + loop { + let noti: Option = pn.borrow_and_update().clone(); + if let Some(noti) = noti { + if noti.message.contains("Motion Alert from") { + log::info!( + "{thread_name}::{stream:?}: Push Notification Recieved" + ); + let mut new_pn_permit = pn_permit.subscribe(); + new_pn_permit.activate().await?; + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(30)) + .await; + drop(new_pn_permit); + }); + } + } + if let Err(e) = pn.changed().await { + break Err(e); + } + }?; + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("PN thread stopped {e:?}"); + e + }), + )); + } + + // Send the camera when the pemit is active + let camera_permit = counter.create_deactivated().await?; + let thread_camera = self.clone(); + tokio::spawn( + async move { + loop { + if let Err(e) = camera_permit.aquired_users().await { + break AnyResult::Err(e); + } + log::debug!("Starting stream"); + tokio::select! { + v = camera_permit.dropped_users() => { + log::debug!("Dropped users: {v:?}"); + v + }, + v = async { + log::debug!("Getting stream"); + let mut stream = thread_camera.stream(stream).await?; + log::debug!("Got stream"); + while let Some(media) = stream.recv().await { + media_tx.send(media).await?; + } + AnyResult::Ok(()) + } => { + log::debug!("Stopped stream: {v:?}"); + v + }, + v = tasks.next() => { + log::debug!("Task failed: {v:?}"); + Err(anyhow!("Task ended prematurly: {v:?}")) + } + }?; + log::debug!("Pausing stream"); + }?; + drop(counter); // Make sure counter is owned by this thread + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("Stream thread stopped {e:?}"); + e + }), + ); + + Ok(media_rx) + } else { + self.stream(stream).await + }?; + + Ok(media_rx) + } + + /// Streams a camera source + pub(crate) async fn stream(&self, stream: StreamKind) -> AnyResult> { + let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); + let config = self.config().await?.borrow().clone(); + let strict = config.strict; + let thread_camera = self.clone(); + tokio::task::spawn( + tokio::task::spawn(async move { + thread_camera + .run_task(move |cam| { + let media_tx = media_tx.clone(); + Box::pin(async move { + let mut media_stream = cam.start_video(stream, 0, strict).await?; + log::trace!("Camera started"); + while let Ok(media) = media_stream.get_data().await? { + media_tx.send(media).await?; + } + AnyResult::Ok(()) + }) + }) + .await + }) + .and_then(|res| async move { + log::debug!("Camera finished streaming: {res:?}"); + Ok(()) + }), + ); + + Ok(media_rx) + } +} diff --git a/src/common/instance/pushnoti.rs b/src/common/instance/pushnoti.rs new file mode 100644 index 00000000..89722fcd --- /dev/null +++ b/src/common/instance/pushnoti.rs @@ -0,0 +1,48 @@ +use super::*; + +use crate::common::PushNoti; +use tokio::sync::watch::channel as watch; + +impl NeoInstance { + pub(crate) async fn uid(&self) -> Result { + let (reply_tx, reply_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::GetUid(reply_tx)) + .await?; + Ok(reply_rx.await?) + } + + pub(crate) async fn push_notifications(&self) -> Result>> { + let uid = self.uid().await?; + let (instance_tx, instance_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::PushNoti(instance_tx)) + .await?; + let mut source_watch = instance_rx.await?; + + let (fwatch_tx, fwatch_rx) = watch(None); + tokio::task::spawn(async move { + loop { + match source_watch + .wait_for(|i| { + fwatch_tx.borrow().as_ref() != i.as_ref() + && i.as_ref() + .is_some_and(|i| i.message.contains(&format!("\"{uid}\""))) + }) + .await + { + Ok(pn) => { + log::trace!("Forwarding push notification about {}", uid); + let _ = fwatch_tx.send_replace(pn.clone()); + } + Err(e) => { + break Err(e); + } + } + }?; + AnyResult::Ok(()) + }); + + Ok(fwatch_rx) + } +}