Skip to content

Commit

Permalink
Refactor audio/video background
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed Feb 16, 2024
1 parent 522194c commit e754b04
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 152 deletions.
10 changes: 6 additions & 4 deletions cookbook/flows/insurance_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
authored by: @kevingrismore and @zzstoatzz
"""

from enum import Enum
from typing import TypeVar

Expand Down Expand Up @@ -52,11 +53,11 @@ def build_damage_report_model(damages: list[DamagedPart]) -> type[M]:
@task(cache_key_fn=task_input_hash)
def marvin_extract_damages_from_url(image_url: str) -> list[DamagedPart]:
return marvin.beta.extract(
data=marvin.beta.Image(image_url),
data=marvin.beta.Image.from_url(image_url),
target=DamagedPart,
instructions=(
"Give extremely brief, high-level descriptions of the damage."
" Only include the 2 most significant damages, which may also be minor and/or moderate."
"Give extremely brief, high-level descriptions of the damage. Only include"
" the 2 most significant damages, which may also be minor and/or moderate."
# only want 2 damages for purposes of this example
),
)
Expand All @@ -75,7 +76,8 @@ def submit_damage_report(report: M, car: Car):
description=f"## Latest damage report for car {car.id}",
)
print(
f"See your artifact in the UI: {PREFECT_UI_URL.value()}/artifacts/artifact/{uuid}"
"See your artifact in the UI:"
f" {PREFECT_UI_URL.value()}/artifacts/artifact/{uuid}"
)


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ audio = [
"SpeechRecognition>=3.10",
"PyAudio>=0.2.11",
"playsound >= 1.0",
"pydub >= 0.25",
]
video = [
"opencv-python >= 4.5",
]
slackbot = ["marvin[prefect]", "numpy", "marvin[chromadb]"]

Expand Down
244 changes: 116 additions & 128 deletions src/marvin/audio.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
"""Utilities for working with audio."""

import collections
import io
import queue
import tempfile
import threading
from typing import Callable, Optional
from typing import Optional

from pydantic import BaseModel, Field
import pydub
import pydub.silence

from marvin.types import Audio
from marvin.utilities.logging import get_logger

logger = get_logger(__name__)
try:
import speech_recognition as sr
from playsound import playsound
Expand All @@ -21,6 +21,8 @@
' "marvin[audio]"` to use this module.'
)

logger = get_logger(__name__)


def play_audio(audio: bytes):
"""
Expand Down Expand Up @@ -73,6 +75,7 @@ def record_audio(duration: int = None) -> Audio:
frames.close()
audio = sr.audio.AudioData(frame_data, source.SAMPLE_RATE, source.SAMPLE_WIDTH)

return audio
return Audio(data=audio.get_wav_data(), format="wav")


