Skip to content

Commit

Permalink
feat: implement experimental self-healing
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Jul 30, 2024
1 parent 60cf7fa commit 4a72ba9
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 208 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ members = [
"screenpipe-vision",
"screenpipe-audio",
"screenpipe-server",

]
exclude = [
"examples/apps/screenpipe-app-tauri/src-tauri",
"examples/apps/screenpipe-app-tauri/src-tauri",
"examples/apps/screenpipe-app-dioxus",
]
resolver = "2"
Expand Down
24 changes: 20 additions & 4 deletions examples/apps/screenpipe-app-tauri/scripts/pre_build.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ if (platform == 'windows') {
await $`mv ${config.ffmpegRealname}/lib/x64/* ${config.ffmpegRealname}/lib/`
}

// Setup Tesseract
const tesseractName = 'tesseract-5.3.3-windows'
const tesseractUrl = 'https://github.com/UB-Mannheim/tesseract/releases/download/v5.3.3/tesseract-ocr-w64-setup-5.3.3.20231005.exe'
const tesseractInstaller = `${tesseractName}.exe`

if (!(await fs.exists('tesseract'))) {
console.log('Setting up Tesseract for Windows...')
await $`C:\\msys64\\usr\\bin\\wget.exe -nc --show-progress ${tesseractUrl} -O ${tesseractInstaller}`
await $`${tesseractInstaller} /S /D=C:\\Program Files\\Tesseract-OCR`
await $`rm ${tesseractInstaller}`
await $`mv "C:\\Program Files\\Tesseract-OCR" tesseract`
console.log('Tesseract for Windows set up successfully.')
} else {
console.log('Tesseract for Windows already exists.')
}

// Add Tesseract to PATH
process.env.PATH = `${process.cwd()}\\tesseract;${process.env.PATH}`

// Setup OpenBlas
if (!(await fs.exists(config.openblasRealname)) && hasFeature('openblas')) {
await $`C:\\msys64\\usr\\bin\\wget.exe -nc --show-progress ${config.windows.openBlasUrl} -O ${config.windows.openBlasName}.zip`
Expand Down Expand Up @@ -228,10 +247,7 @@ if (hasFeature('cuda')) {
[`${cudaPath}\\bin\\cudart64_*`]: './',
[`${cudaPath}\\bin\\cublas64_*`]: './',
[`${cudaPath}\\bin\\cublasLt64_*`]: './',
// Tesseract?
'C:\\Windows\\System32\\msvcp140.dll': './',
'C:\\Windows\\System32\\vcruntime140.dll': './',
'C:\\Windows\\System32\\vcruntime140_1.dll': './',
'tesseract\\*': './',
},
},
}
Expand Down
1 change: 0 additions & 1 deletion screenpipe-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ crossbeam = { workspace = true }
[dev-dependencies]
tempfile = "3.3.0"

reqwest = { version = "0.11", features = ["json"] }

# Benches
criterion = { workspace = true }
Expand Down
102 changes: 57 additions & 45 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use std::{
use clap::Parser;
#[allow(unused_imports)]
use colored::Colorize;
use crossbeam::queue::SegQueue;
use dirs::home_dir;
use log::{debug, info, LevelFilter};
use log::{debug, error, info, LevelFilter};
use screenpipe_audio::{
default_input_device, default_output_device, list_audio_devices, parse_audio_device,
DeviceControl,
Expand Down Expand Up @@ -58,20 +59,10 @@ struct Cli {
#[arg(long, default_value_t = false)]
disable_audio: bool,

/// Memory usage threshold for restart (in percentage)
#[arg(long, default_value_t = 80.0)]
memory_threshold: f64,

/// Runtime threshold for restart (in minutes)
#[arg(long, default_value_t = 60)]
runtime_threshold: u64,

/// Enable automatic restart when resource thresholds are exceeded.
/// This feature will automatically restart the application if the memory usage
/// or runtime exceeds the specified thresholds, helping to ensure stability
/// and prevent potential crashes or performance degradation.
/// EXPERIMENTAL: Enable self healing when detecting unhealthy state based on /health endpoint.
/// This feature will automatically restart the recording tasks while keeping the API alive.
#[arg(long, default_value_t = false)]
restart_enabled: bool,
self_healing: bool,

/// Audio devices to use (can be specified multiple times)
#[arg(long)]
Expand All @@ -81,7 +72,7 @@ struct Cli {
#[arg(long)]
list_audio_devices: bool,

/// Data directory
/// Data directory. Default to $HOME/.screenpipe
#[arg(long)]
data_dir: Option<String>,

Expand Down Expand Up @@ -120,7 +111,7 @@ async fn main() -> anyhow::Result<()> {
builder
.filter(None, LevelFilter::Info)
.filter_module("tokenizers", LevelFilter::Error)
// .filter_module("rusty_tesseract", LevelFilter::Error)
.filter_module("rusty_tesseract", LevelFilter::Error)
.filter_module("symphonia", LevelFilter::Error);

if cli.debug {
Expand Down Expand Up @@ -177,9 +168,9 @@ async fn main() -> anyhow::Result<()> {

let mut audio_devices = Vec::new();

let (audio_devices_control_sender, audio_devices_control_receiver) = channel(64);
let audio_devices_control = Arc::new(SegQueue::new());

let audio_devices_control_sender_server = audio_devices_control_sender.clone();
let audio_devices_control_server = audio_devices_control.clone();

info!("Available audio devices:");
// Add all available audio devices to the controls
Expand Down Expand Up @@ -237,22 +228,20 @@ async fn main() -> anyhow::Result<()> {
is_paused: false,
};
let device_clone = device.deref().clone();
let sender_clone = audio_devices_control_sender.clone();
let sender_clone = audio_devices_control.clone();
// Hand over the device control only after a 15-second delay, so the rest of the app has had time to start
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(15)).await;
let _ = sender_clone.send((device_clone, device_control)).await;
let _ = sender_clone.push((device_clone, device_control));
});
}
}
}

let resource_monitor = ResourceMonitor::new(
cli.memory_threshold,
cli.runtime_threshold,
cli.restart_enabled,
);
resource_monitor.start_monitoring(Duration::from_secs(10)); // Log every 10 seconds
let (restart_sender, mut restart_receiver) = channel(10);
let resource_monitor =
ResourceMonitor::new(cli.self_healing, Duration::from_secs(60), 3, restart_sender);
resource_monitor.start_monitoring(Duration::from_secs(10));

let db = Arc::new(
DatabaseManager::new(&format!("{}/db.sqlite", local_data_dir.to_string_lossy()))
Expand All @@ -266,31 +255,54 @@ async fn main() -> anyhow::Result<()> {
"Database initialized, will store files in {}",
local_data_dir.to_string_lossy()
);
let db_record = db.clone();
let db_server = db.clone();

// Channel for controlling the recorder ! TODO RENAME SHIT
let (_control_tx, control_rx) = channel(64);
let vision_control = Arc::new(AtomicBool::new(true));

let vision_control_server_clone = vision_control.clone();

// Start continuous recording in a separate task
let _recording_task = tokio::spawn({
async move {
let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration);

start_continuous_recording(
db_record,
Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()),
cli.fps,
audio_chunk_duration,
control_rx,
vision_control,
audio_devices_control_receiver,
cli.save_text_files,
)
.await
// Function to start or restart the recording task
let _start_recording = tokio::spawn(async move {
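// Hack: begin with an empty placeholder task that finishes immediately, so the
// first pass through the `select!` below falls through and spawns the real recording task.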
let mut recording_task = tokio::spawn(async move {});

loop {
let db_clone = db.clone();
let local_data_dir = local_data_dir.clone();
let vision_control = vision_control.clone();
let audio_devices_control = audio_devices_control.clone();
tokio::select! {
_ = &mut recording_task => {
// Recording task completed or errored, restart it
debug!("Recording task ended. Restarting...");
}
Some(_) = restart_receiver.recv() => {
// Received restart signal, cancel the current task and restart
info!("Received restart signal. Restarting recording task...");
recording_task.abort();
}
}
recording_task = tokio::spawn(async move {
let result = start_continuous_recording(
db_clone,
Arc::new(local_data_dir.join("data").to_string_lossy().into_owned()),
cli.fps,
Duration::from_secs(cli.audio_chunk_duration),
vision_control,
audio_devices_control,
cli.save_text_files,
)
.await;

if let Err(e) = result {
error!("Continuous recording error: {:?}", e);
}
});
debug!("Recording task started");

// Short delay before restarting to avoid rapid restarts
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

Expand All @@ -307,7 +319,7 @@ async fn main() -> anyhow::Result<()> {
db_server,
SocketAddr::from(([0, 0, 0, 0], cli.port)),
vision_control_server_clone,
audio_devices_control_sender_server,
audio_devices_control_server,
);
server.start(devices_status, api_plugin).await.unwrap();
});
Expand Down
80 changes: 36 additions & 44 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{DatabaseManager, VideoCapture};
use anyhow::Result;
use chrono::Utc;
use crossbeam::queue::SegQueue;
use log::{debug, error, info, warn};
use screenpipe_audio::{
create_whisper_channel, record_and_transcribe, AudioDevice, AudioInput, DeviceControl,
Expand All @@ -10,7 +11,7 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
pub enum RecorderControl {
Pause,
Expand Down Expand Up @@ -39,15 +40,13 @@ impl DataOutputWrapper {
}
}


pub async fn start_continuous_recording(
db: Arc<DatabaseManager>,
output_path: Arc<String>,
fps: f64,
audio_chunk_duration: Duration,
mut full_control: Receiver<RecorderControl>,
vision_control: Arc<AtomicBool>,
audio_devices_control_receiver: Receiver<(AudioDevice, DeviceControl)>,
audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
save_text_files: bool,
) -> Result<()> {
info!("Recording now");
Expand All @@ -63,7 +62,14 @@ pub async fn start_continuous_recording(
let output_path_audio = Arc::clone(&output_path);

let video_handle = tokio::spawn(async move {
record_video(db_manager_video, output_path_video, fps, is_running_video, save_text_files).await
record_video(
db_manager_video,
output_path_video,
fps,
is_running_video,
save_text_files,
)
.await
});

let audio_handle = tokio::spawn(async move {
Expand All @@ -73,43 +79,13 @@ pub async fn start_continuous_recording(
audio_chunk_duration,
whisper_sender,
whisper_receiver,
audio_devices_control_receiver,
audio_devices_control,
)
.await
});

// Control loop
let control_handle = tokio::spawn(async move {
loop {
tokio::select! {
Some(ctrl) = full_control.recv() => {
match ctrl {
RecorderControl::Stop => {
vision_control.store(false, Ordering::SeqCst);
// stop all audio devices
// TODO not implemented
break; // Exit the loop when Stop is received
}
RecorderControl::Pause => {
// pause all audio devices
}
RecorderControl::Resume => {
vision_control.store(true, Ordering::SeqCst);
// resume all audio devices
}
}
}
else => {
// Channel closed, exit the loop
break;
}
}
}
});

video_handle.await??;
audio_handle.await??;
control_handle.await?;

info!("Stopped recording");
Ok(())
Expand All @@ -135,20 +111,36 @@ async fn record_video(
};
// debug!("record_video: video_capture");
let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback, save_text_files);

while is_running.load(Ordering::SeqCst) {
// let queue_length = video_capture.ocr_frame_queue.lock().await.len();
// debug!("record_video: Checking for latest frame. Number of frames in OCR queue: {}", queue_length);
if let Some(frame) = video_capture.ocr_frame_queue.lock().await.pop_front() {
match db.insert_frame().await {
Ok(frame_id) => {
let text_json = serde_json::to_string(&frame.text_json).unwrap_or_default();
let new_text_json_vs_previous_frame = serde_json::to_string(&frame.new_text_json).unwrap_or_default();
let raw_data_output_from_ocr = DataOutputWrapper { data_output: frame.data_output }.to_json();
let new_text_json_vs_previous_frame =
serde_json::to_string(&frame.new_text_json).unwrap_or_default();
let raw_data_output_from_ocr = DataOutputWrapper {
data_output: frame.data_output,
}
.to_json();

// debug!("insert_ocr_text called for frame {}", frame_id);
if let Err(e) = db.insert_ocr_text(frame_id, &frame.text, &text_json, &new_text_json_vs_previous_frame, &raw_data_output_from_ocr).await {
error!("Failed to insert OCR text: {}, skipping frame {}", e, frame_id);
if let Err(e) = db
.insert_ocr_text(
frame_id,
&frame.text,
&text_json,
&new_text_json_vs_previous_frame,
&raw_data_output_from_ocr,
)
.await
{
error!(
"Failed to insert OCR text: {}, skipping frame {}",
e, frame_id
);
continue; // Skip to the next iteration
}
}
Expand All @@ -173,13 +165,13 @@ async fn record_audio(
chunk_duration: Duration,
whisper_sender: UnboundedSender<AudioInput>,
mut whisper_receiver: UnboundedReceiver<TranscriptionResult>,
mut audio_devices_control_receiver: Receiver<(AudioDevice, DeviceControl)>,
audio_devices_control: Arc<SegQueue<(AudioDevice, DeviceControl)>>,
) -> Result<()> {
let mut handles: HashMap<String, JoinHandle<()>> = HashMap::new();

loop {
// Non-blocking check for new device controls
while let Ok((audio_device, device_control)) = audio_devices_control_receiver.try_recv() {
while let Some((audio_device, device_control)) = audio_devices_control.pop() {
info!("Received audio device: {}", &audio_device);
let device_id = audio_device.to_string();

Expand Down Expand Up @@ -324,4 +316,4 @@ async fn process_audio_result(db: &DatabaseManager, result: TranscriptionResult)
result.input.device, e
),
}
}
}
Loading

0 comments on commit 4a72ba9

Please sign in to comment.