Skip to content

Commit

Permalink
feat: add image post processing queue
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Jun 26, 2024
1 parent ce13ba3 commit 6eb594d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
1 change: 1 addition & 0 deletions screenpipe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ clap = { version = "4.0", features = ["derive"] }
chrono = "0.4.38"
crossbeam = "0.8"
ctrlc = "3.4"
tokio = { version = "1.38", features = ["full"] }

[[bin]]
name = "screenpipe"
Expand Down
33 changes: 32 additions & 1 deletion screenpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, sleep};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::task;
use xcap::image::{ImageBuffer, Rgba};
use xcap::Monitor;
const DISPLAY: &str = r"
Expand Down Expand Up @@ -44,6 +46,14 @@ fn normalized(filename: &str) -> String {
.replace("/", "")
}

async fn process_image(filename: String) {
// Example async post-processing function
// Perform tasks like extracting text or making API calls here
println!("Processing image asynchronously: {}", filename);
// Simulate async work
tokio::time::sleep(Duration::from_secs(1)).await;
}

fn screenpipe(path: &str, interval: f32, running: Arc<AtomicBool>) {
// delete and recreate the directory
println!("Deleting and recreating directory {}", path);
Expand All @@ -60,14 +70,33 @@ fn screenpipe(path: &str, interval: f32, running: Arc<AtomicBool>) {
println!("{}", DISPLAY);

let (tx, rx) = channel::unbounded::<(ImageBuffer<Rgba<u8>, Vec<u8>>, String)>();

let (post_tx, post_rx) = channel::unbounded::<String>();
let post_tx_clone = post_tx.clone();
// Thread for saving images
let save_thread = thread::spawn(move || {
while let Ok((image, filename)) = rx.recv() {
image.save(&filename).unwrap();
post_tx_clone.send(filename).unwrap(); // Send filename to post-processing
}
});

// Async task for post-processing
let post_processing_thread = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
while let Ok(filename) = post_rx.recv() {
task::spawn(async move {
// Perform async post-processing here
println!("Post-processing image: {}", filename);
// Example: Call an async function to process the image
process_image(filename).await;
})
.await
.unwrap();
}
});
});

while running.load(Ordering::SeqCst) {
let day_dir = format!("{}/{}", path, Local::now().format("%Y-%m-%d"));
create_dir_all(&day_dir).unwrap();
Expand All @@ -91,6 +120,8 @@ fn screenpipe(path: &str, interval: f32, running: Arc<AtomicBool>) {
}
drop(tx); // Close the channel
save_thread.join().unwrap(); // Wait for the saving thread to finish
drop(post_tx); // Close the post-processing channel
post_processing_thread.join().unwrap(); // Wait for the post-processing thread to finish
}

fn main() {
Expand Down

0 comments on commit 6eb594d

Please sign in to comment.