Commit

feat: cut audio storage by 10x
louis030195 committed Jul 11, 2024
1 parent c3f77b9 commit 510e3a5
Showing 8 changed files with 141 additions and 103 deletions.
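
Rough arithmetic behind the "10x" figure (an estimate based on typical WAV settings, not stated anywhere in the commit):

    16-bit stereo PCM @ 44.1 kHz ≈ 44_100 * 2 channels * 16 bits ≈ 1_411 kbps
    MP3 at the 128 kbps bitrate used in core.rs below             ≈   128 kbps
    1_411 / 128 ≈ 11  →  roughly a 10x reduction

The exact factor depends on the sample rate and sample format the old hound WAV writer was configured with.
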
6 changes: 5 additions & 1 deletion screenpipe-audio/Cargo.toml
@@ -39,7 +39,7 @@ base64 = "0.21.7"
anyhow = "1.0.86"
byteorder = "1.5.0"
hf-hub = "0.3.2"
symphonia = "0.5.4"
symphonia = { version = "0.5.4", features = ["mp3"] }
rand = "0.8.5"
rubato = "0.15.0"

@@ -56,8 +56,12 @@ tracing = { workspace = true }
# Concurrency
crossbeam = "0.8"

# Bytes
bytemuck = "1.16.1"

[dev-dependencies]
tempfile = "3.3.0"
infer = "0.15"

[features]
metal = ["candle/metal", "candle-nn/metal", "candle-transformers/metal"]
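
Aside: the new "mp3" feature is what lets symphonia decode the MP3 files this commit starts writing (e.g. the selah.mp3 test fixture used by stt). A minimal, hedged sketch of decoding an MP3 with symphonia 0.5, for illustration only — this is not screenpipe's actual loader, and the function name and path are made up:

    use std::fs::File;
    use symphonia::core::codecs::DecoderOptions;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::MediaSourceStream;
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    fn decode_mp3(path: &str) -> anyhow::Result<()> {
        let file = File::open(path)?;
        let mss = MediaSourceStream::new(Box::new(file), Default::default());
        let mut hint = Hint::new();
        hint.with_extension("mp3");

        // Probe the container and pick the default audio track
        // (MP3 decoding comes from the feature flag enabled above).
        let probed = symphonia::default::get_probe().format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )?;
        let mut format = probed.format;
        let track = format
            .default_track()
            .ok_or_else(|| anyhow::anyhow!("no audio track"))?;
        let mut decoder = symphonia::default::get_codecs()
            .make(&track.codec_params, &DecoderOptions::default())?;

        // Pull packets until EOF and decode each one into a PCM buffer.
        while let Ok(packet) = format.next_packet() {
            let _pcm = decoder.decode(&packet)?;
            // convert `_pcm` (an AudioBufferRef) to f32 samples as needed
        }
        Ok(())
    }
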
4 changes: 2 additions & 2 deletions screenpipe-audio/src/bin/screenpipe-audio.rs
@@ -65,7 +65,7 @@ fn main() -> Result<()> {
}

let chunk_duration = Duration::from_secs(30);
let output_path = PathBuf::from("output.wav");
let output_path = PathBuf::from("output.mp3");
let (whisper_sender, whisper_receiver) = create_whisper_channel()?;

// Spawn threads for each device
@@ -74,7 +74,7 @@ fn main() -> Result<()> {
.enumerate()
.map(|(i, device)| {
let whisper_sender = whisper_sender.clone();
let output_path = output_path.with_file_name(format!("output_{}.wav", i));
let output_path = output_path.with_file_name(format!("output_{}.mp3", i));
thread::spawn(move || {
record_and_transcribe(&device, chunk_duration, output_path, whisper_sender)
})
151 changes: 78 additions & 73 deletions screenpipe-audio/src/core.rs
@@ -1,15 +1,15 @@
use anyhow::{anyhow, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{FromSample, Sample};
use crossbeam::channel::Sender;
use crossbeam::channel::{Receiver, Sender};
use hound::WavSpec;
use log::{error, info};
use log::{debug, error, info};
use serde::Serialize;
use std::fmt;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Duration;
use std::{io::BufWriter, thread};

use crate::AudioInput;
@@ -80,6 +80,9 @@ pub fn parse_device_spec(name: &str) -> Result<DeviceSpec> {
DeviceSpec::from_name(&name)
}

use std::io::Write;
use std::process::{Command, Stdio};

pub fn record_and_transcribe(
device_spec: &DeviceSpec,
duration: Duration,
Expand Down Expand Up @@ -117,95 +120,97 @@ pub fn record_and_transcribe(
config
);

let spec = wav_spec_from_config(&config);
let writer = hound::WavWriter::create(&output_path, spec)?;
let writer = Arc::new(Mutex::new(Some(writer)));
let writer_2 = writer.clone();
let sample_rate = config.sample_rate().0;
let channels = config.channels() as u16;

let (audio_sender, audio_receiver): (Sender<Vec<u8>>, Receiver<Vec<u8>>) =
crossbeam::channel::unbounded();
let is_running = Arc::new(Mutex::new(true));
let is_running_clone = is_running.clone();

let output_path_clone = output_path.clone();
// Spawn FFmpeg process in a separate thread
let ffmpeg_handle = thread::spawn(move || {
let mut ffmpeg = Command::new("ffmpeg")
.args(&[
"-f",
"f32le",
"-ar",
&sample_rate.to_string(),
"-ac",
&channels.to_string(),
"-i",
"pipe:0",
"-c:a",
"libmp3lame",
"-b:a",
"128k",
"-f",
"mp3",
output_path_clone.to_str().unwrap(),
])
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.expect("Failed to spawn ffmpeg process");

let mut stdin = ffmpeg.stdin.take().expect("Failed to open stdin");

while *is_running_clone.lock().unwrap() {
if let Ok(data) = audio_receiver.recv_timeout(Duration::from_millis(100)) {
if let Err(e) = stdin.write_all(&data) {
error!("Failed to write audio data to FFmpeg: {}", e);
break;
}
}
}

// Close stdin to signal EOF to FFmpeg
drop(stdin);

// Wait for FFmpeg to finish
match ffmpeg.wait() {
Ok(status) => debug!("FFmpeg process exited with status: {}", status),
Err(e) => error!("Failed to wait for FFmpeg process: {}", e),
}
});

let err_fn = |err| error!("An error occurred on the audio stream: {}", err);

let stream = match config.sample_format() {
cpal::SampleFormat::I8 => audio_device.build_input_stream(
&config.into(),
move |data, _: &_| write_input_data::<i8, i8>(data, &writer_2),
err_fn,
None,
)?,
cpal::SampleFormat::I16 => audio_device.build_input_stream(
&config.into(),
move |data, _: &_| write_input_data::<i16, i16>(data, &writer_2),
err_fn,
None,
)?,
cpal::SampleFormat::I32 => audio_device.build_input_stream(
&config.into(),
move |data, _: &_| write_input_data::<i32, i32>(data, &writer_2),
err_fn,
None,
)?,
cpal::SampleFormat::F32 => audio_device.build_input_stream(
&config.into(),
move |data, _: &_| write_input_data::<f32, f32>(data, &writer_2),
move |data: &[f32], _: &_| {
if let Err(e) = audio_sender.send(bytemuck::cast_slice(data).to_vec()) {
error!("Failed to send audio data: {}", e);
}
},
err_fn,
None,
)?,
sample_format => return Err(anyhow!("Unsupported sample format '{}'", sample_format)),
};

stream.play()?;
info!(
"Will write an audio file every {} seconds",
duration.as_secs()
);
info!("Recording for {} seconds", duration.as_secs());

let start_time = Instant::now();
let chunk_duration = Duration::from_secs(5); // Adjust as needed
let mut next_chunk_time = start_time + chunk_duration;
thread::sleep(duration);

while start_time.elapsed() < duration {
let now = Instant::now();
// Stop the stream and signal the recording to stop
stream.pause()?;
*is_running.lock().unwrap() = false;

if now >= next_chunk_time {
// Stop the stream temporarily
stream.pause()?;
// Wait for the FFmpeg thread to finish
ffmpeg_handle.join().expect("Failed to join FFmpeg thread");

{
let mut writer_guard = writer.lock().unwrap();
if let Some(writer) = writer_guard.as_mut() {
writer.flush()?;

// Send the file path to the whisper channel
whisper_sender.send(AudioInput {
path: output_path.to_str().unwrap().to_string(),
device: device_spec.to_string(),
})?;
}
}

// Resume the stream
stream.play()?;
next_chunk_time = now + chunk_duration;
}

thread::sleep(Duration::from_millis(100));
}

// Final flush and transcription
{
let mut writer_guard = writer.lock().unwrap();
if let Some(writer) = writer_guard.as_mut() {
writer.flush()?;

// Send the file path to the whisper channel
whisper_sender.send(AudioInput {
path: output_path.to_str().unwrap().to_string(),
device: device_spec.to_string(),
})?;
}
}
whisper_sender.send(AudioInput {
path: output_path.to_str().unwrap().to_string(),
device: device_spec.to_string(),
})?;

Ok(output_path)
}

fn wav_spec_from_config(config: &cpal::SupportedStreamConfig) -> WavSpec {
WavSpec {
channels: config.channels() as _,
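
For reference, the bytemuck::cast_slice call in the new F32 input callback above reinterprets the f32 sample buffer as raw bytes in native endianness (which matches the "-f f32le" ffmpeg is told to expect on little-endian hosts) before sending them down the channel to ffmpeg's stdin. A tiny standalone sketch of that cast, with made-up sample values:

    fn main() {
        // A few f32 samples, as cpal would hand them to the input callback.
        let samples: [f32; 3] = [0.0, 0.25, -0.5];

        // Zero-copy reinterpretation of the same memory as bytes
        // (both f32 and u8 implement bytemuck::Pod).
        let bytes: &[u8] = bytemuck::cast_slice(&samples);
        assert_eq!(bytes.len(), samples.len() * std::mem::size_of::<f32>());

        // core.rs then clones these bytes into a Vec<u8> and sends them
        // over the crossbeam channel to the ffmpeg writer thread.
        let payload: Vec<u8> = bytes.to_vec();
        assert_eq!(payload.len(), 12);
    }
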
16 changes: 0 additions & 16 deletions screenpipe-audio/src/stt.rs
@@ -534,20 +534,4 @@ pub fn create_whisper_channel() -> Result<(Sender<AudioInput>, Receiver<Transcri
Ok((input_sender, output_receiver))
}

#[test]
#[ignore]
fn test_speech_to_text() {
println!("Starting speech to text test");

println!("Loading audio file");
let start = std::time::Instant::now();
let whisper_model = WhisperModel::new().unwrap();

let text = stt("./test_data/poetic_kapil_gupta.wav", &whisper_model).unwrap();
let duration = start.elapsed();

println!("Speech to text completed in {:?}", duration);
println!("Transcribed text: {}", text);

assert!(text.contains("The fire"));
}
Binary file added screenpipe-audio/test_data/selah.mp3
Binary file not shown.
59 changes: 52 additions & 7 deletions screenpipe-audio/tests/core_tests.rs
@@ -1,8 +1,13 @@
#[cfg(test)]
mod tests {
use screenpipe_audio::{list_audio_devices, parse_device_spec, stt, AudioInput, WhisperModel};

use super::*;
use chrono::Utc;
use crossbeam::channel;
use screenpipe_audio::record_and_transcribe;
use screenpipe_audio::{
default_output_device, list_audio_devices, parse_device_spec, stt, WhisperModel,
};
use std::path::PathBuf;
use std::time::{Duration, Instant};

    // ! what happens in GitHub Actions?
#[test]
@@ -18,7 +23,6 @@ mod tests {
assert_eq!(spec.to_string(), "Test Device (input)");
}

// TODO move to tests folder
#[test]
#[ignore]
fn test_speech_to_text() {
@@ -28,14 +32,55 @@ mod tests {
let start = std::time::Instant::now();
let whisper_model = WhisperModel::new().unwrap();

let text = stt("./test_data/poetic_kapil_gupta.wav", &whisper_model).unwrap();
let text = stt("./test_data/selah.mp3", &whisper_model).unwrap();
let duration = start.elapsed();

println!("Speech to text completed in {:?}", duration);
println!("Transcribed text: {:?}", text);

assert!(text.contains("The fire"));
assert!(text.contains("love"));
}

// Add more tests for other functions
#[test]
    #[ignore] // skipped in regular test runs: records from a live audio device and shells out to ffmpeg
fn test_record_and_transcribe() {
// Setup
let device_spec = default_output_device().unwrap();
        let duration = Duration::from_secs(30); // Record for 30 seconds
let time = Utc::now().timestamp_millis();
let output_path = PathBuf::from(format!("test_output_{}.mp3", time));
let (sender, receiver) = channel::unbounded();

// Act
let start_time = Instant::now();
let result = record_and_transcribe(&device_spec, duration, output_path.clone(), sender);
let elapsed_time = start_time.elapsed();

// Assert
assert!(result.is_ok(), "record_and_transcribe should succeed");

// Check if the recording duration is close to the specified duration
assert!(
elapsed_time >= duration && elapsed_time < duration + Duration::from_secs(3),
"Recording duration should be close to the specified duration"
);

// Check if the file was created
assert!(output_path.exists(), "Output file should exist");

// Check if we received the correct AudioInput
let audio_input = receiver.recv_timeout(Duration::from_secs(1)).unwrap();
assert_eq!(audio_input.path, output_path.to_str().unwrap());

        // Verify the file format using the `infer` dev-dependency added in Cargo.toml
let kind = infer::get_from_path(&output_path).unwrap().unwrap();
assert_eq!(
kind.mime_type(),
"audio/mpeg",
"File should be in mp3 format"
);

// Clean up
std::fs::remove_file(output_path).unwrap();
}
}
2 changes: 1 addition & 1 deletion screenpipe-server/src/core.rs
@@ -157,7 +157,7 @@ async fn record_audio(
move || {
let new_file_name = Utc::now().format("%Y-%m-%d_%H-%M-%S").to_string();
let file_path = format!(
"{}/{}_{}.wav",
"{}/{}_{}.mp3",
output_path_clone, device_spec_clone, new_file_name
);
record_and_transcribe(
6 changes: 3 additions & 3 deletions screenpipe-server/src/db.rs
@@ -562,7 +562,7 @@ mod tests {
#[tokio::test]
async fn test_insert_and_search_audio() {
let db = setup_test_db().await;
let audio_chunk_id = db.insert_audio_chunk("test_audio.wav").await.unwrap();
let audio_chunk_id = db.insert_audio_chunk("test_audio.mp3").await.unwrap();
db.insert_audio_transcription(audio_chunk_id, "Hello from audio", 0)
.await
.unwrap();
@@ -574,7 +574,7 @@ mod tests {
assert_eq!(results.len(), 1);
if let SearchResult::Audio(audio_result) = &results[0] {
assert_eq!(audio_result.transcription, "Hello from audio");
assert_eq!(audio_result.file_path, "test_audio.wav");
assert_eq!(audio_result.file_path, "test_audio.mp3");
} else {
panic!("Expected Audio result");
}
@@ -592,7 +592,7 @@ mod tests {
.unwrap();

// Insert Audio data
let audio_chunk_id = db.insert_audio_chunk("test_audio.wav").await.unwrap();
let audio_chunk_id = db.insert_audio_chunk("test_audio.mp3").await.unwrap();
db.insert_audio_transcription(audio_chunk_id, "Hello from audio", 0)
.await
.unwrap();
