diff --git a/screenpipe-audio/Cargo.toml b/screenpipe-audio/Cargo.toml index 06e0ba9bc..835f4441d 100644 --- a/screenpipe-audio/Cargo.toml +++ b/screenpipe-audio/Cargo.toml @@ -61,6 +61,8 @@ webrtc-vad = "0.4.0" # Deepgram reqwest = { workspace = true } +uuid = { version = "1.10.0", features = ["v4"] } + screenpipe-core = { path = "../screenpipe-core" } [target.'cfg(target_os = "windows")'.dependencies] diff --git a/screenpipe-audio/src/bin/screenpipe-audio-forever.rs b/screenpipe-audio/src/bin/screenpipe-audio-forever.rs index 4a19d980f..c5c2bebae 100644 --- a/screenpipe-audio/src/bin/screenpipe-audio-forever.rs +++ b/screenpipe-audio/src/bin/screenpipe-audio-forever.rs @@ -7,13 +7,15 @@ use screenpipe_audio::default_output_device; use screenpipe_audio::list_audio_devices; use screenpipe_audio::parse_audio_device; use screenpipe_audio::record_and_transcribe; -use screenpipe_audio::vad_engine::VadSensitivity; +use screenpipe_audio::vad_engine::create_vad_engine; +use screenpipe_audio::vad_engine::VadEngine; use screenpipe_audio::AudioDevice; use screenpipe_audio::AudioTranscriptionEngine; use screenpipe_audio::VadEngineEnum; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; #[derive(Parser, Debug)] @@ -80,13 +82,15 @@ async fn main() -> Result<()> { return Err(anyhow!("No audio input devices found")); } + let vad_engine_mutex: Arc>> = + Arc::new(Mutex::new(create_vad_engine(VadEngineEnum::Silero).await?)); + let chunk_duration = Duration::from_secs_f32(args.audio_chunk_duration); let (whisper_sender, mut whisper_receiver, _) = create_whisper_channel( Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3), - VadEngineEnum::Silero, // Or VadEngineEnum::WebRtc, hardcoded for now args.deepgram_api_key, &PathBuf::from("output.mp4"), - VadSensitivity::Medium, + vad_engine_mutex, ) .await?; // Spawn threads for each device diff --git a/screenpipe-audio/src/bin/screenpipe-audio.rs b/screenpipe-audio/src/bin/screenpipe-audio.rs index 62085e73f..e01c9c4a7 100644 --- a/screenpipe-audio/src/bin/screenpipe-audio.rs +++ b/screenpipe-audio/src/bin/screenpipe-audio.rs @@ -7,13 +7,15 @@ use screenpipe_audio::default_output_device; use screenpipe_audio::list_audio_devices; use screenpipe_audio::parse_audio_device; use screenpipe_audio::record_and_transcribe; -use screenpipe_audio::vad_engine::VadSensitivity; +use screenpipe_audio::vad_engine::create_vad_engine; +use screenpipe_audio::vad_engine::VadEngine; use screenpipe_audio::AudioDevice; use screenpipe_audio::AudioTranscriptionEngine; use screenpipe_audio::VadEngineEnum; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; #[derive(Parser, Debug)] @@ -85,12 +87,15 @@ async fn main() -> Result<()> { let chunk_duration = Duration::from_secs(10); let output_path = PathBuf::from("output.mp4"); + + let vad_engine_mutex: Arc>> = + Arc::new(Mutex::new(create_vad_engine(VadEngineEnum::Silero).await?)); + let (whisper_sender, mut whisper_receiver, _) = create_whisper_channel( Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3), - VadEngineEnum::WebRtc, // Or VadEngineEnum::WebRtc, hardcoded for now deepgram_api_key, &output_path, - VadSensitivity::Medium, + vad_engine_mutex, ) .await?; // Spawn threads for each device diff --git a/screenpipe-audio/src/lib.rs b/screenpipe-audio/src/lib.rs index 884bb926a..9d9e358c5 100644 --- a/screenpipe-audio/src/lib.rs +++ b/screenpipe-audio/src/lib.rs @@ -1,14 +1,15 @@ mod core; +pub mod encode; +pub mod meeting_detector; mod multilingual; pub mod pcm_decode; pub mod stt; pub mod vad_engine; -pub mod encode; pub use core::{ default_input_device, default_output_device, list_audio_devices, parse_audio_device, - record_and_transcribe, AudioDevice, AudioTranscriptionEngine, DeviceControl, DeviceType + record_and_transcribe, AudioDevice, AudioTranscriptionEngine, DeviceControl, DeviceType, }; pub use encode::encode_single_audio; pub use pcm_decode::pcm_decode; pub use stt::{create_whisper_channel, stt, AudioInput, TranscriptionResult, WhisperModel}; -pub use vad_engine::VadEngineEnum; \ No newline at end of file +pub use vad_engine::VadEngineEnum; diff --git a/screenpipe-audio/src/meeting_detector.rs b/screenpipe-audio/src/meeting_detector.rs new file mode 100644 index 000000000..909c11f05 --- /dev/null +++ b/screenpipe-audio/src/meeting_detector.rs @@ -0,0 +1,135 @@ +use crate::vad_engine::VadEngine; +use anyhow::Result; +use chrono::{DateTime, Duration, Utc}; +use screenpipe_core::EventPayload; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tracing::info; +use uuid::Uuid; + +const SILENCE_THRESHOLD: f32 = 0.01; +const ENERGY_THRESHOLD: f32 = 0.1; +const SILENCE_DURATION: Duration = Duration::seconds(180); // 3 minutes +const MEETING_START_ENERGY_DURATION: Duration = Duration::seconds(30); +const BUFFER_DURATION: Duration = Duration::seconds(300); // 5 minutes + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MeetingEvent { + pub event_type: MeetingEventType, + pub timestamp: DateTime, + pub meeting_id: String, +} + +impl EventPayload for MeetingEvent {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MeetingEventType { + Start, + End, +} + +pub struct MeetingDetector { + vad: Arc>>, + activity_buffer: VecDeque, + silence_start: Option>, + meeting_start: Option>, + pub current_meeting_id: Option, +} + +struct AudioActivityData { + energy: f32, + is_speech: bool, + timestamp: DateTime, +} + +impl MeetingDetector { + pub async fn new(vad_engine: Arc>>) -> Result { + Ok(Self { + vad: vad_engine, + activity_buffer: VecDeque::new(), + silence_start: None, + meeting_start: None, + current_meeting_id: None, + }) + } + + pub fn process_audio(&mut self, audio_frame: &[f32]) -> Result> { + let energy = self.calculate_energy(audio_frame); + let is_speech = self.vad.lock().unwrap().is_voice_segment(audio_frame)?; + let timestamp = Utc::now(); + + let activity = AudioActivityData { + energy, + is_speech, + timestamp, + }; + + self.activity_buffer.push_back(activity); + + // Remove old data from the buffer + while self + .activity_buffer + .front() + .map_or(false, |a| timestamp - a.timestamp > BUFFER_DURATION) + { + self.activity_buffer.pop_front(); + } + + if !is_speech && energy < SILENCE_THRESHOLD { + if self.silence_start.is_none() { + self.silence_start = Some(timestamp); + } + } else { + self.silence_start = None; + } + + if let Some(silence_start) = self.silence_start { + if timestamp - silence_start >= SILENCE_DURATION { + if let Some(meeting_id) = self.current_meeting_id.take() { + self.meeting_start = None; + info!("Meeting ended with id: {}", meeting_id); + return Ok(Some(MeetingEvent { + event_type: MeetingEventType::End, + timestamp, + meeting_id, + })); + } + } + } + + if self.meeting_start.is_none() && self.detect_meeting_start() { + let meeting_id = Uuid::new_v4().to_string(); + self.current_meeting_id = Some(meeting_id.clone()); + self.meeting_start = Some(timestamp); + info!("Meeting started with id: {}", meeting_id); + return Ok(Some(MeetingEvent { + event_type: MeetingEventType::Start, + timestamp, + meeting_id, + })); + } + + Ok(None) + } + + fn calculate_energy(&self, audio_frame: &[f32]) -> f32 { + audio_frame + .iter() + .map(|&sample| sample * sample) + .sum::() + / audio_frame.len() as f32 + } + + fn detect_meeting_start(&self) -> bool { + let high_energy_frames = self + .activity_buffer + .iter() + .rev() + .take_while(|a| Utc::now() - a.timestamp <= MEETING_START_ENERGY_DURATION) + .filter(|a| a.energy > ENERGY_THRESHOLD && a.is_speech) + .count(); + + high_energy_frames >= MEETING_START_ENERGY_DURATION.num_seconds() as usize / 2 + } +} diff --git a/screenpipe-audio/src/stt.rs b/screenpipe-audio/src/stt.rs index 3a07b8410..9a1521157 100644 --- a/screenpipe-audio/src/stt.rs +++ b/screenpipe-audio/src/stt.rs @@ -22,9 +22,7 @@ use rubato::{ }; use crate::{ - encode_single_audio, multilingual, - vad_engine::{SileroVad, VadEngine, VadEngineEnum, VadSensitivity, WebRtcVad}, - AudioDevice, AudioTranscriptionEngine, + encode_single_audio, multilingual, vad_engine::VadEngine, AudioDevice, AudioTranscriptionEngine, }; use hound::{WavSpec, WavWriter}; @@ -521,14 +519,14 @@ pub fn stt_sync( audio_input: &AudioInput, whisper_model: &WhisperModel, audio_transcription_engine: Arc, - vad_engine: Arc>>, // Changed type here + vad_engine: Arc>>, deepgram_api_key: Option, output_path: &PathBuf, ) -> Result<(String, String)> { let audio_input = audio_input.clone(); let whisper_model = whisper_model.clone(); let output_path = output_path.clone(); - let vad_engine = vad_engine.clone(); // Clone the Arc to move into the closure + let vad_engine = vad_engine.clone(); let handle = std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -538,7 +536,7 @@ pub fn stt_sync( &audio_input, &whisper_model, audio_transcription_engine, - &mut **vad_engine_guard, // Obtain &mut dyn VadEngine + &mut **vad_engine_guard, deepgram_api_key, &output_path, )) @@ -821,14 +819,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; pub async fn create_whisper_channel( audio_transcription_engine: Arc, - vad_engine: VadEngineEnum, deepgram_api_key: Option, output_path: &PathBuf, - vad_sensitivity: VadSensitivity, + vad_engine: Arc>>, ) -> Result<( UnboundedSender, UnboundedReceiver, - Arc, // Shutdown flag + Arc, )> { let whisper_model = WhisperModel::new(audio_transcription_engine.clone())?; let (input_sender, mut input_receiver): ( @@ -839,12 +836,7 @@ pub async fn create_whisper_channel( UnboundedSender, UnboundedReceiver, ) = unbounded_channel(); - let mut vad_engine: Box = match vad_engine { - VadEngineEnum::WebRtc => Box::new(WebRtcVad::new()), - VadEngineEnum::Silero => Box::new(SileroVad::new().await?), - }; - vad_engine.set_sensitivity(vad_sensitivity); - let vad_engine = Arc::new(Mutex::new(vad_engine)); + let shutdown_flag = Arc::new(AtomicBool::new(false)); let shutdown_flag_clone = shutdown_flag.clone(); let output_path = output_path.clone(); diff --git a/screenpipe-audio/tests/meeting_detector_test.rs b/screenpipe-audio/tests/meeting_detector_test.rs new file mode 100644 index 000000000..60af9ecbf --- /dev/null +++ b/screenpipe-audio/tests/meeting_detector_test.rs @@ -0,0 +1,117 @@ +#[cfg(test)] +mod tests { + use env_logger; + use screenpipe_audio::{ + meeting_detector::{MeetingDetector, MeetingEvent, MeetingEventType}, + vad_engine::{VadEngine, VadSensitivity}, + }; + use std::sync::{Arc, Mutex}; // Add this import to initialize the logger + + pub struct MockVadEngine { + is_voice: bool, + sensitivity: VadSensitivity, + } + + impl MockVadEngine { + pub fn new() -> Self { + Self { + is_voice: false, + sensitivity: VadSensitivity::Medium, + } + } + } + + impl VadEngine for MockVadEngine { + fn is_voice_segment(&mut self, audio_chunk: &[f32]) -> anyhow::Result { + // Use the audio_chunk to determine if it's voice + // For simplicity, we'll use the average energy of the chunk + let energy: f32 = + audio_chunk.iter().map(|&x| x * x).sum::() / audio_chunk.len() as f32; + self.is_voice = energy > 0.1; // Arbitrary threshold + Ok(self.is_voice) + } + + fn set_sensitivity(&mut self, sensitivity: VadSensitivity) { + self.sensitivity = sensitivity; + } + + fn get_min_speech_ratio(&self) -> f32 { + self.sensitivity.min_speech_ratio() + } + } + + fn create_mock_vad() -> Arc>> { + Arc::new(Mutex::new(Box::new(MockVadEngine::new()))) + } + + fn create_audio_frame(energy: f32, samples: usize) -> Vec { + vec![energy.sqrt(); samples] + } + + #[tokio::test] + async fn test_meeting_detector() -> anyhow::Result<()> { + env_logger::init(); // Add this line to initialize the logger + + let vad = create_mock_vad(); + let mut detector = MeetingDetector::new(vad.clone()).await?; + + // simulate silence + for _ in 0..100 { + let frame = create_audio_frame(0.005, 1000); + assert!(detector.process_audio(&frame)?.is_none()); + } + + // simulate meeting start + let mut start_detected = false; + for i in 0..100 { + let frame = create_audio_frame(0.2, 1000); + let event = detector.process_audio(&frame)?; + if let Some(MeetingEvent { + event_type: MeetingEventType::Start, + .. + }) = event + { + println!("Meeting start detected at iteration {}", i); + start_detected = true; + break; + } + } + assert!(start_detected, "Meeting start event not detected"); + assert!(detector.current_meeting_id.is_some()); + + // simulate some ongoing meeting activity + for _ in 0..50 { + let frame = create_audio_frame(0.15, 1000); + assert!(detector.process_audio(&frame)?.is_none()); + } + + // simulate meeting end (prolonged silence) + let mut end_detected = false; + for i in 0..3600 { + // Increased to 3600 iterations (1 hour at 1 frame per second) + let frame = create_audio_frame(0.005, 1000); + let event = detector.process_audio(&frame)?; + if let Some(MeetingEvent { + event_type: MeetingEventType::End, + .. + }) = event + { + println!("Meeting end detected at iteration {}", i); + end_detected = true; + break; + } + if i % 600 == 0 { + println!("Iteration {}: Still waiting for end event", i); + } + } + + if !end_detected { + panic!("Meeting end event not detected after prolonged silence"); + } + + assert!(end_detected, "Meeting end event not detected"); + assert!(detector.current_meeting_id.is_none()); + + Ok(()) + } +} diff --git a/screenpipe-core/Cargo.toml b/screenpipe-core/Cargo.toml index cc9cf1272..747809e34 100644 --- a/screenpipe-core/Cargo.toml +++ b/screenpipe-core/Cargo.toml @@ -8,6 +8,8 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +serde = "1.0" +chrono = "0.4.22" serde_json = "1.0" which = "6.0.1" log = "0.4.17" diff --git a/screenpipe-core/src/events.rs b/screenpipe-core/src/events.rs new file mode 100644 index 000000000..2da5b15ac --- /dev/null +++ b/screenpipe-core/src/events.rs @@ -0,0 +1,36 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; + +pub trait EventPayload: + Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static +{ +} + +#[derive(Debug, Clone, Serialize)] +pub struct Event { + pub timestamp: DateTime, + pub payload: T, +} + +// wrapper for deserialization +#[derive(Deserialize)] +struct EventWrapper { + timestamp: DateTime, + payload: T, + #[serde(skip)] + _marker: PhantomData, +} + +impl<'de, T: EventPayload> Deserialize<'de> for Event { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let wrapper = EventWrapper::::deserialize(deserializer)?; + Ok(Event { + timestamp: wrapper.timestamp, + payload: wrapper.payload, + }) + } +} diff --git a/screenpipe-core/src/lib.rs b/screenpipe-core/src/lib.rs index 22163824c..3486b4e1d 100644 --- a/screenpipe-core/src/lib.rs +++ b/screenpipe-core/src/lib.rs @@ -10,3 +10,5 @@ pub use pipes::*; pub mod pii_removal; #[cfg(feature = "security")] pub use pii_removal::*; +pub mod events; +pub use events::*; diff --git a/screenpipe-server/Cargo.toml b/screenpipe-server/Cargo.toml index 212fa3ae7..3936357df 100644 --- a/screenpipe-server/Cargo.toml +++ b/screenpipe-server/Cargo.toml @@ -37,7 +37,7 @@ hf-hub = "0.3.2" rand = "0.8.5" # Server -axum = "0.7.5" +axum = { version = "0.7.5", features = ["ws"] } tokio = { version = "1.15", features = ["full", "tracing"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } @@ -68,6 +68,7 @@ reqwest = { workspace = true } # Concurrency crossbeam = { workspace = true } +parking_lot = "0.12.1" # Friend integration screenpipe-integrations = { path = "../screenpipe-integrations" } diff --git a/screenpipe-server/src/bin/screenpipe-server.rs b/screenpipe-server/src/bin/screenpipe-server.rs index ac9f3b980..d98e08ab7 100644 --- a/screenpipe-server/src/bin/screenpipe-server.rs +++ b/screenpipe-server/src/bin/screenpipe-server.rs @@ -16,7 +16,7 @@ use screenpipe_audio::{ }; use screenpipe_core::find_ffmpeg_path; use screenpipe_server::{ - cli::{Cli, CliAudioTranscriptionEngine, CliOcrEngine, Command, PipeCommand}, start_continuous_recording, watch_pid, DatabaseManager, PipeManager, ResourceMonitor, Server + cli::{Cli, CliAudioTranscriptionEngine, CliOcrEngine, Command, PipeCommand}, start_continuous_recording, watch_pid, DatabaseManager, EventSystem, PipeManager, ResourceMonitor, Server }; use screenpipe_vision::monitor::list_monitors; use serde_json::{json, Value}; @@ -298,6 +298,9 @@ async fn main() -> anyhow::Result<()> { 1.0 }; + let event_system = Arc::new(EventSystem::new()); + let event_system_server = event_system.clone(); + let handle = { let runtime = &tokio::runtime::Handle::current(); runtime.spawn(async move { @@ -327,6 +330,7 @@ async fn main() -> anyhow::Result<()> { &cli.included_windows, cli.deepgram_api_key.clone(), cli.vad_sensitivity.clone(), + event_system.clone(), ); let result = tokio::select! { @@ -363,6 +367,7 @@ async fn main() -> anyhow::Result<()> { pipe_manager.clone(), cli.disable_vision, cli.disable_audio, + event_system_server, ); let mut pipe_futures = FuturesUnordered::new(); diff --git a/screenpipe-server/src/core.rs b/screenpipe-server/src/core.rs index 6c72e225a..5cad6c041 100644 --- a/screenpipe-server/src/core.rs +++ b/screenpipe-server/src/core.rs @@ -1,13 +1,14 @@ use crate::cli::{CliVadEngine, CliVadSensitivity}; -use crate::{DatabaseManager, VideoCapture}; +use crate::{DatabaseManager, EventSystem, VideoCapture}; use anyhow::Result; use crossbeam::queue::SegQueue; use futures::future::join_all; use log::{debug, error, info, warn}; -use screenpipe_audio::vad_engine::VadSensitivity; +use screenpipe_audio::meeting_detector::MeetingDetector; +use screenpipe_audio::vad_engine::{create_vad_engine, VadEngine}; use screenpipe_audio::{ - create_whisper_channel, record_and_transcribe, vad_engine::VadEngineEnum, AudioDevice, - AudioInput, AudioTranscriptionEngine, DeviceControl, TranscriptionResult, + create_whisper_channel, record_and_transcribe, AudioDevice, AudioInput, + AudioTranscriptionEngine, DeviceControl, TranscriptionResult, }; use screenpipe_core::pii_removal::remove_pii; use screenpipe_integrations::friend_wearable::initialize_friend_wearable_loop; @@ -15,7 +16,7 @@ use screenpipe_vision::OcrEngine; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::runtime::Handle; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -44,7 +45,12 @@ pub async fn start_continuous_recording( include_windows: &[String], deepgram_api_key: Option, vad_sensitivity: CliVadSensitivity, + event_system: Arc, // Add this parameter ) -> Result<()> { + let mut vad_engine = create_vad_engine(vad_engine.into()).await?; + vad_engine.set_sensitivity(vad_sensitivity.into()); + let vad_engine_mutex: Arc>> = Arc::new(Mutex::new(vad_engine)); + let vad_engine_mutex_clone = vad_engine_mutex.clone(); let (whisper_sender, whisper_receiver, whisper_shutdown_flag) = if audio_disabled { // Create a dummy channel if no audio devices are available, e.g. audio disabled let (input_sender, _): (UnboundedSender, UnboundedReceiver) = @@ -61,10 +67,9 @@ pub async fn start_continuous_recording( } else { create_whisper_channel( audio_transcription_engine.clone(), - VadEngineEnum::from(vad_engine), deepgram_api_key, &PathBuf::from(output_path.as_ref()), - VadSensitivity::from(vad_sensitivity), + vad_engine_mutex, ) .await? }; @@ -128,6 +133,8 @@ pub async fn start_continuous_recording( audio_devices_control, friend_wearable_uid, audio_transcription_engine, + vad_engine_mutex_clone, + event_system, // Add this line ) .await }) @@ -257,8 +264,11 @@ async fn record_audio( audio_devices_control: Arc>, friend_wearable_uid: Option, audio_transcription_engine: Arc, + vad_engine: Arc>>, + event_system: Arc, // Add this parameter ) -> Result<()> { let mut handles: HashMap> = HashMap::new(); + let mut meeting_detector = MeetingDetector::new(vad_engine).await?; loop { while let Some((audio_device, device_control)) = audio_devices_control.pop() { @@ -358,7 +368,19 @@ async fn record_audio( "device {} received transcription {:?}", transcription.input.device, transcription.transcription ); - // avoiding crashing the audio processing if one fails + + // Process audio with MeetingDetector + if let Some(meeting_event) = + meeting_detector.process_audio(&transcription.input.data)? + { + // Handle meeting event (e.g., log it, store in database, etc.) + info!("Meeting event detected: {:?}", meeting_event); + + // Send the meeting event using the event system + event_system.publish(meeting_event); + } + + // Process transcription result if let Err(e) = process_audio_result( &db, transcription, @@ -368,7 +390,6 @@ async fn record_audio( .await { error!("Error processing audio result: {}", e); - // Optionally, you can add more specific error handling here } } diff --git a/screenpipe-server/src/event_system.rs b/screenpipe-server/src/event_system.rs new file mode 100644 index 000000000..2b15acef5 --- /dev/null +++ b/screenpipe-server/src/event_system.rs @@ -0,0 +1,40 @@ +use parking_lot::RwLock; +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use tokio::sync::broadcast; + +pub struct EventSystem { + channels: RwLock>>, +} + +impl EventSystem { + pub fn new() -> Self { + EventSystem { + channels: RwLock::new(HashMap::new()), + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + let type_id = TypeId::of::(); + let mut channels = self.channels.write(); + + let channel = channels + .entry(type_id) + .or_insert_with(|| Box::new(broadcast::channel::(100).0)) + .downcast_ref::>() + .expect("Failed to downcast channel"); + + channel.subscribe() + } + + pub fn publish(&self, event: T) { + let type_id = TypeId::of::(); + let channels = self.channels.read(); + + if let Some(channel) = channels.get(&type_id) { + if let Some(sender) = channel.downcast_ref::>() { + let _ = sender.send(event); + } + } + } +} diff --git a/screenpipe-server/src/lib.rs b/screenpipe-server/src/lib.rs index afcd87a61..748c593f2 100644 --- a/screenpipe-server/src/lib.rs +++ b/screenpipe-server/src/lib.rs @@ -1,7 +1,9 @@ +mod auto_destruct; pub mod chunking; pub mod cli; pub mod core; mod db; +mod event_system; pub mod filtering; pub mod logs; mod pipe_manager; @@ -11,11 +13,11 @@ mod server; mod video; mod video_db; mod video_utils; -mod auto_destruct; pub use auto_destruct::watch_pid; pub use cli::Cli; pub use core::start_continuous_recording; pub use db::{ContentSource, ContentType, DatabaseManager, SearchResult}; +pub use event_system::EventSystem; pub use logs::MultiWriter; pub use pipe_manager::PipeManager; pub use resource_monitor::{ResourceMonitor, RestartSignal}; diff --git a/screenpipe-server/src/server.rs b/screenpipe-server/src/server.rs index 8e4bd582b..cefb3b9ba 100644 --- a/screenpipe-server/src/server.rs +++ b/screenpipe-server/src/server.rs @@ -1,7 +1,10 @@ use axum::{ - extract::{Path, Query, State}, + extract::{ + ws::{Message, WebSocket}, + Path, Query, State, WebSocketUpgrade, + }, http::StatusCode, - response::Json as JsonResponse, + response::{IntoResponse, Json as JsonResponse}, routing::{get, post}, serve, Router, }; @@ -14,14 +17,14 @@ use crate::{ db::TagContentType, pipe_manager::{PipeInfo, PipeManager}, video_utils::{merge_videos, MergeVideosRequest, MergeVideosResponse}, - ContentType, DatabaseManager, SearchResult, + ContentType, DatabaseManager, EventSystem, SearchResult, }; use crate::{plugin::ApiPluginLayer, video_utils::extract_frame}; use chrono::{DateTime, Utc}; use log::{debug, error, info}; use screenpipe_audio::{ - default_input_device, default_output_device, list_audio_devices, AudioDevice, DeviceControl, - DeviceType, + default_input_device, default_output_device, list_audio_devices, + meeting_detector::MeetingEvent, AudioDevice, DeviceControl, DeviceType, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -47,6 +50,7 @@ pub struct AppState { pub pipe_manager: Arc, pub vision_disabled: bool, pub audio_disabled: bool, + pub event_system: Arc, } // Update the SearchQuery struct @@ -710,6 +714,7 @@ pub struct Server { pipe_manager: Arc, vision_disabled: bool, audio_disabled: bool, + event_system: Arc, } impl Server { @@ -722,6 +727,7 @@ impl Server { pipe_manager: Arc, vision_disabled: bool, audio_disabled: bool, + event_system: Arc, ) -> Self { Server { db, @@ -732,6 +738,7 @@ impl Server { pipe_manager, vision_disabled, audio_disabled, + event_system, } } @@ -753,6 +760,7 @@ impl Server { pipe_manager: self.pipe_manager, vision_disabled: self.vision_disabled, audio_disabled: self.audio_disabled, + event_system: self.event_system, }); let app = create_router() @@ -797,6 +805,24 @@ async fn merge_frames_handler( } } +pub async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| handle_socket(socket, state)) +} + +async fn handle_socket(mut socket: WebSocket, state: Arc) { + let mut meeting_event_rx = state.event_system.subscribe::(); + + while let Ok(event) = meeting_event_rx.recv().await { + let json = serde_json::to_string(&event).unwrap(); + if socket.send(Message::Text(json)).await.is_err() { + break; + } + } +} + pub fn create_router() -> Router> { Router::new() .route("/search", get(search)) @@ -814,6 +840,7 @@ pub fn create_router() -> Router> { .route("/pipes/update", post(update_pipe_config_handler)) .route("/experimental/frames/merge", post(merge_frames_handler)) .route("/health", get(health_check)) + .route("/ws", get(ws_handler)) } /* @@ -1021,4 +1048,37 @@ MERGED_VIDEO_PATH=$(echo "$MERGE_RESPONSE" | jq -r '.video_path') echo "Merged Video Path: $MERGED_VIDEO_PATH" +# events + +virtualenv venv +source venv/bin/activate +pip install websocket-client + +python3 + +import websocket +import json + +def on_message(ws, message): + try: + data = json.loads(message) + print('received event:', data) + except json.JSONDecodeError as e: + print('error parsing event data:', str(e)) + +def on_error(ws, error): + print('websocket error:', str(error)) + +def on_close(ws, close_status_code, close_msg): + print('websocket connection closed') + +def on_open(ws): + print('websocket connection established') + +websocket.enableTrace(True) +ws = websocket.WebSocketApp("ws://localhost:3030/ws", on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close) + +ws.run_forever() + + */