diff --git a/screenpipe-core/Cargo.toml b/screenpipe-core/Cargo.toml index e1fcb24fa..91c87e2da 100644 --- a/screenpipe-core/Cargo.toml +++ b/screenpipe-core/Cargo.toml @@ -48,11 +48,16 @@ once_cell = "1.19.0" cron = "0.13.0" chrono = "0.4.38" +# Add rdev for keyboard capture +rdev = { version = "0.5", optional = true } +crossbeam = { workspace = true, optional = true } + [features] default = ["pipes", "security"] llm = ["candle", "candle-nn", "candle-transformers", "tokenizers", "hf-hub"] pipes = [] security = ["dep:regex", "dep:lazy_static"] +keyboard = ["dep:rdev", "dep:crossbeam"] metal = ["candle/metal", "candle-nn/metal", "candle-transformers/metal"] cuda = ["candle/cuda", "candle-nn/cuda", "candle-transformers/cuda"] mkl = ["candle/mkl", "candle-nn/mkl", "candle-transformers/mkl"] diff --git a/screenpipe-core/src/keyboard_capture.rs b/screenpipe-core/src/keyboard_capture.rs new file mode 100644 index 000000000..97ea68492 --- /dev/null +++ b/screenpipe-core/src/keyboard_capture.rs @@ -0,0 +1,146 @@ +use anyhow::Result; +use crossbeam::channel::{bounded, Sender}; +use rdev::{listen, EventType}; +use std::fmt::Display; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::sync::mpsc; +use tracing::{debug, error}; + +pub struct KeyboardCapture { + is_running: Arc, + tx: mpsc::Sender, +} + +#[derive(Debug, Clone)] +pub struct KeyboardEvent { + pub timestamp: SystemTime, + pub key: String, + pub event_type: KeyboardEventType, +} + +#[derive(Debug, Clone)] +pub enum KeyboardEventType { + KeyPress, + KeyRelease, +} + +impl Display for KeyboardEventType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + KeyboardEventType::KeyPress => write!(f, "press"), + KeyboardEventType::KeyRelease => write!(f, "release"), + } + } +} + +impl KeyboardCapture { + pub fn new(tx: mpsc::Sender) -> Self { + Self { + is_running: Arc::new(AtomicBool::new(true)), + tx, + } + } + + pub async fn 
start(&self) -> Result<()> { + debug!("starting keyboard capture"); + let (event_tx, event_rx) = bounded(100); + let is_running = Arc::clone(&self.is_running); + let tx = self.tx.clone(); + + // Create a channel for stopping the keyboard listener + let (stop_tx, stop_rx) = bounded::<()>(1); + let stop_tx_clone = stop_tx.clone(); + let is_running_clone = is_running.clone(); + + // Spawn blocking task for keyboard events + tokio::task::spawn_blocking(move || { + if let Err(e) = listen_keyboard(event_tx, is_running_clone, stop_rx) { + error!("keyboard capture error: {:?}", e); + } + }); + + // Process events + loop { + if !self.is_running.load(Ordering::SeqCst) { + debug!("keyboard capture shutdown signal received"); + let _ = stop_tx_clone.send(()); + break; + } + + if let Ok(event) = event_rx.try_recv() { + if let Err(e) = tx.try_send(event) { + error!("error sending keyboard event: {:?}", e); + } + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + debug!("keyboard capture stopped"); + Ok(()) + } + + pub fn stop(&self) { + debug!("stopping keyboard capture"); + self.is_running.store(false, Ordering::SeqCst); + } +} + +fn listen_keyboard( + tx: Sender, + is_running: Arc, + stop_rx: crossbeam::channel::Receiver<()>, +) -> anyhow::Result<()> { + let stop_rx = Arc::new(stop_rx); + let stop_rx_clone = Arc::clone(&stop_rx); + let is_running_clone = Arc::clone(&is_running); + + // Create a thread to handle the stop signal + let stop_thread = std::thread::spawn(move || { + while is_running_clone.load(Ordering::SeqCst) { + if stop_rx_clone.try_recv().is_ok() { + is_running_clone.store(false, Ordering::SeqCst); + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + }); + + // Use a channel to communicate with the rdev listener + let (exit_tx, exit_rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + listen(move |event| { + if !is_running.load(Ordering::SeqCst) || stop_rx.try_recv().is_ok() { + let _ = 
exit_tx.send(()); + return; + } + + match event.event_type { + EventType::KeyPress(key) | EventType::KeyRelease(key) => { + let event = KeyboardEvent { + timestamp: SystemTime::now(), + key: format!("{:?}", key), + event_type: match event.event_type { + EventType::KeyPress(_) => KeyboardEventType::KeyPress, + EventType::KeyRelease(_) => KeyboardEventType::KeyRelease, + _ => return, + }, + }; + if let Err(e) = tx.send(event) { + error!("error sending keyboard event: {:?}", e); + } + } + _ => {} + } + }) + .unwrap(); + }); + + // Wait for the stop signal + let _ = exit_rx.recv(); + stop_thread.join().unwrap(); + + Ok(()) +} diff --git a/screenpipe-core/src/lib.rs b/screenpipe-core/src/lib.rs index ddfb9a6e4..d0eaab953 100644 --- a/screenpipe-core/src/lib.rs +++ b/screenpipe-core/src/lib.rs @@ -31,6 +31,10 @@ pub mod pii_removal; #[cfg(feature = "mkl")] extern crate intel_mkl_src; +#[cfg(feature = "keyboard")] +pub mod keyboard_capture; +#[cfg(feature = "keyboard")] +pub use keyboard_capture::*; #[cfg(feature = "security")] pub use pii_removal::*; diff --git a/screenpipe-server/Cargo.toml b/screenpipe-server/Cargo.toml index ffe8ebb6d..2a1320591 100644 --- a/screenpipe-server/Cargo.toml +++ b/screenpipe-server/Cargo.toml @@ -147,10 +147,11 @@ default = [] metal = ["candle/metal", "candle-nn/metal", "candle-transformers/metal"] cuda = ["candle/cuda", "candle-nn/cuda", "candle-transformers/cuda"] mkl = ["candle/mkl", "candle-nn/mkl", "candle-transformers/mkl"] -pipes = ["screenpipe-core/pipes", "url"] +pipes = ["screenpipe-core/pipes"] llm = ["screenpipe-core/llm"] -beta = ["screenpipe-core/beta", "dep:screenpipe-actions"] +beta = ["screenpipe-core/beta"] experimental = ["enigo"] +keyboard = ["screenpipe-core/keyboard"] [[bin]] name = "screenpipe" diff --git a/screenpipe-server/src/bin/screenpipe-server.rs b/screenpipe-server/src/bin/screenpipe-server.rs index 82163edb9..37df343a5 100644 --- a/screenpipe-server/src/bin/screenpipe-server.rs +++ 
b/screenpipe-server/src/bin/screenpipe-server.rs @@ -23,6 +23,7 @@ use screenpipe_audio::{ use screenpipe_core::find_ffmpeg_path; use screenpipe_server::{ cli::{Cli, CliAudioTranscriptionEngine, CliOcrEngine, Command, OutputFormat, PipeCommand}, + core::ShutdownSignal, highlight::{Highlight, HighlightConfig}, pipe_manager::PipeInfo, start_continuous_recording, watch_pid, DatabaseManager, PipeManager, ResourceMonitor, Server, @@ -31,7 +32,7 @@ use screenpipe_vision::monitor::list_monitors; #[cfg(target_os = "macos")] use screenpipe_vision::run_ui; use serde_json::{json, Value}; -use tokio::{runtime::Runtime, signal, sync::broadcast}; +use tokio::{signal, sync::broadcast}; use tracing::{debug, error, info, warn}; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling::{RollingFileAppender, Rotation}; @@ -403,22 +404,15 @@ async fn main() -> anyhow::Result<()> { let ocr_engine_clone = cli.ocr_engine.clone(); let vad_engine = cli.vad_engine.clone(); let vad_engine_clone = vad_engine.clone(); + let vad_engine_clone_2 = vad_engine.clone(); let vad_sensitivity_clone = cli.vad_sensitivity.clone(); let (shutdown_tx, _) = broadcast::channel::<()>(1); - let audio_runtime = Runtime::new().unwrap(); - let vision_runtime = Runtime::new().unwrap(); - - let audio_handle = audio_runtime.handle().clone(); - let vision_handle = vision_runtime.handle().clone(); - let db_clone = Arc::clone(&db); let output_path_clone = Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()); - let vision_control_clone = Arc::clone(&vision_control); - let shutdown_tx_clone = shutdown_tx.clone(); let monitor_ids_clone = monitor_ids.clone(); - let ignored_windows_clone = cli.ignored_windows.clone(); - let included_windows_clone = cli.included_windows.clone(); + let ignored_windows = cli.ignored_windows.clone(); + let included_windows = cli.included_windows.clone(); let fps = if cli.fps.is_finite() && cli.fps > 0.0 { cli.fps @@ -429,54 +423,43 @@ async fn main() -> 
anyhow::Result<()> { let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration); - let handle = { - let runtime = &tokio::runtime::Handle::current(); - runtime.spawn(async move { - loop { - let vad_engine_clone = vad_engine.clone(); // Clone it here for each iteration - let mut shutdown_rx = shutdown_tx_clone.subscribe(); - let recording_future = start_continuous_recording( - db_clone.clone(), - output_path_clone.clone(), - fps, - audio_chunk_duration, // use the new setting - Duration::from_secs(cli.video_chunk_duration), - vision_control_clone.clone(), - audio_devices_control.clone(), - cli.disable_audio, - Arc::new(cli.audio_transcription_engine.clone().into()), - Arc::new(cli.ocr_engine.clone().into()), - monitor_ids_clone.clone(), - cli.use_pii_removal, - cli.disable_vision, - vad_engine_clone, - &vision_handle, - &audio_handle, - &cli.ignored_windows, - &cli.included_windows, - cli.deepgram_api_key.clone(), - cli.vad_sensitivity.clone(), - languages.clone(), - cli.capture_unfocused_windows, - ); - - let result = tokio::select! 
{ - result = recording_future => result, - _ = shutdown_rx.recv() => { - info!("received shutdown signal for recording"); - break; - } - }; - - if let Err(e) = result { - error!("continuous recording error: {:?}", e); - } - } + let shutdown = ShutdownSignal::new(); + let shutdown_for_ctrlc = shutdown.clone(); + let shutdown_for_autodestruct = shutdown.clone(); + + // Use in continuous recording + let recording_future = start_continuous_recording( + db_clone, + output_path_clone, + fps, + audio_chunk_duration, + Duration::from_secs(cli.video_chunk_duration), + audio_devices_control.clone(), + cli.disable_audio, + Arc::new(cli.audio_transcription_engine.clone().into()), + Arc::new(cli.ocr_engine.clone().into()), + monitor_ids_clone.clone(), + cli.use_pii_removal, + cli.disable_vision, + vad_engine_clone_2, + &ignored_windows, + &included_windows, + cli.deepgram_api_key.clone(), + cli.vad_sensitivity.clone(), + languages.clone(), + cli.capture_unfocused_windows, + shutdown_for_autodestruct.clone(), + #[cfg(feature = "keyboard")] + cli.enable_keyboard, + ); - drop(vision_runtime); - drop(audio_runtime); - }) - }; + // Single Ctrl+C handler + tokio::spawn(async move { + if let Ok(()) = signal::ctrl_c().await { + info!("received ctrl+c, initiating shutdown"); + shutdown_for_ctrlc.signal(); + } + }); let local_data_dir_clone_2 = local_data_dir_clone.clone(); #[cfg(feature = "llm")] @@ -567,11 +550,11 @@ async fn main() -> anyhow::Result<()> { println!("│ use pii removal │ {:<34} │", cli.use_pii_removal); println!( "│ ignored windows │ {:<34} │", - format_cell(&format!("{:?}", &ignored_windows_clone), VALUE_WIDTH) + format_cell(&format!("{:?}", &ignored_windows), VALUE_WIDTH) ); println!( "│ included windows │ {:<34} │", - format_cell(&format!("{:?}", &included_windows_clone), VALUE_WIDTH) + format_cell(&format!("{:?}", &included_windows), VALUE_WIDTH) ); println!("│ ui monitoring │ {:<34} │", cli.enable_ui_monitoring); println!("│ frame cache │ {:<34} │", 
cli.enable_frame_cache); @@ -647,7 +630,7 @@ async fn main() -> anyhow::Result<()> { } // Audio devices section - println!("├─────────────────────┼────────────────────────────────────┤"); + println!("├─────────────────────┼────────────────────────────────────┤"); println!("│ audio devices │ │"); if cli.disable_audio { @@ -762,49 +745,18 @@ async fn main() -> anyhow::Result<()> { let server_future = server.start(devices_status, api_plugin, cli.enable_frame_cache); pin_mut!(server_future); - // Add auto-destruct watcher + // Auto-destruct watcher (if enabled) if let Some(pid) = cli.auto_destruct_pid { info!("watching pid {} for auto-destruction", pid); - let shutdown_tx_clone = shutdown_tx.clone(); tokio::spawn(async move { - // sleep for 5 seconds - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(5)).await; if watch_pid(pid).await { info!("watched pid {} has stopped, initiating shutdown", pid); - let _ = shutdown_tx_clone.send(()); + shutdown_for_autodestruct.signal(); } }); } - let ctrl_c_future = signal::ctrl_c(); - pin_mut!(ctrl_c_future); - - // only in beta and on macos - #[cfg(feature = "beta")] - { - if cli.enable_beta && cfg!(target_os = "macos") { - use screenpipe_actions::run; - - info!("beta feature enabled, starting screenpipe actions"); - - let shutdown_tx_clone = shutdown_tx.clone(); - tokio::spawn(async move { - let mut shutdown_rx = shutdown_tx_clone.subscribe(); - - tokio::select! { - result = run() => { - if let Err(e) = result { - error!("Error running screenpipe actions: {}", e); - } - } - _ = shutdown_rx.recv() => { - info!("Received shutdown signal, stopping screenpipe actions"); - } - } - }); - } - } - // Start the UI monitoring task #[cfg(target_os = "macos")] if cli.enable_ui_monitoring { @@ -833,18 +785,15 @@ async fn main() -> anyhow::Result<()> { }); } + // Main select loop tokio::select! 
{ - _ = handle => info!("recording completed"), - result = &mut server_future => { + _ = recording_future => info!("recording completed"), + result = server_future => { match result { Ok(_) => info!("server stopped normally"), Err(e) => error!("server stopped with error: {:?}", e), } } - _ = ctrl_c_future => { - info!("received ctrl+c, initiating shutdown"); - let _ = shutdown_tx.send(()); - } } h.shutdown(); @@ -1161,7 +1110,7 @@ fn persist_path_windows(new_path: PathBuf) -> anyhow::Result<()> { } // Construct the new PATH string - let new_path_env = format!("{};{}", current_path, new_path.display()); + let new_path_env = format!("{};{}", current_path, new_path.display()); // Execute the 'setx' command to persist the PATH let output = std::process::Command::new("setx") diff --git a/screenpipe-server/src/cli.rs b/screenpipe-server/src/cli.rs index 25190fcef..1823e6c60 100644 --- a/screenpipe-server/src/cli.rs +++ b/screenpipe-server/src/cli.rs @@ -239,6 +239,11 @@ pub struct Cli { #[arg(long, default_value_t = false)] pub capture_unfocused_windows: bool, + /// Enable keyboard capture + #[cfg(feature = "keyboard")] + #[arg(long, default_value_t = false)] + pub enable_keyboard: bool, + #[command(subcommand)] pub command: Option, diff --git a/screenpipe-server/src/core.rs b/screenpipe-server/src/core.rs index 7710548e8..a8907b63f 100644 --- a/screenpipe-server/src/core.rs +++ b/screenpipe-server/src/core.rs @@ -3,7 +3,6 @@ use crate::db_types::Speaker; use crate::{DatabaseManager, VideoCapture}; use anyhow::Result; use crossbeam::queue::SegQueue; -use futures::future::join_all; use log::{debug, error, info, warn}; use screenpipe_audio::vad_engine::VadSensitivity; use screenpipe_audio::AudioStream; @@ -20,15 +19,35 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Handle; +use tokio::sync::broadcast; use tokio::task::JoinHandle; +#[derive(Clone)] +pub struct ShutdownSignal { + tx: broadcast::Sender<()>, +} + +impl 
ShutdownSignal { + pub fn new() -> Self { + let (tx, _) = broadcast::channel(1); + Self { tx } + } + + pub fn subscribe(&self) -> broadcast::Receiver<()> { + self.tx.subscribe() + } + + pub fn signal(&self) { + let _ = self.tx.send(()); + } +} + pub async fn start_continuous_recording( db: Arc, output_path: Arc, fps: f64, audio_chunk_duration: Duration, video_chunk_duration: Duration, - vision_control: Arc, audio_devices_control: Arc>, audio_disabled: bool, audio_transcription_engine: Arc, @@ -37,71 +56,20 @@ pub async fn start_continuous_recording( use_pii_removal: bool, vision_disabled: bool, vad_engine: CliVadEngine, - vision_handle: &Handle, - audio_handle: &Handle, ignored_windows: &[String], include_windows: &[String], deepgram_api_key: Option, vad_sensitivity: CliVadSensitivity, languages: Vec, capture_unfocused_windows: bool, + shutdown: ShutdownSignal, + #[cfg(feature = "keyboard")] enable_keyboard: bool, ) -> Result<()> { - debug!("Starting video recording for monitor {:?}", monitor_ids); - let video_tasks = if !vision_disabled { - monitor_ids - .iter() - .map(|&monitor_id| { - let db_manager_video = Arc::clone(&db); - let output_path_video = Arc::clone(&output_path); - let is_running_video = Arc::clone(&vision_control); - let ocr_engine = Arc::clone(&ocr_engine); - let ignored_windows_video = ignored_windows.to_vec(); - let include_windows_video = include_windows.to_vec(); - - let languages = languages.clone(); - - debug!("Starting video recording for monitor {}", monitor_id); - vision_handle.spawn(async move { - record_video( - db_manager_video, - output_path_video, - fps, - is_running_video, - ocr_engine, - monitor_id, - use_pii_removal, - &ignored_windows_video, - &include_windows_video, - video_chunk_duration, - languages.clone(), - capture_unfocused_windows, - ) - .await - }) - }) - .collect::>() - } else { - vec![vision_handle.spawn(async move { - tokio::time::sleep(Duration::from_secs(60)).await; - Ok(()) - })] - }; + let mut shutdown_rx = 
shutdown.subscribe(); + // Create channels for each component let (whisper_sender, whisper_receiver, whisper_shutdown_flag) = if audio_disabled { - // Create a dummy channel if no audio devices are available, e.g. audio disabled - let (input_sender, _): ( - crossbeam::channel::Sender, - crossbeam::channel::Receiver, - ) = crossbeam::channel::bounded(100); - let (_, output_receiver): ( - crossbeam::channel::Sender, - crossbeam::channel::Receiver, - ) = crossbeam::channel::bounded(100); - ( - input_sender, - output_receiver, - Arc::new(AtomicBool::new(false)), - ) + create_dummy_channels() } else { create_whisper_channel( audio_transcription_engine.clone(), @@ -113,58 +81,180 @@ pub async fn start_continuous_recording( ) .await? }; - let whisper_sender_clone = whisper_sender.clone(); - let db_manager_audio = Arc::clone(&db); + let whisper_sender_shutdown = whisper_sender.clone(); + + // Spawn video recording tasks + let video_tasks = if !vision_disabled { + monitor_ids + .iter() + .map(|&monitor_id| { + let db = Arc::clone(&db); + let output_path = Arc::clone(&output_path); + let ocr_engine = Arc::clone(&ocr_engine); + let ignored_windows = ignored_windows.to_vec(); + let include_windows = include_windows.to_vec(); + let languages = languages.clone(); + let mut shutdown_rx = shutdown.subscribe(); + + tokio::spawn(async move { + loop { + tokio::select! 
{ + result = record_video( + db.clone(), + output_path.clone(), + fps, + ocr_engine.clone(), + monitor_id, + use_pii_removal, + &ignored_windows, + &include_windows, + video_chunk_duration, + languages.clone(), + capture_unfocused_windows, + ) => { + if let Err(e) = result { + error!("video recording error: {}", e); + } + break; + } + _ = shutdown_rx.recv() => { + debug!("received shutdown signal for video recording"); + break; + } + } + } + Ok::<(), anyhow::Error>(()) + }) + }) + .collect() + } else { + vec![] + }; + + // Spawn audio recording task let audio_task = if !audio_disabled { - audio_handle.spawn(async move { - record_audio( - db_manager_audio, - audio_chunk_duration, - whisper_sender, - whisper_receiver, - audio_devices_control, - audio_transcription_engine, - ) - .await + let db = Arc::clone(&db); + let mut shutdown_rx = shutdown.subscribe(); + + tokio::spawn(async move { + loop { + tokio::select! { + result = record_audio( + db.clone(), + audio_chunk_duration, + whisper_sender_shutdown.clone(), + whisper_receiver.clone(), + audio_devices_control.clone(), + audio_transcription_engine.clone(), + ) => { + if let Err(e) = result { + error!("audio recording error: {}", e); + } + break; + } + _ = shutdown_rx.recv() => { + debug!("received shutdown signal for audio recording"); + break; + } + } + } + Ok::<(), anyhow::Error>(()) }) } else { - audio_handle.spawn(async move { - tokio::time::sleep(Duration::from_secs(60)).await; - Ok(()) + tokio::spawn(async { Ok(()) }) + }; + + // Spawn keyboard recording task if enabled + #[cfg(feature = "keyboard")] + let keyboard_task = if enable_keyboard { + let db = Arc::clone(&db); + let mut shutdown_rx = shutdown.subscribe(); + + tokio::spawn(async move { + use screenpipe_core::KeyboardCapture; + use tokio::sync::mpsc; + + let (tx, mut rx) = mpsc::channel(100); + let keyboard_capture = Arc::new(KeyboardCapture::new(tx)); + let keyboard_capture_clone = keyboard_capture.clone(); + + tokio::select! 
{ + _ = async { + let keyboard_future = keyboard_capture.start(); + tokio::select! { + _ = keyboard_future => {}, + _ = async { + while let Some(event) = rx.recv().await { + if let Err(e) = db.insert_keyboard_event( + event.timestamp, + &event.key, + &event.event_type.to_string(), + ).await { + error!("Failed to insert keyboard event: {}", e); + } + } + } => {} + } + } => {}, + _ = shutdown_rx.recv() => { + debug!("received shutdown signal for keyboard recording"); + keyboard_capture_clone.stop(); + } + } + + Ok::<(), anyhow::Error>(()) }) + } else { + tokio::spawn(async { Ok::<(), anyhow::Error>(()) }) }; - // Join all video tasks - let video_results = join_all(video_tasks); + // Wait for shutdown signal + shutdown_rx.recv().await?; + info!("initiating graceful shutdown"); + + // Signal whisper to shutdown + whisper_shutdown_flag.store(true, Ordering::Relaxed); - // Handle any errors from the tasks - for (i, result) in video_results.await.into_iter().enumerate() { + // Wait for all tasks to complete + let results = futures::future::join_all(video_tasks).await; + for (i, result) in results.into_iter().enumerate() { if let Err(e) = result { - error!("Video recording error for monitor {}: {:?}", i, e); + error!("video task {} error during shutdown: {:?}", i, e); } } + if let Err(e) = audio_task.await { - error!("Audio recording error: {:?}", e); + error!("audio task error during shutdown: {:?}", e); } - // Shutdown the whisper channel - whisper_shutdown_flag.store(true, Ordering::Relaxed); - drop(whisper_sender_clone); // Close the sender channel - - // TODO: process any remaining audio chunks - // TODO: wait a bit for whisper to finish processing - // TODO: any additional cleanup like device controls to release + #[cfg(feature = "keyboard")] + if let Err(e) = keyboard_task.await { + error!("keyboard task error during shutdown: {:?}", e); + } - info!("Stopped recording"); + info!("all recording tasks completed"); Ok(()) } +fn create_dummy_channels() -> ( + 
crossbeam::channel::Sender, + crossbeam::channel::Receiver, + Arc, +) { + let (input_sender, _) = crossbeam::channel::bounded(1); + let (_, output_receiver) = crossbeam::channel::bounded(1); + ( + input_sender, + output_receiver, + Arc::new(AtomicBool::new(false)), + ) +} + async fn record_video( db: Arc, output_path: Arc, fps: f64, - is_running: Arc, ocr_engine: Arc, monitor_id: u32, use_pii_removal: bool, @@ -211,7 +301,7 @@ async fn record_video( capture_unfocused_windows, ); - while is_running.load(Ordering::SeqCst) { + loop { if let Some(frame) = video_capture.ocr_frame_queue.pop() { for window_result in &frame.window_ocr_results { match db.insert_frame(&device_name, None).await { @@ -253,8 +343,6 @@ async fn record_video( } tokio::time::sleep(Duration::from_secs_f64(1.0 / fps)).await; } - - Ok(()) } async fn record_audio( diff --git a/screenpipe-server/src/db.rs b/screenpipe-server/src/db.rs index e2f6955f5..d5143a480 100644 --- a/screenpipe-server/src/db.rs +++ b/screenpipe-server/src/db.rs @@ -12,7 +12,7 @@ use sqlx::Row; use sqlx::TypeInfo; use sqlx::ValueRef; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tracing::info; use std::collections::BTreeMap; @@ -1822,4 +1822,24 @@ impl DatabaseManager { Ok(()) } + + pub async fn insert_keyboard_event( + &self, + timestamp: SystemTime, + key: &str, + event_type: &str, + ) -> anyhow::Result { + let datetime = DateTime::::from(timestamp); + let id = sqlx::query( + "INSERT INTO keyboard_events (timestamp, key, event_type) VALUES (?, ?, ?)", + ) + .bind(datetime) + .bind(key) + .bind(event_type) + .execute(&self.pool) + .await? 
+ .last_insert_rowid(); + + Ok(id) + } } diff --git a/screenpipe-server/src/db_types.rs b/screenpipe-server/src/db_types.rs index f3b3b0b1d..2554764a8 100644 --- a/screenpipe-server/src/db_types.rs +++ b/screenpipe-server/src/db_types.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use sqlx::FromRow; use std::error::Error as StdError; use std::fmt; +use std::time::SystemTime; #[derive(Debug)] pub struct DatabaseError(pub String); @@ -194,3 +195,11 @@ pub struct AudioChunksResponse { pub file_path: String, pub timestamp: DateTime, } + +#[derive(Debug)] +pub struct KeyboardChunk { + pub id: i64, + pub timestamp: SystemTime, + pub text: String, + pub event_type: String, +} diff --git a/screenpipe-server/src/migrations/20241219165648_keyboard_events.sql b/screenpipe-server/src/migrations/20241219165648_keyboard_events.sql new file mode 100644 index 000000000..97e697ab1 --- /dev/null +++ b/screenpipe-server/src/migrations/20241219165648_keyboard_events.sql @@ -0,0 +1,11 @@ +-- Add keyboard_events table +CREATE TABLE IF NOT EXISTS keyboard_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + key TEXT NOT NULL, + event_type TEXT NOT NULL CHECK (event_type IN ('press', 'release')), + created_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +-- Add index for faster timestamp-based queries +CREATE INDEX IF NOT EXISTS idx_keyboard_events_timestamp ON keyboard_events(timestamp);