Skip to content

Commit

Permalink
v0 event system
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Sep 19, 2024
1 parent b161498 commit e7b88f3
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 41 deletions.
2 changes: 2 additions & 0 deletions screenpipe-audio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ webrtc-vad = "0.4.0"
# Deepgram
reqwest = { workspace = true }

uuid = { version = "1.10.0", features = ["v4"] }

screenpipe-core = { path = "../screenpipe-core" }

[target.'cfg(target_os = "windows")'.dependencies]
Expand Down
10 changes: 7 additions & 3 deletions screenpipe-audio/src/bin/screenpipe-audio-forever.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use screenpipe_audio::default_output_device;
use screenpipe_audio::list_audio_devices;
use screenpipe_audio::parse_audio_device;
use screenpipe_audio::record_and_transcribe;
use screenpipe_audio::vad_engine::VadSensitivity;
use screenpipe_audio::vad_engine::create_vad_engine;
use screenpipe_audio::vad_engine::VadEngine;
use screenpipe_audio::AudioDevice;
use screenpipe_audio::AudioTranscriptionEngine;
use screenpipe_audio::VadEngineEnum;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -80,13 +82,15 @@ async fn main() -> Result<()> {
return Err(anyhow!("No audio input devices found"));
}

let vad_engine_mutex: Arc<Mutex<Box<dyn VadEngine + Send>>> =
Arc::new(Mutex::new(create_vad_engine(VadEngineEnum::Silero).await?));

let chunk_duration = Duration::from_secs_f32(args.audio_chunk_duration);
let (whisper_sender, mut whisper_receiver, _) = create_whisper_channel(
Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3),
VadEngineEnum::Silero, // Or VadEngineEnum::WebRtc, hardcoded for now
args.deepgram_api_key,
&PathBuf::from("output.mp4"),
VadSensitivity::Medium,
vad_engine_mutex,
)
.await?;
// Spawn threads for each device
Expand Down
11 changes: 8 additions & 3 deletions screenpipe-audio/src/bin/screenpipe-audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use screenpipe_audio::default_output_device;
use screenpipe_audio::list_audio_devices;
use screenpipe_audio::parse_audio_device;
use screenpipe_audio::record_and_transcribe;
use screenpipe_audio::vad_engine::VadSensitivity;
use screenpipe_audio::vad_engine::create_vad_engine;
use screenpipe_audio::vad_engine::VadEngine;
use screenpipe_audio::AudioDevice;
use screenpipe_audio::AudioTranscriptionEngine;
use screenpipe_audio::VadEngineEnum;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -85,12 +87,15 @@ async fn main() -> Result<()> {

let chunk_duration = Duration::from_secs(10);
let output_path = PathBuf::from("output.mp4");

let vad_engine_mutex: Arc<Mutex<Box<dyn VadEngine + Send>>> =
Arc::new(Mutex::new(create_vad_engine(VadEngineEnum::Silero).await?));

let (whisper_sender, mut whisper_receiver, _) = create_whisper_channel(
Arc::new(AudioTranscriptionEngine::WhisperDistilLargeV3),
VadEngineEnum::WebRtc, // Or VadEngineEnum::WebRtc, hardcoded for now
deepgram_api_key,
&output_path,
VadSensitivity::Medium,
vad_engine_mutex,
)
.await?;
// Spawn threads for each device
Expand Down
7 changes: 4 additions & 3 deletions screenpipe-audio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
mod core;
pub mod encode;
pub mod meeting_detector;
mod multilingual;
pub mod pcm_decode;
pub mod stt;
pub mod vad_engine;
pub mod encode;
pub use core::{
default_input_device, default_output_device, list_audio_devices, parse_audio_device,
record_and_transcribe, AudioDevice, AudioTranscriptionEngine, DeviceControl, DeviceType
record_and_transcribe, AudioDevice, AudioTranscriptionEngine, DeviceControl, DeviceType,
};
pub use encode::encode_single_audio;
pub use pcm_decode::pcm_decode;
pub use stt::{create_whisper_channel, stt, AudioInput, TranscriptionResult, WhisperModel};
pub use vad_engine::VadEngineEnum;
pub use vad_engine::VadEngineEnum;
135 changes: 135 additions & 0 deletions screenpipe-audio/src/meeting_detector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::vad_engine::VadEngine;
use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use screenpipe_core::EventPayload;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tracing::info;
use uuid::Uuid;

// Mean-square frame energy below which a frame is treated as silent.
const SILENCE_THRESHOLD: f32 = 0.01;
// Mean-square frame energy above which a frame counts toward meeting-start detection.
const ENERGY_THRESHOLD: f32 = 0.1;
const SILENCE_DURATION: Duration = Duration::seconds(180); // 3 minutes of continuous silence ends a meeting
const MEETING_START_ENERGY_DURATION: Duration = Duration::seconds(30); // lookback window for meeting-start detection
const BUFFER_DURATION: Duration = Duration::seconds(300); // 5 minutes of activity history retained

/// Event emitted when the detector decides a meeting has started or ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeetingEvent {
    // Whether this is the Start or the End of the meeting.
    pub event_type: MeetingEventType,
    // Detection time (UTC, wall clock at processing — not audio-stream time).
    pub timestamp: DateTime<Utc>,
    // UUID v4 string tying a Start event to its matching End event.
    pub meeting_id: String,
}

