From 6eb594d0f97bf69c06236bd0f317caa1867e6597 Mon Sep 17 00:00:00 2001 From: Louis Beaumont Date: Wed, 26 Jun 2024 09:51:38 +0200 Subject: [PATCH] feat: add image post processing queue --- screenpipe/Cargo.toml | 1 + screenpipe/src/main.rs | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/screenpipe/Cargo.toml b/screenpipe/Cargo.toml index 6b0c17ed..1657050b 100644 --- a/screenpipe/Cargo.toml +++ b/screenpipe/Cargo.toml @@ -9,6 +9,7 @@ clap = { version = "4.0", features = ["derive"] } chrono = "0.4.38" crossbeam = "0.8" ctrlc = "3.4" +tokio = { version = "1.38", features = ["full"] } [[bin]] name = "screenpipe" diff --git a/screenpipe/src/main.rs b/screenpipe/src/main.rs index 06c1d69c..a3812983 100644 --- a/screenpipe/src/main.rs +++ b/screenpipe/src/main.rs @@ -6,6 +6,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::{self, sleep}; use std::time::Duration; +use tokio::runtime::Runtime; +use tokio::task; use xcap::image::{ImageBuffer, Rgba}; use xcap::Monitor; const DISPLAY: &str = r" @@ -44,6 +46,14 @@ fn normalized(filename: &str) -> String { .replace("/", "") } +async fn process_image(filename: String) { + // Example async post-processing function + // Perform tasks like extracting text or making API calls here + println!("Processing image asynchronously: {}", filename); + // Simulate async work + tokio::time::sleep(Duration::from_secs(1)).await; +} + fn screenpipe(path: &str, interval: f32, running: Arc) { // delete and recreate the directory println!("Deleting and recreating directory {}", path); @@ -60,14 +70,33 @@ fn screenpipe(path: &str, interval: f32, running: Arc) { println!("{}", DISPLAY); let (tx, rx) = channel::unbounded::<(ImageBuffer, Vec>, String)>(); - + let (post_tx, post_rx) = channel::unbounded::(); + let post_tx_clone = post_tx.clone(); // Thread for saving images let save_thread = thread::spawn(move || { while let Ok((image, filename)) = rx.recv() { image.save(&filename).unwrap(); + post_tx_clone.send(filename).unwrap(); // Send filename to post-processing } }); + // Async task for post-processing + let post_processing_thread = thread::spawn(move || { + let rt = Runtime::new().unwrap(); + rt.block_on(async { + while let Ok(filename) = post_rx.recv() { + task::spawn(async move { + // Perform async post-processing here + println!("Post-processing image: {}", filename); + // Example: Call an async function to process the image + process_image(filename).await; + }) + .await + .unwrap(); + } + }); + }); + while running.load(Ordering::SeqCst) { let day_dir = format!("{}/{}", path, Local::now().format("%Y-%m-%d")); create_dir_all(&day_dir).unwrap(); @@ -91,6 +120,8 @@ fn screenpipe(path: &str, interval: f32, running: Arc) { } drop(tx); // Close the channel save_thread.join().unwrap(); // Wait for the saving thread to finish + drop(post_tx); // Close the post-processing channel + post_processing_thread.join().unwrap(); // Wait for the post-processing thread to finish } fn main() {