From 72ca5e5f33d9ef1c688cff5bea7f7900acb18d7c Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Fri, 24 Jan 2025 16:35:42 +0700 Subject: [PATCH] Add visitor to mqtt --- Cargo.lock | 25 +- README.md | 22 ++ crates/core/src/bc/de.rs | 6 +- crates/core/src/bc/model.rs | 9 +- crates/core/src/bc/xml.rs | 17 +- crates/core/src/bc_protocol.rs | 2 +- .../core/src/bc_protocol/connection/bcsub.rs | 4 +- crates/core/src/bc_protocol/motion.rs | 236 ++++++++++++------ crates/core/src/bc_protocol/resolution.rs | 2 +- crates/core/src/bcudp/codex.rs | 4 +- crates/mailnoti/src/main.rs | 14 +- src/common/instance.rs | 21 +- src/common/mdthread.rs | 84 ++++++- src/common/neocam.rs | 22 +- src/config.rs | 6 + src/mqtt/mod.rs | 62 ++++- src/mqtt/mqttc.rs | 4 +- 17 files changed, 400 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 567f61f1..c0e32a35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -839,25 +839,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "fcm-push-listener" -version = "3.0.0" -dependencies = [ - "base64 0.21.7", - "ece", - "log", - "prost", - "prost-build", - "rand", - "reqwest", - "serde", - "serde_with", - "tokio", - "tokio-rustls 0.23.4", - "uuid", - "webpki-roots", -] - [[package]] name = "fixedbitset" version = "0.4.2" @@ -1895,7 +1876,7 @@ dependencies = [ "crossbeam-channel", "dirs", "env_logger 0.11.3", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "futures", "gstreamer", "gstreamer-app", @@ -2339,7 +2320,7 @@ dependencies = [ "anyhow", "clap", "env_logger 0.10.2", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "lazy_static", "log", "neolink_core", diff --git a/README.md b/README.md index 316f653c..a2035845 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,20 @@ Status Messages: pir status - `/status/motion` Contains the motion detection alarm status. `on` for motion and `off` for still, only published when `enable_moton` is true in the config +- `/status/visitor` Contains the visitor detection status, which occurs when a + doorbell is pressed. +- `/status/ai` Contains the ai detection status, which occurs when a + the camera's internal ai detects something. The value is `person/car/other` +- `/status/ai/person` Contains the ai detection status of the last person, + which occurs when a the camera's internal ai detects a person. +- `/status/ai/car` Contains the ai detection status of the last car, which + occurs when a the camera's internal ai detects a car. +- `/status/ai/other` Contains the ai detection status from the camera, which + occurs when a the camera's internal ai detects something. + + (This is a placeholder until I can fiquire out everything that the camera + can detect. If you have list for me please open an issue with it, I don't + have a cam with AI to check myself) - `/status/ptz/preset` Sent in reply to a `/query/ptz/preset` an XML encoded version of the PTZ presets - `/status/preview` a base64 encoded camera image updated every 2s. Not @@ -325,6 +339,14 @@ enable_motion = false # motion detection # (limited battery drain since it # is a passive listening connection) # +enable_visitor = false # visitor press for doorbell camera + # (limited battery drain since it + # is a passive listening connection) + # +enable_ai = false # report detected ai by the camera + # (limited battery drain since it + # is a passive listening connection) + # enable_light = false # flood lights only available on some camera # (limited battery drain since it # is a passive listening connection) diff --git a/crates/core/src/bc/de.rs b/crates/core/src/bc/de.rs index d8da749c..182bcc7c 100644 --- a/crates/core/src/bc/de.rs +++ b/crates/core/src/bc/de.rs @@ -90,10 +90,8 @@ fn bc_modern_msg<'a>( E::add_context(input, ctx, E::from_error_kind(input, kind)) } - let ext_len = match header.payload_offset { - Some(off) => off, - _ => 0, // If missing payload_offset treat all as payload - }; + // If missing payload_offset treat all as payload + let ext_len = header.payload_offset.unwrap_or_default(); let (buf, ext_buf) = take(ext_len)(buf)?; let payload_len = header.body_len - ext_len; diff --git a/crates/core/src/bc/model.rs b/crates/core/src/bc/model.rs index 2e8dd052..d13d5eee 100644 --- a/crates/core/src/bc/model.rs +++ b/crates/core/src/bc/model.rs @@ -194,6 +194,7 @@ pub struct BcMeta { /// When sending a command it is set to `0`. The reply from the camera can be /// - `200` for OK /// - `400` for bad request + /// /// A malformed packet will return a `400` code pub response_code: u16, /// A message ID is used to match replies with requests. The camera will parrot back @@ -213,14 +214,6 @@ pub struct BcMeta { pub class: u16, } -/// The components of the Baichuan header that must be filled out after the body is serialized, or -/// is needed for the deserialization of the body (strictly part of the wire format of the message) -#[derive(Debug, PartialEq, Eq)] -pub(super) struct BcSendInfo { - pub body_len: u32, - pub payload_offset: Option, -} - #[derive(Debug)] pub(crate) struct BcContext { pub(crate) credentials: Credentials, diff --git a/crates/core/src/bc/xml.rs b/crates/core/src/bc/xml.rs index c046d24f..aaea8f7d 100644 --- a/crates/core/src/bc/xml.rs +++ b/crates/core/src/bc/xml.rs @@ -633,11 +633,15 @@ pub struct AlarmEvent { /// The channel the event occured on. Usually zero unless from an NVR #[serde(rename = "channelId")] pub channel_id: u8, - /// Motion status. Known values are `"MD"` or `"none"` + /// Motion status. Known values are `"MD"` or `"none"` or `"visitor"` pub status: String, /// AI status. Known values are `"people"` or `"none"` - #[serde(rename = "AItype", skip_serializing_if = "Option::is_none")] - pub ai_type: Option, + #[serde( + rename = "AItype", + default = "ai_type_default", + skip_serializing_if = "ai_type_skip" + )] + pub ai_type: String, /// The recording status. Known values `0` or `1` pub recording: i32, /// The timestamp associated with the recording. `0` if not recording @@ -645,6 +649,13 @@ pub struct AlarmEvent { pub timeStamp: i32, } +fn ai_type_default() -> String { + "none".to_string() +} +fn ai_type_skip(value: &String) -> bool { + value == "none" +} + /// The Ptz messages used to move the camera #[derive(PartialEq, Default, Debug, Deserialize, Serialize)] pub struct PtzControl { diff --git a/crates/core/src/bc_protocol.rs b/crates/core/src/bc_protocol.rs index cf678094..a544bf4a 100644 --- a/crates/core/src/bc_protocol.rs +++ b/crates/core/src/bc_protocol.rs @@ -47,7 +47,7 @@ pub use credentials::*; pub use errors::Error; pub use ledstate::LightState; pub use login::MaxEncryption; -pub use motion::{MotionData, MotionStatus}; +pub use motion::{AiKind, MotionData, MotionEvents}; pub use pirstate::PirState; pub use ptz::Direction; pub use pushinfo::PhoneType; diff --git a/crates/core/src/bc_protocol/connection/bcsub.rs b/crates/core/src/bc_protocol/connection/bcsub.rs index 89335352..6f8d21c3 100644 --- a/crates/core/src/bc_protocol/connection/bcsub.rs +++ b/crates/core/src/bc_protocol/connection/bcsub.rs @@ -20,9 +20,9 @@ pub struct BcStream<'a> { rx: &'a mut ReceiverStream>, } -impl<'a> Unpin for BcStream<'a> {} +impl Unpin for BcStream<'_> {} -impl<'a> Stream for BcStream<'a> { +impl Stream for BcStream<'_> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { diff --git a/crates/core/src/bc_protocol/motion.rs b/crates/core/src/bc_protocol/motion.rs index d910ee49..f103976b 100644 --- a/crates/core/src/bc_protocol/motion.rs +++ b/crates/core/src/bc_protocol/motion.rs @@ -5,25 +5,46 @@ use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -/// Motion Status that the callback can send +/// Motion Events that the callback can send #[derive(Clone, Copy, Debug)] -pub enum MotionStatus { +pub enum MotionEvents { /// Sent when motion is first detected Start(Instant), /// Sent when motion stops Stop(Instant), + /// Sent when a doorbell is pressed + Vistor(Instant), + /// Sent when an AI is recieved + Ai(Instant, AiKind), /// Sent when an Alarm about something other than motion was received NoChange(Instant), } +#[derive(Clone, Copy, Debug)] +/// The kinds of objects the AI can detect +pub enum AiKind { + /// A person + Person, + /// A car + Car, + /// Something else, open a PR to fix with actual thing + Other, +} + /// A handle on current motion related events comming from the camera /// /// When this object is dropped the motion events are stopped pub struct MotionData { handle: JoinSet>, cancel: CancellationToken, - rx: Receiver>, - last_update: MotionStatus, + rx: Receiver>, + history: MotionHistory, +} + +pub struct MotionHistory { + motion: MotionEvents, + visitor: MotionEvents, + ai: MotionEvents, } impl MotionData { @@ -33,10 +54,10 @@ impl MotionData { /// An error is raised if the motion connection to the camera is dropped pub fn motion_detected(&mut self) -> Result> { self.consume_motion_events()?; - Ok(match &self.last_update { - MotionStatus::Start(_) => Some(true), - MotionStatus::Stop(_) => Some(false), - MotionStatus::NoChange(_) => None, + Ok(match &self.history.motion { + MotionEvents::Start(_) => Some(true), + MotionEvents::Stop(_) => Some(false), + _ => None, }) } @@ -46,44 +67,64 @@ impl MotionData { /// An error is raised if the motion connection to the camera is dropped pub fn motion_detected_within(&mut self, duration: Duration) -> Result> { self.consume_motion_events()?; - Ok(match &self.last_update { - MotionStatus::Start(_) => Some(true), - MotionStatus::Stop(time) => Some((Instant::now() - *time) < duration), - MotionStatus::NoChange(_) => None, + Ok(match &self.history.motion { + MotionEvents::Start(_) => Some(true), + MotionEvents::Stop(time) => Some((Instant::now() - *time) < duration), + _ => None, }) } - /// Consume the motion events diretly + /// Consume all the motion events on the queue and update the history /// /// An error is raised if the motion connection to the camera is dropped - pub fn consume_motion_events(&mut self) -> Result> { - let mut results: Vec = vec![]; - loop { - match self.rx.try_recv() { - Ok(motion) => results.push(motion?), - Err(TryRecvError::Empty) => break, - Err(e) => return Err(Error::from(e)), + pub fn consume_motion_events(&mut self) -> Result<()> { + while self.try_recv()?.is_some() { + // Tight loop.... + // Should not be many of such messages in the queue though + } + Ok(()) + } + + fn update_history(&mut self, latest_event: &MotionEvents) { + match latest_event { + MotionEvents::Start(_) => { + self.history.motion = *latest_event; } + MotionEvents::Stop(_) => { + self.history.motion = *latest_event; + } + MotionEvents::Vistor(_) => { + self.history.visitor = *latest_event; + } + MotionEvents::Ai(_, _) => { + self.history.ai = *latest_event; + } + MotionEvents::NoChange(_) => {} } - if let Some(last) = results.last() { - self.last_update = *last; + } + + /// Try to recv a new motion event on the pipeline + pub fn try_recv(&mut self) -> Result> { + match self.rx.try_recv() { + Ok(motion) => { + let motion = motion?; + self.update_history(&motion); + Ok(Some(motion)) + } + Err(TryRecvError::Empty) => Ok(None), + Err(e) => Err(Error::from(e)), } - Ok(results) } /// Await a new motion event - /// - /// - pub async fn next_motion(&mut self) -> Result { - let motions = self.consume_motion_events()?; - if let Some(last) = motions.last() { - Ok(*last) - } else if let Some(moition) = self.rx.recv().await { - let moition = moition?; - self.last_update = moition; - Ok(moition) - } else { - Err(Error::Other("Motion dropped")) + pub async fn recv(&mut self) -> Result> { + match self.rx.recv().await { + Some(motion) => { + let motion = motion?; + self.update_history(&motion); + Ok(Some(motion)) + } + None => Ok(None), } } @@ -91,10 +132,10 @@ impl MotionData { /// /// It must be stopped for at least the given duration pub async fn await_stop(&mut self, duration: Duration) -> Result<()> { - let motions = self.consume_motion_events()?; - let mut last_motion = motions.last().copied(); + self.consume_motion_events()?; + let mut last_motion = self.history.motion; loop { - if let Some(MotionStatus::Stop(time)) = last_motion { + if let MotionEvents::Stop(time) = last_motion { // In stop state if duration.is_zero() || (Instant::now() - time) > duration { return Ok(()); @@ -105,22 +146,37 @@ impl MotionData { _ = tokio::time::sleep(remaining_sleep) => {None}, v = async { loop { - match self.next_motion().await { - n @ Ok(MotionStatus::Start(_)) => {return n;}, + match self.recv().await { + n @ Ok(Some(MotionEvents::Start(_))) => {return n;}, + n @ Ok(None) => {return n;} // End of data n @ Err(_) => {return n;}, _ => {continue;} } } } => {Some(v)} }; - if let Some(v) = result { - v?; - } else { - return Ok(()); + match result { + Some(Ok(None)) => { + // Camera has dropped motion + return Ok(()); + } + Some(Ok(Some(_))) => { + // conitnue to next data + } + Some(Err(e)) => return Err(e), + None => { + // Timoout occured motion stopped + return Ok(()); + } } } } - last_motion = Some(self.next_motion().await?); + if let Some(next_motion) = self.recv().await? { + last_motion = next_motion; + } else { + // Camera had dropped motion + return Ok(()); + } } } @@ -128,10 +184,10 @@ impl MotionData { /// /// The motion must have a minimum duration as given pub async fn await_start(&mut self, duration: Duration) -> Result<()> { - let motions = self.consume_motion_events()?; - let mut last_motion = motions.last().copied(); + self.consume_motion_events()?; + let mut last_motion = self.history.motion; loop { - if let Some(MotionStatus::Start(time)) = last_motion { + if let MotionEvents::Start(time) = last_motion { // In start state if duration.is_zero() || (Instant::now() - time) > duration { return Ok(()); @@ -141,22 +197,37 @@ impl MotionData { _ = tokio::time::sleep(duration - (Instant::now() - time)) => {None}, v = async { loop { - match self.next_motion().await { - n @ Ok(MotionStatus::Stop(_)) => {return n;}, + match self.recv().await { + n @ Ok(Some(MotionEvents::Stop(_))) => {return n;}, + n @ Ok(None) => {return n;} // End of data n @ Err(_) => {return n;}, _ => {continue;} } } } => {Some(v)} }; - if let Some(v) = result { - v?; - } else { - return Ok(()); + match result { + Some(Ok(None)) => { + // Camera has dropped motion + return Ok(()); + } + Some(Ok(Some(_))) => { + // conitnue to next data + } + Some(Err(e)) => return Err(e), + None => { + // Timeout occured motion started + return Ok(()); + } } } } - last_motion = Some(self.next_motion().await?); + if let Some(next_motion) = self.recv().await? { + last_motion = next_motion; + } else { + // Camera had dropped motion + return Ok(()); + } } } } @@ -225,7 +296,7 @@ impl BcCamera { loop { tokio::task::yield_now().await; let msg = sub.recv().await; - let status = match msg { + let status_list = match msg { Ok(motion_msg) => { if let BcBody::ModernMsg(ModernMsg { payload: @@ -236,36 +307,51 @@ impl BcCamera { .. }) = motion_msg.body { - let mut result = MotionStatus::NoChange(Instant::now()); + let mut result = vec![]; for alarm_event in &alarm_event_list.alarm_events { if alarm_event.channel_id == channel_id { - if alarm_event.status != "none" - || alarm_event - .ai_type - .as_ref() - .map(|ai_type| ai_type != "none") - .unwrap_or(false) - { - result = MotionStatus::Start(Instant::now()); - break; + if alarm_event.status == "visitor" { + result.push(MotionEvents::Vistor(Instant::now())); + } else if alarm_event.status == "MD" { + result.push(MotionEvents::Start(Instant::now())); + } else if alarm_event.ai_type != "none" { + result.push(MotionEvents::Ai(Instant::now(), + match alarm_event.ai_type.as_ref() { + "person" | "people" | "human" => AiKind::Person, + "car" | "vehicle" => AiKind::Car, + _ => AiKind::Other, + } + )); } else { - result = MotionStatus::Stop(Instant::now()); - break; + result.push(MotionEvents::Stop(Instant::now())); } } } Ok(result) } else { - Ok(MotionStatus::NoChange(Instant::now())) + Ok(vec![]) } } // On connection drop we stop Err(e) => Err(e), }; - if tx.send(status).await.is_err() { - // Motion reciever has been dropped - break; + match status_list { + Ok(mut status_list) => { + for status in status_list.drain(..) { + if tx.send(Ok(status)).await.is_err() { + // Motion reciever has been dropped + break; + } + } + }, + Err(e) => { + // Err from camera + if tx.send(Err(e)).await.is_err() { + // Motion reciever has been dropped + } + break; + } } } Ok(()) @@ -277,7 +363,11 @@ impl BcCamera { handle: set, cancel, rx, - last_update: MotionStatus::NoChange(Instant::now()), + history: MotionHistory { + motion: MotionEvents::NoChange(Instant::now()), + visitor: MotionEvents::NoChange(Instant::now()), + ai: MotionEvents::NoChange(Instant::now()), + }, }) } } diff --git a/crates/core/src/bc_protocol/resolution.rs b/crates/core/src/bc_protocol/resolution.rs index 17e99b2b..1fb308bc 100644 --- a/crates/core/src/bc_protocol/resolution.rs +++ b/crates/core/src/bc_protocol/resolution.rs @@ -214,7 +214,7 @@ impl ToSocketAddrsOrUid for SocketAddrV6 { } } -impl<'a> ToSocketAddrsOrUid for &'a [SocketAddr] { +impl ToSocketAddrsOrUid for &'_ [SocketAddr] { type UidIter = std::vec::IntoIter; fn to_socket_addrs_or_uid(&self) -> Result { diff --git a/crates/core/src/bcudp/codex.rs b/crates/core/src/bcudp/codex.rs index fb54e6ec..d0648a48 100644 --- a/crates/core/src/bcudp/codex.rs +++ b/crates/core/src/bcudp/codex.rs @@ -34,8 +34,8 @@ impl Decoder for BcUdpCodex { type Item = BcUdp; type Error = Error; - /// Since frames can cross EOF boundaries we overload this so it doesn't error if - /// there are bytes left on the stream + // /// Since frames can cross EOF boundaries we overload this so it doesn't error if + // /// there are bytes left on the stream // fn decode_eof(&mut self, buf: &mut BytesMut) -> Result> { // match self.decode(buf)? { // Some(frame) => Ok(Some(frame)), diff --git a/crates/mailnoti/src/main.rs b/crates/mailnoti/src/main.rs index 4ffc54e2..84877861 100644 --- a/crates/mailnoti/src/main.rs +++ b/crates/mailnoti/src/main.rs @@ -53,13 +53,13 @@ async fn main() -> Result<()> { Ok(()) } -fn get_local_ip() -> Result { - get_if_addrs::get_if_addrs()? - .iter() - .find(|i| !i.is_loopback() && matches!(i.addr, get_if_addrs::IfAddr::V4(_))) - .map(|iface| Ok(iface.ip())) - .unwrap_or_else(|| Err(anyhow!("No Local Ip Address Found"))) -} +// fn get_local_ip() -> Result { +// get_if_addrs::get_if_addrs()? +// .iter() +// .find(|i| !i.is_loopback() && matches!(i.addr, get_if_addrs::IfAddr::V4(_))) +// .map(|iface| Ok(iface.ip())) +// .unwrap_or_else(|| Err(anyhow!("No Local Ip Address Found"))) +// } async fn cam_tasks(name: &str, camera: BcCamera, addr: SocketAddr) -> Result<()> { let support = camera.get_support().await?; diff --git a/src/common/instance.rs b/src/common/instance.rs index f3ecfc6c..3273e7e1 100644 --- a/src/common/instance.rs +++ b/src/common/instance.rs @@ -20,13 +20,12 @@ use tokio_util::sync::CancellationToken; #[cfg(feature = "pushnoti")] use super::PushNoti; -use super::{MdState, NeoCamCommand, NeoCamThreadState, Permit, UseCounter}; +use super::{AiState, MdState, NeoCamCommand, NeoCamThreadState, Permit, UseCounter, VisitorState}; use crate::{config::CameraConfig, AnyResult, Result}; use neolink_core::{ bc_protocol::{BcCamera, StreamKind}, bcmedia::model::BcMedia, }; -#[cfg(feature = "gstreamer")] /// This instance is the primary interface used throughout the app /// @@ -285,6 +284,22 @@ impl NeoInstance { Ok(instance_rx.await?) } + pub(crate) async fn visitor(&self) -> Result> { + let (instance_tx, instance_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::Visitor(instance_tx)) + .await?; + Ok(instance_rx.await?) + } + + pub(crate) async fn ai(&self) -> Result> { + let (instance_tx, instance_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::Ai(instance_tx)) + .await?; + Ok(instance_rx.await?) + } + pub(crate) async fn config(&self) -> Result> { let (instance_tx, instance_rx) = oneshot(); self.camera_control @@ -423,7 +438,7 @@ impl NeoInstance { }), )); - #[cfg(pushnoti)] + #[cfg(feature = "pushnoti")] { // Creates a permit for controlling based on the PN let pn_permit = counter.create_deactivated().await?; diff --git a/src/common/mdthread.rs b/src/common/mdthread.rs index 03748b12..34528e33 100644 --- a/src/common/mdthread.rs +++ b/src/common/mdthread.rs @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken; use super::NeoInstance; use crate::{AnyResult, Result}; -use neolink_core::bc_protocol::MotionStatus; +use neolink_core::bc_protocol::{AiKind, MotionEvents}; #[derive(Clone, Debug)] #[allow(dead_code)] @@ -25,8 +25,26 @@ pub(crate) enum MdState { Unknown, } +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub(crate) enum VisitorState { + Visted(Instant), + Unknown, +} + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub(crate) enum AiState { + Person(Instant), + Car(Instant), + Other(Instant), + Unknown, +} + pub(crate) struct NeoCamMdThread { md_watcher: Arc>, + visitor_watcher: Arc>, + ai_watcher: Arc>, md_request_rx: MpscReceiver, cancel: CancellationToken, instance: NeoInstance, @@ -39,8 +57,17 @@ impl NeoCamMdThread { ) -> Result { let (md_watcher, _) = watch(MdState::Unknown); let md_watcher = Arc::new(md_watcher); + + let (visitor_watcher, _) = watch(VisitorState::Unknown); + let visitor_watcher = Arc::new(visitor_watcher); + + let (ai_watcher, _) = watch(AiState::Unknown); + let ai_watcher = Arc::new(ai_watcher); + Ok(Self { md_watcher, + visitor_watcher, + ai_watcher, md_request_rx, cancel: CancellationToken::new(), instance, @@ -50,6 +77,8 @@ impl NeoCamMdThread { pub(crate) async fn run(&mut self) -> Result<()> { let thread_cancel = self.cancel.clone(); let watcher = self.md_watcher.clone(); + let visitor_watcher = self.visitor_watcher.clone(); + let ai_watcher = self.ai_watcher.clone(); let md_instance = self.instance.clone(); tokio::select! { _ = thread_cancel.cancelled() => { @@ -58,11 +87,21 @@ impl NeoCamMdThread { v = async { while let Some(request) = self.md_request_rx.recv().await { match request { - MdRequest::Get { + MdRequest::Md { sender } => { let _ = sender.send(self.md_watcher.subscribe()); }, + MdRequest::Visitor { + sender + } => { + let _ = sender.send(self.visitor_watcher.subscribe()); + }, + MdRequest::Ai { + sender + } => { + let _ = sender.send(self.ai_watcher.subscribe()); + }, } } Ok(()) @@ -71,23 +110,46 @@ impl NeoCamMdThread { loop { let r: AnyResult<()> = md_instance.run_passive_task(|cam| { let watcher = watcher.clone(); + let visitor_watcher = visitor_watcher.clone(); + let ai_watcher = ai_watcher.clone(); Box::pin( async move { let mut md = cam.listen_on_motion().await.with_context(|| "Error in getting MD listen_on_motion")?; loop { - let event = md.next_motion().await.with_context(|| "Error in getting MD next_motion")?; + let event = md.recv().await.with_context(|| "Error in getting MD next_motion")?.ok_or(anyhow::anyhow!("MD connection was dropped"))?; match event { - MotionStatus::Start(at) => { + MotionEvents::Start(at) => { watcher.send_replace( MdState::Start(at.into()) ); } - MotionStatus::Stop(at) => { + MotionEvents::Stop(at) => { watcher.send_replace( MdState::Stop(at.into()) ); - } - MotionStatus::NoChange(_) => {}, + }, + MotionEvents::Vistor(at) => { + visitor_watcher.send_replace( + VisitorState::Visted(at.into()) + ); + }, + MotionEvents::Ai(at, kind) => { + ai_watcher.send_replace( + match kind { + AiKind::Person => { + AiState::Person(at.into()) + } + AiKind::Car => { + AiState::Car(at.into()) + } + AiKind::Other => { + AiState::Other(at.into()) + } + } + + ); + }, + _ => {}, } } } @@ -110,7 +172,13 @@ impl Drop for NeoCamMdThread { /// Used to pass messages to the MdThread pub(crate) enum MdRequest { - Get { + Md { sender: OneshotSender>, }, + Visitor { + sender: OneshotSender>, + }, + Ai { + sender: OneshotSender>, + }, } diff --git a/src/common/neocam.rs b/src/common/neocam.rs index 22a22db7..838accfb 100644 --- a/src/common/neocam.rs +++ b/src/common/neocam.rs @@ -20,8 +20,8 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use super::{ - MdRequest, MdState, NeoCamMdThread, NeoCamThread, NeoCamThreadState, NeoInstance, Permit, - UseCounter, + AiState, MdRequest, MdState, NeoCamMdThread, NeoCamThread, NeoCamThreadState, NeoInstance, + Permit, UseCounter, VisitorState, }; #[cfg(feature = "pushnoti")] use super::{PnRequest, PushNoti}; @@ -33,6 +33,8 @@ pub(crate) enum NeoCamCommand { HangUp, Instance(OneshotSender>), Motion(OneshotSender>), + Visitor(OneshotSender>), + Ai(OneshotSender>), Config(OneshotSender>), Disconnect(OneshotSender<()>), Connect(OneshotSender<()>), @@ -110,7 +112,21 @@ impl NeoCam { } NeoCamCommand::Motion(sender) => { md_request_tx.send( - MdRequest::Get { + MdRequest::Md { + sender, + } + ).await?; + }, + NeoCamCommand::Visitor(sender) => { + md_request_tx.send( + MdRequest::Visitor { + sender, + } + ).await?; + }, + NeoCamCommand::Ai(sender) => { + md_request_tx.send( + MdRequest::Ai { sender, } ).await?; diff --git a/src/config.rs b/src/config.rs index 6ca22ad2..c0f6a491 100644 --- a/src/config.rs +++ b/src/config.rs @@ -235,6 +235,10 @@ pub(crate) struct MqttConfig { #[serde(default = "default_true")] pub(crate) enable_motion: bool, #[serde(default = "default_true")] + pub(crate) enable_visitor: bool, + #[serde(default = "default_true")] + pub(crate) enable_ai: bool, + #[serde(default = "default_true")] pub(crate) enable_light: bool, #[serde(default = "default_true")] pub(crate) enable_battery: bool, @@ -303,6 +307,8 @@ const fn default_false() -> bool { fn default_mqtt() -> MqttConfig { MqttConfig { enable_motion: true, + enable_visitor: true, + enable_ai: true, enable_light: true, enable_battery: true, battery_update: 2000, diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 36bf009e..a0045aed 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -77,7 +77,7 @@ mod discovery; mod mqttc; use crate::{ - common::{MdState, NeoInstance, NeoReactor}, + common::{AiState, MdState, NeoInstance, NeoReactor, VisitorState}, config::Config, AnyResult, }; @@ -331,6 +331,12 @@ async fn listen_on_camera(camera: NeoInstance, mqtt_instance: MqttInstance) -> R let camera_motion = camera.clone(); let mqtt_motion = mqtt_instance.resubscribe().await?; + let camera_visitor = camera.clone(); + let mqtt_visitor = mqtt_instance.resubscribe().await?; + + let camera_ai = camera.clone(); + let mqtt_ai = mqtt_instance.resubscribe().await?; + #[cfg(feature = "pushnoti")] let camera_pn = camera.clone(); #[cfg(feature = "pushnoti")] @@ -476,6 +482,60 @@ async fn listen_on_camera(camera: NeoInstance, mqtt_instance: MqttInstance) -> R }?; } }, if config.enable_motion => v, + // Handle the visitor messages + v = async { + let mut vis = camera_visitor.visitor().await?; + loop { + let v = async { + vis.wait_for(|state| matches!(state, VisitorState::Visted(_))).await.with_context(|| { + format!("{}: MdStart Watch Dropped", camera_name) + })?; + mqtt_visitor.send_message("status/visitor", "visitor", true).await.with_context(|| { + format!("{}: Failed to publish motion start", camera_name) + })?; + AnyResult::Ok(()) + }.await; + match v.map_err(|e| e.downcast::()) { + Err(Ok(neolink_core::Error::UnintelligibleReply{..})) => futures::future::pending().await, + Ok(()) => AnyResult::Ok(()), + Err(Ok(e)) => Err(e.into()), + Err(Err(e)) => Err(e), + }?; + } + }, if config.enable_visitor => v, + // Handle the ai messages + v = async { + let mut ai = camera_ai.ai().await?; + loop { + let v = async { + let ai_kind = { + let ai_state = ai.wait_for(|state| !matches!(state, AiState::Unknown)).await.with_context(|| { + format!("{}: MdStart Watch Dropped", camera_name) + })?; + match &*ai_state { + AiState::Person(_) => "person", + AiState::Car(_) => "car", + AiState::Other(_) => "other", + AiState::Unknown => unreachable!(), + } + }; + + mqtt_ai.send_message("status/ai", ai_kind, true).await.with_context(|| { + format!("{}: Failed to publish motion start", camera_name) + })?; + mqtt_ai.send_message(&format!("status/ai/{}", ai_kind), ai_kind, true).await.with_context(|| { + format!("{}: Failed to publish motion start", camera_name) + })?; + AnyResult::Ok(()) + }.await; + match v.map_err(|e| e.downcast::()) { + Err(Ok(neolink_core::Error::UnintelligibleReply{..})) => futures::future::pending().await, + Ok(()) => AnyResult::Ok(()), + Err(Ok(e)) => Err(e.into()), + Err(Err(e)) => Err(e), + }?; + } + }, if config.enable_ai => v, // Handle the SNAP (image preview) v = async { let mut wait = IntervalStream::new({ diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index c7f8d06f..29844d79 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -122,7 +122,7 @@ struct MqttBackend<'a> { cancel: CancellationToken, } -impl<'a> MqttBackend<'a> { +impl MqttBackend<'_> { async fn run(&mut self) -> AnyResult<()> { log::trace!("Run MQTT Server"); let mut mqttoptions = MqttOptions::new( @@ -339,7 +339,7 @@ impl<'a> MqttBackend<'a> { } } -impl<'a> Drop for MqttBackend<'a> { +impl Drop for MqttBackend<'_> { fn drop(&mut self) { self.cancel.cancel(); }