Expand Down Expand Up @@ -111,149 +114,134 @@ def record_phrase(
return Audio(data=audio.get_wav_data(), format="wav")


class AudioPayload(BaseModel):
model_config: dict = dict(arbitrary_types_allowed=True)
audio: Audio
audio_buffer: list[Audio] = Field(
description="A buffer of the last 10 audio samples."
def remove_silence(audio: sr.AudioData) -> Optional[Audio]:
# Convert the recorded audio data to a pydub AudioSegment
audio_segment = pydub.AudioSegment(
data=audio.get_wav_data(),
sample_width=audio.sample_width,
frame_rate=audio.sample_rate,
channels=1,
)
recognizer: sr.Recognizer
stop_recording: Callable

# Adjust the silence threshold and minimum silence length as needed
silence_threshold = -40 # dB
min_silence_len = 400 # milliseconds

# Split the audio_segment where silence is detected
chunks = pydub.silence.split_on_silence(
audio_segment,
min_silence_len=min_silence_len,
silence_thresh=silence_threshold,
keep_silence=100,
)

class BackgroundRecorder(BaseModel):
is_recording: bool = False
stop_recording: Optional[Callable] = None
if chunks:
return Audio(data=sum(chunks).raw_data, format="wav")

def record(

class BackgroundAudioRecorder:
def __init__(self):
self.is_recording = False
self.queue = queue.Queue()
self._stop_event = None
self._thread = None

def __len__(self) -> int:
return self.queue.qsize()

def __iter__(self) -> "BackgroundAudioRecorder":
return self

def __next__(self) -> Audio:
while True:
if not self.is_recording and self.queue.empty():
raise StopIteration
try:
return self.queue.get(timeout=0.25)
except queue.Empty:
continue

def _record_thread(
self, max_phrase_duration: Optional[int], adjust_for_ambient_noise: bool
):
r = sr.Recognizer()
m = sr.Microphone()
with m as source:
if adjust_for_ambient_noise:
r.adjust_for_ambient_noise(source)

logger.info("Recording started.")
while not self._stop_event.is_set():
try:
audio = r.listen(
source, timeout=1, phrase_time_limit=max_phrase_duration
)
if processed_audio := remove_silence(audio):
self.queue.put(processed_audio)
# listening timed out, just try again
except sr.exceptions.WaitTimeoutError:
continue

def start_recording(
self,
callback: Callable[[AudioPayload], None],
max_phrase_duration: int = None,
adjust_for_ambient_noise: bool = True,
default_wait_for_stop: bool = True,
clear_queue: bool = False,
):
"""
Start a background thread to record phrases and invoke a callback with each.
Parameters:
callback (Callable): Function to call with AudioPayload for
each phrase.
max_phrase_duration (int, optional): Max phrase duration. None for no
limit.
adjust_for_ambient_noise (bool, optional): Adjust sensitivity to ambient
noise. Defaults to True. (Adds minor latency during calibration)
default_wait_for_stop (bool, optional): When the stop function is called,
this determines the default behavior of whether to wait for the
background thread to finish. Defaults to True.
Returns:
Callable: Function to stop background recording.
"""
if self.is_recording:
raise ValueError("Recording is already in progress.")
r = sr.Recognizer()
m = sr.Microphone()
if adjust_for_ambient_noise:
with m as source:
r.adjust_for_ambient_noise(source)

def stop_recording(wait_for_stop=None):
if wait_for_stop is None:
wait_for_stop = default_wait_for_stop
self.is_recording = False
if wait_for_stop:
logger.debug("Waiting for background thread to finish...")
listener_thread.join(
timeout=3
) # block until the background thread is done, which can take around 1 second
logger.info("Recording finished.")

self.stop_recording = stop_recording

def callback_wrapper(payload):
"""Run the callback in a separate thread to avoid blocking."""
callback_thread = threading.Thread(target=callback, args=(payload,))
callback_thread.daemon = True
logger.debug("Running callback...")
callback_thread.start()

def threaded_listen():
with m as source:
audio_buffer = collections.deque(maxlen=10)
while self.is_recording:
try: # listen for 1 second, then check again if the stop function has been called
audio = r.listen(source, 1, max_phrase_duration)
audio = Audio(data=audio.get_wav_data(), format="wav")
audio_buffer.append(audio)
except sr.exceptions.WaitTimeoutError:
# listening timed out, just try again
pass
else:
payload = AudioPayload(
audio=audio,
audio_buffer=audio_buffer,
recognizer=r,
stop_recording=stop_recording,
)
# run callback in thread
callback_wrapper(payload)

if max_phrase_duration is None:
max_phrase_duration = 5
if clear_queue:
self.queue.queue.clear()
self.is_recording = True
listener_thread = threading.Thread(target=threaded_listen)
listener_thread.daemon = True
listener_thread.start()
logger.info("Recording...")
return self
self._stop_event = threading.Event()
self._thread = threading.Thread(
target=self._record_thread,
args=(max_phrase_duration, adjust_for_ambient_noise),
)
self._thread.daemon = True
self._thread.start()

def stop_recording(self, wait: bool = True):
if not self.is_recording:
raise ValueError("Recording is not in progress.")
self._stop_event.set()
if wait:
self._thread.join()
logger.info("Recording finished.")
self._is_recording = False


def transcribe_live(
callback: Callable[[str], None] = None, stop_phrase: str = None
) -> BackgroundRecorder:
def record_background(
max_phrase_duration: int = None, adjust_for_ambient_noise: bool = True
) -> BackgroundAudioRecorder:
"""
Starts a live transcription service that transcribes audio in real-time and
calls a callback function with the transcribed text.
The function starts a background task in a thread that continuously records audio and
transcribes it into text. The transcribed text is then passed to the
provided callback function. Note that the callback must be threadsafe.
Start a background task that continuously records audio and stores it in a queue.
Args:
callback (Callable[[str], None], optional): A function that is called
with the transcribed text as its argument. If no callback is provided,
the transcribed text will be printed to the console. Defaults to None.
stop_phrase (str, optional): A phrase that, when spoken, will stop recording.
max_phrase_duration (int, optional): The maximum duration of a phrase to record.
Defaults to 5.
adjust_for_ambient_noise (bool, optional): Adjust recognizer sensitivity to
ambient noise. Defaults to True.
Returns:
BackgroundRecorder: The background recorder instance that is recording audio.
"""
if callback is None:
callback = lambda t: print(f">> {t}") # noqa E731
transcription_buffer = collections.deque(maxlen=3)

import marvin.audio

def audio_callback(payload: marvin.audio.AudioPayload) -> None:
buffer_str = (
"\n\n".join(transcription_buffer)
if transcription_buffer
else "<no audio received yet>"
)
transcription = marvin.transcribe(
payload.audio,
prompt=(
"The audio is being spoken directly into the microphone. For context"
" only, here is the transcription up to this point. Do not simply"
f" repeat it. \n\n<START HISTORY>\n\n{buffer_str}\n\n<END HISTORY>\n\n"
),
)
transcription_buffer.append(transcription or "")
if transcription:
callback(transcription)
if stop_phrase and stop_phrase.lower() in transcription.lower():
logger.debug("Stop phrase detected, stopping recording...")
payload.stop_recording()
Example:
```python
import marvin.audio
clips = marvin.audio.record_background()
for clip in clips:
print(marvin.transcribe(clip))
recorder = BackgroundRecorder()
recorder.record(audio_callback, max_phrase_duration=10, default_wait_for_stop=False)
if some_condition:
clips.stop()
```
"""
recorder = BackgroundAudioRecorder()
recorder.start_recording(
max_phrase_duration=max_phrase_duration,
adjust_for_ambient_noise=adjust_for_ambient_noise,
)
return recorder
Loading

0 comments on commit e754b04

Please sign in to comment.