// Marker impl so MeetingEvent can be published via screenpipe-core's event payload trait.
impl EventPayload for MeetingEvent {}

/// Kind of meeting transition carried by a `MeetingEvent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MeetingEventType {
    // A meeting has just been detected as starting.
    Start,
    // A meeting has just been detected as ending.
    End,
}

/// Detects meeting start/end from a stream of audio frames by combining
/// VAD speech classification with simple energy heuristics over a rolling buffer.
pub struct MeetingDetector {
    // Shared voice-activity-detection engine, behind a mutex for cross-thread use.
    vad: Arc<Mutex<Box<dyn VadEngine + Send>>>,
    // Rolling window of recent per-frame activity, trimmed to BUFFER_DURATION.
    activity_buffer: VecDeque<AudioActivityData>,
    // When the current contiguous silent stretch began, if one is ongoing.
    silence_start: Option<DateTime<Utc>>,
    // When the current meeting started, if one is in progress.
    meeting_start: Option<DateTime<Utc>>,
    // Id of the meeting currently in progress, if any.
    pub current_meeting_id: Option<String>,
}

// One per-frame activity sample stored in the rolling buffer.
struct AudioActivityData {
    // Mean-square energy of the frame.
    energy: f32,
    // Whether the VAD engine classified the frame as speech.
    is_speech: bool,
    // When the frame was processed (UTC wall clock).
    timestamp: DateTime<Utc>,
}

impl MeetingDetector {
    /// Builds a detector around a shared, lockable VAD engine.
    ///
    /// The engine is taken as `Arc<Mutex<..>>` so the same instance can be
    /// shared with other consumers (e.g. the transcription pipeline).
    pub async fn new(vad_engine: Arc<Mutex<Box<dyn VadEngine + Send>>>) -> Result<Self> {
        Ok(Self {
            vad: vad_engine,
            activity_buffer: VecDeque::new(),
            silence_start: None,
            meeting_start: None,
            current_meeting_id: None,
        })
    }

    /// Processes one frame of f32 samples and returns a meeting Start/End
    /// event when a transition is detected, `None` otherwise.
    ///
    /// # Errors
    /// Propagates any error returned by the underlying VAD engine.
    pub fn process_audio(&mut self, audio_frame: &[f32]) -> Result<Option<MeetingEvent>> {
        // Fix: an empty frame previously produced 0.0 / 0 = NaN energy in
        // calculate_energy; NaN comparisons are always false, so the silence
        // branch could never match and silence_start was wrongly reset,
        // silently breaking meeting-end detection. Empty frames carry no
        // information, so skip them entirely.
        if audio_frame.is_empty() {
            return Ok(None);
        }

        let energy = self.calculate_energy(audio_frame);
        // A poisoned lock means another thread panicked mid-VAD; treat that
        // as a fatal invariant violation rather than limping on.
        let is_speech = self.vad.lock().unwrap().is_voice_segment(audio_frame)?;
        let timestamp = Utc::now();

        self.activity_buffer.push_back(AudioActivityData {
            energy,
            is_speech,
            timestamp,
        });

        // Trim samples that have aged out of the rolling analysis window.
        while self
            .activity_buffer
            .front()
            .map_or(false, |a| timestamp - a.timestamp > BUFFER_DURATION)
        {
            self.activity_buffer.pop_front();
        }

        // Track the start of a contiguous silent stretch.
        if !is_speech && energy < SILENCE_THRESHOLD {
            if self.silence_start.is_none() {
                self.silence_start = Some(timestamp);
            }
        } else {
            self.silence_start = None;
        }

        // End the current meeting once silence has lasted long enough.
        if let Some(silence_start) = self.silence_start {
            if timestamp - silence_start >= SILENCE_DURATION {
                if let Some(meeting_id) = self.current_meeting_id.take() {
                    self.meeting_start = None;
                    // Reset the silence tracker so the stale stretch that
                    // ended this meeting can't carry over into the next one.
                    self.silence_start = None;
                    info!("Meeting ended with id: {}", meeting_id);
                    return Ok(Some(MeetingEvent {
                        event_type: MeetingEventType::End,
                        timestamp,
                        meeting_id,
                    }));
                }
            }
        }

        // Start a new meeting once sustained speech energy is observed.
        if self.meeting_start.is_none() && self.detect_meeting_start() {
            let meeting_id = Uuid::new_v4().to_string();
            self.current_meeting_id = Some(meeting_id.clone());
            self.meeting_start = Some(timestamp);
            info!("Meeting started with id: {}", meeting_id);
            return Ok(Some(MeetingEvent {
                event_type: MeetingEventType::Start,
                timestamp,
                meeting_id,
            }));
        }

        Ok(None)
    }

