From 4a72ba98c840d3fddb6a32819c79417625f7dfbf Mon Sep 17 00:00:00 2001 From: Louis Beaumont Date: Tue, 30 Jul 2024 10:26:49 +0200 Subject: [PATCH] feat: implement experimental self-healing --- Cargo.toml | 4 +- .../screenpipe-app-tauri/scripts/pre_build.js | 24 ++- screenpipe-server/Cargo.toml | 1 - .../src/bin/screenpipe-server.rs | 102 +++++++------ screenpipe-server/src/core.rs | 80 +++++----- screenpipe-server/src/lib.rs | 2 +- screenpipe-server/src/resource_monitor.rs | 138 +++++++++--------- screenpipe-server/src/server.rs | 67 ++++----- screenpipe-server/tests/endpoint_test.rs | 4 +- 9 files changed, 214 insertions(+), 208 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 931bb19b..ea549ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,10 @@ members = [ "screenpipe-vision", "screenpipe-audio", "screenpipe-server", - + ] exclude = [ - "examples/apps/screenpipe-app-tauri/src-tauri", + "examples/apps/screenpipe-app-tauri/src-tauri", "examples/apps/screenpipe-app-dioxus", ] resolver = "2" diff --git a/examples/apps/screenpipe-app-tauri/scripts/pre_build.js b/examples/apps/screenpipe-app-tauri/scripts/pre_build.js index faf8f009..a9d6ce0c 100644 --- a/examples/apps/screenpipe-app-tauri/scripts/pre_build.js +++ b/examples/apps/screenpipe-app-tauri/scripts/pre_build.js @@ -93,6 +93,25 @@ if (platform == 'windows') { await $`mv ${config.ffmpegRealname}/lib/x64/* ${config.ffmpegRealname}/lib/` } + // Setup Tesseract + const tesseractName = 'tesseract-5.3.3-windows' + const tesseractUrl = 'https://github.com/UB-Mannheim/tesseract/releases/download/v5.3.3/tesseract-ocr-w64-setup-5.3.3.20231005.exe' + const tesseractInstaller = `${tesseractName}.exe` + + if (!(await fs.exists('tesseract'))) { + console.log('Setting up Tesseract for Windows...') + await $`C:\\msys64\\usr\\bin\\wget.exe -nc --show-progress ${tesseractUrl} -O ${tesseractInstaller}` + await $`${tesseractInstaller} /S /D=C:\\Program Files\\Tesseract-OCR` + await $`rm ${tesseractInstaller}` + await $`mv "C:\\Program Files\\Tesseract-OCR" tesseract` + console.log('Tesseract for Windows set up successfully.') + } else { + console.log('Tesseract for Windows already exists.') + } + + // Add Tesseract to PATH + process.env.PATH = `${process.cwd()}\\tesseract;${process.env.PATH}` + // Setup OpenBlas if (!(await fs.exists(config.openblasRealname)) && hasFeature('openblas')) { await $`C:\\msys64\\usr\\bin\\wget.exe -nc --show-progress ${config.windows.openBlasUrl} -O ${config.windows.openBlasName}.zip` @@ -228,10 +247,7 @@ if (hasFeature('cuda')) { [`${cudaPath}\\bin\\cudart64_*`]: './', [`${cudaPath}\\bin\\cublas64_*`]: './', [`${cudaPath}\\bin\\cublasLt64_*`]: './', - // Tesseract? - 'C:\\Windows\\System32\\msvcp140.dll': './', - 'C:\\Windows\\System32\\vcruntime140.dll': './', - 'C:\\Windows\\System32\\vcruntime140_1.dll': './', + 'tesseract\\*': './', }, }, } diff --git a/screenpipe-server/Cargo.toml b/screenpipe-server/Cargo.toml index 8fdde236..a3f2e1dc 100644 --- a/screenpipe-server/Cargo.toml +++ b/screenpipe-server/Cargo.toml @@ -92,7 +92,6 @@ crossbeam = { workspace = true } [dev-dependencies] tempfile = "3.3.0" -reqwest = { version = "0.11", features = ["json"] } # Benches criterion = { workspace = true } diff --git a/screenpipe-server/src/bin/screenpipe-server.rs b/screenpipe-server/src/bin/screenpipe-server.rs index de124a0e..ac479803 100644 --- a/screenpipe-server/src/bin/screenpipe-server.rs +++ b/screenpipe-server/src/bin/screenpipe-server.rs @@ -11,8 +11,9 @@ use std::{ use clap::Parser; #[allow(unused_imports)] use colored::Colorize; +use crossbeam::queue::SegQueue; use dirs::home_dir; -use log::{debug, info, LevelFilter}; +use log::{debug, error, info, LevelFilter}; use screenpipe_audio::{ default_input_device, default_output_device, list_audio_devices, parse_audio_device, DeviceControl, @@ -58,20 +59,10 @@ struct Cli { #[arg(long, default_value_t = false)] disable_audio: bool, - /// Memory usage threshold for restart (in percentage) - #[arg(long, default_value_t = 80.0)] - memory_threshold: f64, - - /// Runtime threshold for restart (in minutes) - #[arg(long, default_value_t = 60)] - runtime_threshold: u64, - - /// Enable automatic restart when resource thresholds are exceeded. - /// This feature will automatically restart the application if the memory usage - /// or runtime exceeds the specified thresholds, helping to ensure stability - /// and prevent potential crashes or performance degradation. + /// EXPERIMENTAL: Enable self healing when detecting unhealthy state based on /health endpoint. + /// This feature will automatically restart the recording tasks while keeping the API alive. #[arg(long, default_value_t = false)] - restart_enabled: bool, + self_healing: bool, /// Audio devices to use (can be specified multiple times) #[arg(long)] @@ -81,7 +72,7 @@ struct Cli { #[arg(long)] list_audio_devices: bool, - /// Data directory + /// Data directory. Default to $HOME/.screenpipe #[arg(long)] data_dir: Option, @@ -120,7 +111,7 @@ async fn main() -> anyhow::Result<()> { builder .filter(None, LevelFilter::Info) .filter_module("tokenizers", LevelFilter::Error) - // .filter_module("rusty_tesseract", LevelFilter::Error) + .filter_module("rusty_tesseract", LevelFilter::Error) .filter_module("symphonia", LevelFilter::Error); if cli.debug { @@ -177,9 +168,9 @@ async fn main() -> anyhow::Result<()> { let mut audio_devices = Vec::new(); - let (audio_devices_control_sender, audio_devices_control_receiver) = channel(64); + let audio_devices_control = Arc::new(SegQueue::new()); - let audio_devices_control_sender_server = audio_devices_control_sender.clone(); + let audio_devices_control_server = audio_devices_control.clone(); info!("Available audio devices:"); // Add all available audio devices to the controls @@ -237,22 +228,20 @@ async fn main() -> anyhow::Result<()> { is_paused: false, }; let device_clone = device.deref().clone(); - let sender_clone = audio_devices_control_sender.clone(); + let sender_clone = audio_devices_control.clone(); // send signal after everything started tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(15)).await; - let _ = sender_clone.send((device_clone, device_control)).await; + let _ = sender_clone.push((device_clone, device_control)); }); } } } - let resource_monitor = ResourceMonitor::new( - cli.memory_threshold, - cli.runtime_threshold, - cli.restart_enabled, - ); - resource_monitor.start_monitoring(Duration::from_secs(10)); // Log every 10 seconds + let (restart_sender, mut restart_receiver) = channel(10); + let resource_monitor = + ResourceMonitor::new(cli.self_healing, Duration::from_secs(60), 3, restart_sender); + resource_monitor.start_monitoring(Duration::from_secs(10)); let db = Arc::new( DatabaseManager::new(&format!("{}/db.sqlite", local_data_dir.to_string_lossy())) @@ -266,31 +255,54 @@ async fn main() -> anyhow::Result<()> { "Database initialized, will store files in {}", local_data_dir.to_string_lossy() ); - let db_record = db.clone(); let db_server = db.clone(); // Channel for controlling the recorder ! TODO RENAME SHIT - let (_control_tx, control_rx) = channel(64); let vision_control = Arc::new(AtomicBool::new(true)); let vision_control_server_clone = vision_control.clone(); - // Start continuous recording in a separate task - let _recording_task = tokio::spawn({ - async move { - let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration); - - start_continuous_recording( - db_record, - Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()), - cli.fps, - audio_chunk_duration, - control_rx, - vision_control, - audio_devices_control_receiver, - cli.save_text_files, - ) - .await + // Function to start or restart the recording task + let _start_recording = tokio::spawn(async move { + // hack + let mut recording_task = tokio::spawn(async move {}); + + loop { + let db_clone = db.clone(); + let local_data_dir = local_data_dir.clone(); + let vision_control = vision_control.clone(); + let audio_devices_control = audio_devices_control.clone(); + tokio::select! { + _ = &mut recording_task => { + // Recording task completed or errored, restart it + debug!("Recording task ended. Restarting..."); + } + Some(_) = restart_receiver.recv() => { + // Received restart signal, cancel the current task and restart + info!("Received restart signal. Restarting recording task..."); + recording_task.abort(); + } + } + recording_task = tokio::spawn(async move { + let result = start_continuous_recording( + db_clone, + Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()), + cli.fps, + Duration::from_secs(cli.audio_chunk_duration), + vision_control, + audio_devices_control, + cli.save_text_files, + ) + .await; + + if let Err(e) = result { + error!("Continuous recording error: {:?}", e); + } + }); + debug!("Recording task started"); + + // Short delay before restarting to avoid rapid restarts + tokio::time::sleep(Duration::from_secs(1)).await; } }); @@ -307,7 +319,7 @@ async fn main() -> anyhow::Result<()> { db_server, SocketAddr::from(([0, 0, 0, 0], cli.port)), vision_control_server_clone, - audio_devices_control_sender_server, + audio_devices_control_server, ); server.start(devices_status, api_plugin).await.unwrap(); }); diff --git a/screenpipe-server/src/core.rs b/screenpipe-server/src/core.rs index b727e0d4..acf36fdd 100644 --- a/screenpipe-server/src/core.rs +++ b/screenpipe-server/src/core.rs @@ -1,6 +1,7 @@ use crate::{DatabaseManager, VideoCapture}; use anyhow::Result; use chrono::Utc; +use crossbeam::queue::SegQueue; use log::{debug, error, info, warn}; use screenpipe_audio::{ create_whisper_channel, record_and_transcribe, AudioDevice, AudioInput, DeviceControl, @@ -10,7 +11,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc::{Receiver, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; pub enum RecorderControl { Pause, @@ -39,15 +40,13 @@ impl DataOutputWrapper { } } - pub async fn start_continuous_recording( db: Arc, output_path: Arc, fps: f64, audio_chunk_duration: Duration, - mut full_control: Receiver, vision_control: Arc, - audio_devices_control_receiver: Receiver<(AudioDevice, DeviceControl)>, + audio_devices_control: Arc>, save_text_files: bool, ) -> Result<()> { info!("Recording now"); @@ -63,7 +62,14 @@ pub async fn start_continuous_recording( let output_path_audio = Arc::clone(&output_path); let video_handle = tokio::spawn(async move { - record_video(db_manager_video, output_path_video, fps, is_running_video, save_text_files).await + record_video( + db_manager_video, + output_path_video, + fps, + is_running_video, + save_text_files, + ) + .await }); let audio_handle = tokio::spawn(async move { @@ -73,43 +79,13 @@ pub async fn start_continuous_recording( audio_chunk_duration, whisper_sender, whisper_receiver, - audio_devices_control_receiver, + audio_devices_control, ) .await }); - // Control loop - let control_handle = tokio::spawn(async move { - loop { - tokio::select! { - Some(ctrl) = full_control.recv() => { - match ctrl { - RecorderControl::Stop => { - vision_control.store(false, Ordering::SeqCst); - // stop all audio devices - // TODO not implemented - break; // Exit the loop when Stop is received - } - RecorderControl::Pause => { - // pause all audio devices - } - RecorderControl::Resume => { - vision_control.store(true, Ordering::SeqCst); - // resume all audio devices - } - } - } - else => { - // Channel closed, exit the loop - break; - } - } - } - }); - video_handle.await??; audio_handle.await??; - control_handle.await?; info!("Stopped recording"); Ok(()) @@ -135,7 +111,7 @@ async fn record_video( }; // debug!("record_video: video_capture"); let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback, save_text_files); - + while is_running.load(Ordering::SeqCst) { // let queue_lenglth = video_capture.ocr_frame_queue.lock().await.len(); // debug!("record_video: Checking for latest frame. Number of frames in OCR queue: {}", queue_length); @@ -143,12 +119,28 @@ async fn record_video( match db.insert_frame().await { Ok(frame_id) => { let text_json = serde_json::to_string(&frame.text_json).unwrap_or_default(); - let new_text_json_vs_previous_frame = serde_json::to_string(&frame.new_text_json).unwrap_or_default(); - let raw_data_output_from_ocr = DataOutputWrapper { data_output: frame.data_output }.to_json(); + let new_text_json_vs_previous_frame = + serde_json::to_string(&frame.new_text_json).unwrap_or_default(); + let raw_data_output_from_ocr = DataOutputWrapper { + data_output: frame.data_output, + } + .to_json(); // debug!("insert_ocr_text called for frame {}", frame_id); - if let Err(e) = db.insert_ocr_text(frame_id, &frame.text, &text_json, &new_text_json_vs_previous_frame, &raw_data_output_from_ocr).await { - error!("Failed to insert OCR text: {}, skipping frame {}", e, frame_id); + if let Err(e) = db + .insert_ocr_text( + frame_id, + &frame.text, + &text_json, + &new_text_json_vs_previous_frame, + &raw_data_output_from_ocr, + ) + .await + { + error!( + "Failed to insert OCR text: {}, skipping frame {}", + e, frame_id + ); continue; // Skip to the next iteration } } @@ -173,13 +165,13 @@ async fn record_audio( chunk_duration: Duration, whisper_sender: UnboundedSender, mut whisper_receiver: UnboundedReceiver, - mut audio_devices_control_receiver: Receiver<(AudioDevice, DeviceControl)>, + audio_devices_control: Arc>, ) -> Result<()> { let mut handles: HashMap> = HashMap::new(); loop { // Non-blocking check for new device controls - while let Ok((audio_device, device_control)) = audio_devices_control_receiver.try_recv() { + while let Some((audio_device, device_control)) = audio_devices_control.pop() { info!("Received audio device: {}", &audio_device); let device_id = audio_device.to_string(); @@ -324,4 +316,4 @@ async fn process_audio_result(db: &DatabaseManager, result: TranscriptionResult) result.input.device, e ), } -} \ No newline at end of file +} diff --git a/screenpipe-server/src/lib.rs b/screenpipe-server/src/lib.rs index eea82514..f4bfea40 100644 --- a/screenpipe-server/src/lib.rs +++ b/screenpipe-server/src/lib.rs @@ -9,7 +9,7 @@ mod video; pub use core::{start_continuous_recording, RecorderControl}; pub use db::{ContentType, DatabaseManager, SearchResult}; pub use logs::MultiWriter; -pub use resource_monitor::ResourceMonitor; +pub use resource_monitor::{ResourceMonitor, RestartSignal}; pub use server::health_check; pub use server::AppState; pub use server::HealthCheckResponse; diff --git a/screenpipe-server/src/resource_monitor.rs b/screenpipe-server/src/resource_monitor.rs index fe940538..eb907b28 100644 --- a/screenpipe-server/src/resource_monitor.rs +++ b/screenpipe-server/src/resource_monitor.rs @@ -1,28 +1,38 @@ use log::{error, info, warn}; use std::process::Command; use std::sync::Arc; -use std::thread; use std::time::{Duration, Instant}; use sysinfo::{PidExt, ProcessExt, System, SystemExt}; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; pub struct ResourceMonitor { start_time: Instant, - memory_threshold: f64, - runtime_threshold: Duration, - restart_enabled: bool, + self_healing_enabled: bool, + health_check_interval: Duration, + health_check_failures: Mutex, + max_health_check_failures: u32, + restart_sender: Sender, +} + +pub enum RestartSignal { + RecordingTasks, } impl ResourceMonitor { pub fn new( - memory_threshold: f64, - runtime_threshold_minutes: u64, - restart_enabled: bool, + self_healing_enabled: bool, + health_check_interval: Duration, + max_health_check_failures: u32, + restart_sender: Sender, ) -> Arc { Arc::new(Self { start_time: Instant::now(), - memory_threshold, - runtime_threshold: Duration::from_secs(runtime_threshold_minutes * 60), - restart_enabled, + self_healing_enabled, + health_check_interval, + health_check_failures: Mutex::new(0), + max_health_check_failures, + restart_sender, }) } @@ -74,75 +84,63 @@ impl ResourceMonitor { }; info!("{}", log_message); - - // Check for restart conditions only if restart is enabled - if self.restart_enabled - && (memory_usage_percent > self.memory_threshold - || runtime > self.runtime_threshold) - { - warn!( - "Restarting due to: Memory usage: {:.0}%, Runtime: {}s", - memory_usage_percent * 100.0, - runtime.as_secs() - ); - self.restart(); - } else if memory_usage_percent > self.memory_threshold - || runtime > self.runtime_threshold - { - warn!( - "Resource threshold exceeded: Memory usage: {:.0}%, Runtime: {}s", - memory_usage_percent * 100.0, - runtime.as_secs() - ); - } } } - fn restart(&self) { - if !self.restart_enabled { - warn!("Restart requested but restart feature is disabled."); - return; - } - warn!("Initiating restart due to resource thresholds..."); - - let args: Vec = std::env::args().collect(); - - #[cfg(unix)] - { - use std::os::unix::process::CommandExt; - let mut cmd = Command::new(&args[0]); - cmd.args(&args[1..]); - let err = cmd.exec(); - error!("Failed to restart application: {}", err); - std::process::exit(1); - } - - #[cfg(not(unix))] - { - // For non-Unix systems, we'll use a less seamless but still functional approach - match Command::new(&args[0]).args(&args[1..]).spawn() { - Ok(_) => { - info!("Application restarted successfully"); - std::process::exit(0); + pub fn start_monitoring(self: &Arc, interval: Duration) { + let monitor = Arc::clone(self); + tokio::spawn(async move { + let mut sys = System::new_all(); + let mut health_check_interval = tokio::time::interval(monitor.health_check_interval); + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + sys.refresh_all(); + monitor.log_status(&sys); + } + _ = health_check_interval.tick() => { + monitor.check_health().await; + } } - Err(e) => { - error!("Failed to restart application: {}", e); - std::process::exit(1); + } + }); + } + async fn check_health(&self) { + let client = reqwest::Client::new(); + match client.get("http://localhost:3030/health").send().await { + Ok(response) => { + if response.status().is_success() { + *self.health_check_failures.lock().await = 0; + } else { + self.handle_health_check_failure().await; } } + Err(_) => { + self.handle_health_check_failure().await; + } } } - pub fn start_monitoring(self: &Arc, interval: Duration) { - let monitor = Arc::clone(self); - thread::spawn(move || { - let mut sys = System::new_all(); - loop { - sys.refresh_all(); - monitor.log_status(&sys); - thread::sleep(interval); + async fn handle_health_check_failure(&self) { + let mut failures = self.health_check_failures.lock().await; + *failures += 1; + warn!("Health check failed. Consecutive failures: {}", *failures); + + if !self.self_healing_enabled { + return; + } + + if *failures >= self.max_health_check_failures { + warn!("Max health check failures reached. Restarting recording tasks..."); + if let Err(e) = self + .restart_sender + .send(RestartSignal::RecordingTasks) + .await + { + error!("Failed to send restart signal: {}", e); } - }); + *self.health_check_failures.lock().await = 0; + } } #[cfg(target_os = "macos")] @@ -175,4 +173,4 @@ impl ResourceMonitor { fn get_npu_usage(&self) -> Option { None } -} \ No newline at end of file +} diff --git a/screenpipe-server/src/server.rs b/screenpipe-server/src/server.rs index e059c779..f5979819 100644 --- a/screenpipe-server/src/server.rs +++ b/screenpipe-server/src/server.rs @@ -5,11 +5,12 @@ use axum::{ routing::{get, post}, serve, Router, }; +use crossbeam::queue::SegQueue; use tracing::Level; use crate::{ContentType, DatabaseManager, SearchResult}; use chrono::{DateTime, Utc}; -use log::{error, info}; +use log::{debug, error, info}; use screenpipe_audio::{AudioDevice, DeviceControl}; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -22,7 +23,7 @@ use std::{ }, time::Duration, }; -use tokio::{net::TcpListener, sync::mpsc::Sender}; +use tokio::net::TcpListener; use tower_http::trace::TraceLayer; use tower_http::{ cors::CorsLayer, @@ -35,7 +36,7 @@ use crate::plugin::ApiPluginLayer; pub struct AppState { pub db: Arc, pub vision_control: Arc, - pub audio_devices_control_sender: Sender<(AudioDevice, DeviceControl)>, + pub audio_devices_control: Arc>, pub devices_status: HashMap, pub app_start_time: DateTime, } @@ -222,6 +223,7 @@ pub(crate) async fn start_device( State(state): State>, JsonExt(payload): JsonExt, ) -> Result, (StatusCode, JsonResponse)> { + debug!("Received start device request: {}", payload.device_id); // Create an AudioDevice from the device_id string let audio_device = match AudioDevice::from_name(&payload.device_id) { Ok(device) => device, @@ -238,28 +240,21 @@ pub(crate) async fn start_device( is_paused: false, }; - if let Err(e) = state - .audio_devices_control_sender - .send((audio_device, device_control)) - .await - { - error!("failed to start audio device: {}", e); - Err(( - StatusCode::NOT_FOUND, - JsonResponse(json!({"error": "Device not found"})), - )) - } else { - Ok(JsonResponse(DeviceStatus { - id: payload.device_id, - is_running: true, - })) - } + state + .audio_devices_control + .push((audio_device, device_control)); + + Ok(JsonResponse(DeviceStatus { + id: payload.device_id, + is_running: true, + })) } pub(crate) async fn stop_device( State(state): State>, JsonExt(payload): JsonExt, ) -> Result, (StatusCode, JsonResponse)> { + debug!("Received stop device request: {}", payload.device_id); // Create an AudioDevice from the device_id string let audio_device = match AudioDevice::from_name(&payload.device_id) { Ok(device) => device, @@ -275,22 +270,14 @@ pub(crate) async fn stop_device( is_paused: false, }; - if let Err(e) = state - .audio_devices_control_sender - .send((audio_device, device_control)) - .await - { - error!("failed to stop audio device: {}", e); - Err(( - StatusCode::NOT_FOUND, - JsonResponse(json!({"error": "Device not found"})), - )) - } else { - Ok(JsonResponse(DeviceStatus { - id: payload.device_id, - is_running: false, - })) - } + state + .audio_devices_control + .push((audio_device, device_control)); + + Ok(JsonResponse(DeviceStatus { + id: payload.device_id, + is_running: false, + })) } pub(crate) async fn start_recording( @@ -363,6 +350,8 @@ pub async fn health_check(State(state): State>) -> JsonResponse, addr: SocketAddr, vision_control: Arc, - audio_devices_control_sender: Sender<(AudioDevice, DeviceControl)>, + audio_devices_control: Arc>, } impl Server { @@ -470,13 +459,13 @@ impl Server { db: Arc, addr: SocketAddr, vision_control: Arc, - audio_devices_control_sender: Sender<(AudioDevice, DeviceControl)>, + audio_devices_control: Arc>, ) -> Self { Server { db, addr, vision_control, - audio_devices_control_sender, + audio_devices_control, } } @@ -492,7 +481,7 @@ impl Server { let app_state = Arc::new(AppState { db: self.db, vision_control: self.vision_control, - audio_devices_control_sender: self.audio_devices_control_sender, + audio_devices_control: self.audio_devices_control, devices_status: device_status, app_start_time: Utc::now(), }); diff --git a/screenpipe-server/tests/endpoint_test.rs b/screenpipe-server/tests/endpoint_test.rs index c12fb1c5..564e2a0c 100644 --- a/screenpipe-server/tests/endpoint_test.rs +++ b/screenpipe-server/tests/endpoint_test.rs @@ -5,6 +5,7 @@ mod tests { use axum::Router; use axum::{body::to_bytes, routing::get}; use chrono::{Duration, Utc}; + use crossbeam::queue::SegQueue; use screenpipe_server::HealthCheckResponse; use screenpipe_server::{health_check, AppState, DatabaseManager}; // Adjust this import based on your actual module structure use std::collections::HashMap; @@ -17,7 +18,7 @@ mod tests { let app_state = Arc::new(AppState { db: db.clone(), vision_control: Arc::new(AtomicBool::new(false)), - audio_devices_control_sender: tokio::sync::mpsc::channel(100).0, + audio_devices_control: Arc::new(SegQueue::new()), devices_status: HashMap::new(), app_start_time: Utc::now(), }); @@ -94,7 +95,6 @@ mod tests { // Simulate passage of time tokio::time::sleep(tokio::time::Duration::from_secs(120)).await; - // Insert some recent data let _ = db.insert_video_chunk("test_video.mp4").await.unwrap(); let frame_id = db.insert_frame().await.unwrap();