    /// Mean-square energy of the frame; returns 0.0 for an empty frame to
    /// avoid a 0.0 / 0 NaN poisoning the threshold comparisons.
    fn calculate_energy(&self, audio_frame: &[f32]) -> f32 {
        if audio_frame.is_empty() {
            return 0.0;
        }
        audio_frame
            .iter()
            .map(|&sample| sample * sample)
            .sum::<f32>()
            / audio_frame.len() as f32
    }

    /// Heuristic meeting-start check: counts buffered frames within the last
    /// MEETING_START_ENERGY_DURATION that are both speech and high-energy.
    /// NOTE(review): the `num_seconds() / 2` threshold implicitly assumes
    /// roughly one frame per second — confirm against the upstream frame rate.
    fn detect_meeting_start(&self) -> bool {
        // Sample the clock once instead of per buffered element.
        let now = Utc::now();
        let high_energy_frames = self
            .activity_buffer
            .iter()
            .rev()
            .take_while(|a| now - a.timestamp <= MEETING_START_ENERGY_DURATION)
            .filter(|a| a.energy > ENERGY_THRESHOLD && a.is_speech)
            .count();

        high_energy_frames >= MEETING_START_ENERGY_DURATION.num_seconds() as usize / 2
    }
}
22 changes: 7 additions & 15 deletions screenpipe-audio/src/stt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use rubato::{
};

use crate::{
encode_single_audio, multilingual,
vad_engine::{SileroVad, VadEngine, VadEngineEnum, VadSensitivity, WebRtcVad},
AudioDevice, AudioTranscriptionEngine,
encode_single_audio, multilingual, vad_engine::VadEngine, AudioDevice, AudioTranscriptionEngine,
};

use hound::{WavSpec, WavWriter};
Expand Down Expand Up @@ -521,14 +519,14 @@ pub fn stt_sync(
audio_input: &AudioInput,
whisper_model: &WhisperModel,
audio_transcription_engine: Arc<AudioTranscriptionEngine>,
vad_engine: Arc<Mutex<Box<dyn VadEngine + Send>>>, // Changed type here
vad_engine: Arc<Mutex<Box<dyn VadEngine + Send>>>,
deepgram_api_key: Option<String>,
output_path: &PathBuf,
) -> Result<(String, String)> {
let audio_input = audio_input.clone();
let whisper_model = whisper_model.clone();
let output_path = output_path.clone();
let vad_engine = vad_engine.clone(); // Clone the Arc to move into the closure
let vad_engine = vad_engine.clone();

let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand All @@ -538,7 +536,7 @@ pub fn stt_sync(
&audio_input,
&whisper_model,
audio_transcription_engine,
&mut **vad_engine_guard, // Obtain &mut dyn VadEngine
&mut **vad_engine_guard,
deepgram_api_key,
&output_path,
))
Expand Down Expand Up @@ -821,14 +819,13 @@ use std::sync::atomic::{AtomicBool, Ordering};

pub async fn create_whisper_channel(
audio_transcription_engine: Arc<AudioTranscriptionEngine>,
vad_engine: VadEngineEnum,
deepgram_api_key: Option<String>,
output_path: &PathBuf,
vad_sensitivity: VadSensitivity,
vad_engine: Arc<Mutex<Box<dyn VadEngine + Send>>>,
) -> Result<(
UnboundedSender<AudioInput>,
UnboundedReceiver<TranscriptionResult>,
Arc<AtomicBool>, // Shutdown flag
Arc<AtomicBool>,
)> {
let whisper_model = WhisperModel::new(audio_transcription_engine.clone())?;
let (input_sender, mut input_receiver): (
Expand All @@ -839,12 +836,7 @@ pub async fn create_whisper_channel(
UnboundedSender<TranscriptionResult>,
UnboundedReceiver<TranscriptionResult>,
) = unbounded_channel();
let mut vad_engine: Box<dyn VadEngine + Send> = match vad_engine {
VadEngineEnum::WebRtc => Box::new(WebRtcVad::new()),
VadEngineEnum::Silero => Box::new(SileroVad::new().await?),
};
vad_engine.set_sensitivity(vad_sensitivity);
let vad_engine = Arc::new(Mutex::new(vad_engine));

let shutdown_flag = Arc::new(AtomicBool::new(false));
let shutdown_flag_clone = shutdown_flag.clone();
let output_path = output_path.clone();
Expand Down
Loading

0 comments on commit e7b88f3

Please sign in to comment.