Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactors interruptions into the output device #25

Closed
wants to merge 9 commits into from
8 changes: 4 additions & 4 deletions playground/streaming/synthesizer/synthesize.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import AzureSynthesizerConfig
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.speaker_output import SpeakerOutput
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.blocking_speaker_output import BlockingSpeakerOutput
from vocode.streaming.synthesizer.azure_synthesizer import AzureSynthesizer
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer
from vocode.streaming.utils import get_chunk_size_per_second
Expand All @@ -19,7 +19,7 @@

async def speak(
synthesizer: BaseSynthesizer,
output_device: BaseOutputDevice,
output_device: AbstractOutputDevice,
message: BaseMessage,
):
message_sent = message.text
Expand Down Expand Up @@ -58,7 +58,7 @@ async def speak(
return message_sent, cut_off

async def main():
speaker_output = SpeakerOutput.from_default_device()
speaker_output = BlockingSpeakerOutput.from_default_device()
synthesizer = AzureSynthesizer(AzureSynthesizerConfig.from_output_device(speaker_output))
try:
while True:
Expand Down
1 change: 0 additions & 1 deletion quickstarts/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ async def main():
speaker_output,
) = create_streaming_microphone_input_and_speaker_output(
use_default_devices=False,
use_blocking_speaker_output=True, # this moves the playback to a separate thread, set to False to use the main thread
)

conversation = StreamingConversation(
Expand Down
4 changes: 2 additions & 2 deletions tests/fakedata/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import PlayHtSynthesizerConfig, SynthesizerConfig
from vocode.streaming.models.transcriber import DeepgramTranscriberConfig, TranscriberConfig
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.streaming_conversation import StreamingConversation
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer
from vocode.streaming.telephony.constants import DEFAULT_CHUNK_SIZE, DEFAULT_SAMPLING_RATE
Expand All @@ -35,7 +35,7 @@
)


class DummyOutputDevice(BaseOutputDevice):
class DummyOutputDevice(AbstractOutputDevice):
def consume_nonblocking(self, chunk: bytes):
pass

Expand Down
4 changes: 2 additions & 2 deletions tests/streaming/test_streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from vocode.streaming.models.events import Sender
from vocode.streaming.models.transcriber import Transcription
from vocode.streaming.models.transcript import ActionStart, Message, Transcript
from vocode.streaming.utils.worker import AsyncWorker
from vocode.streaming.utils.worker import AbstractAsyncWorker


class ShouldIgnoreUtteranceTestCase(BaseModel):
Expand All @@ -25,7 +25,7 @@ class ShouldIgnoreUtteranceTestCase(BaseModel):
expected: bool


async def _consume_worker_output(worker: AsyncWorker, timeout: float = 0.1):
async def _consume_worker_output(worker: AbstractAsyncWorker, timeout: float = 0.1):
try:
return await asyncio.wait_for(worker.output_queue.get(), timeout=timeout)
except asyncio.TimeoutError:
Expand Down
11 changes: 2 additions & 9 deletions vocode/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from vocode.streaming.output_device.blocking_speaker_output import (
BlockingSpeakerOutput as BlockingStreamingSpeakerOutput,
)
from vocode.streaming.output_device.speaker_output import SpeakerOutput as StreamingSpeakerOutput
from vocode.turn_based.input_device.microphone_input import (
MicrophoneInput as TurnBasedMicrophoneInput,
)
Expand All @@ -31,15 +30,10 @@ def create_streaming_microphone_input_and_speaker_output(
output_device_name: Optional[str] = None,
mic_sampling_rate=None,
speaker_sampling_rate=None,
use_blocking_speaker_output=False,
):
return _create_microphone_input_and_speaker_output(
microphone_class=StreamingMicrophoneInput,
speaker_class=(
BlockingStreamingSpeakerOutput
if use_blocking_speaker_output
else StreamingSpeakerOutput
),
speaker_class=BlockingStreamingSpeakerOutput,
use_default_devices=use_default_devices,
input_device_name=input_device_name,
output_device_name=output_device_name,
Expand Down Expand Up @@ -70,7 +64,6 @@ def _create_microphone_input_and_speaker_output(
microphone_class: typing.Type[Union[StreamingMicrophoneInput, TurnBasedMicrophoneInput]],
speaker_class: typing.Type[
Union[
StreamingSpeakerOutput,
BlockingStreamingSpeakerOutput,
TurnBasedSpeakerOutput,
]
Expand All @@ -83,7 +76,7 @@ def _create_microphone_input_and_speaker_output(
) -> Union[
Tuple[
StreamingMicrophoneInput,
Union[StreamingSpeakerOutput, BlockingStreamingSpeakerOutput],
Union[BlockingStreamingSpeakerOutput],
],
Tuple[TurnBasedMicrophoneInput, TurnBasedSpeakerOutput],
]:
Expand Down
2 changes: 1 addition & 1 deletion vocode/streaming/client_backend/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(
async def handle_event(self, event: Event):
if event.type == EventType.TRANSCRIPT:
transcript_event = typing.cast(TranscriptEvent, event)
self.output_device.consume_transcript(transcript_event)
await self.output_device.send_transcript(transcript_event)
# logger.debug(event.dict())

def restart(self, output_device: WebsocketOutputDevice):
Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/models/synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .audio import AudioEncoding, SamplingRate
from .model import BaseModel, TypedModel
from vocode.streaming.models.client_backend import OutputAudioConfig
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.telephony.constants import DEFAULT_AUDIO_ENCODING, DEFAULT_SAMPLING_RATE


Expand Down Expand Up @@ -46,7 +46,7 @@ class Config:
arbitrary_types_allowed = True

@classmethod
def from_output_device(cls, output_device: BaseOutputDevice, **kwargs):
def from_output_device(cls, output_device: AbstractOutputDevice, **kwargs):
return cls(
sampling_rate=output_device.sampling_rate,
audio_encoding=output_device.audio_encoding,
Expand Down
20 changes: 20 additions & 0 deletions vocode/streaming/output_device/abstract_output_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from abc import abstractmethod
import asyncio
from vocode.streaming.output_device.audio_chunk import AudioChunk
from vocode.streaming.utils.worker import AbstractAsyncWorker, InterruptibleEvent


class AbstractOutputDevice(AbstractAsyncWorker[InterruptibleEvent[AudioChunk]]):

def __init__(self, sampling_rate: int, audio_encoding):
super().__init__(input_queue=asyncio.Queue())
self.sampling_rate = sampling_rate
self.audio_encoding = audio_encoding

@abstractmethod
async def play(self, chunk: bytes):
pass

@abstractmethod
def interrupt(self):
pass
28 changes: 28 additions & 0 deletions vocode/streaming/output_device/audio_chunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dataclasses import dataclass, field
from enum import Enum
from uuid import UUID
import uuid


class ChunkState(int, Enum):
UNPLAYED = 0
PLAYED = 1
INTERRUPTED = 2


@dataclass
class AudioChunk:
data: bytes
state: ChunkState = ChunkState.UNPLAYED
chunk_id: UUID = field(default_factory=uuid.uuid4)

@staticmethod
def on_play():
pass

@staticmethod
def on_interrupt():
pass

def __hash__(self) -> int:
return hash(self.chunk_id)
16 changes: 0 additions & 16 deletions vocode/streaming/output_device/base_output_device.py

This file was deleted.

69 changes: 46 additions & 23 deletions vocode/streaming/output_device/blocking_speaker_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,30 @@
import sounddevice as sd

from vocode.streaming.models.audio import AudioEncoding
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.rate_limit_interruptions_output_device import (
RateLimitInterruptionsOutputDevice,
)
from vocode.streaming.utils.worker import ThreadAsyncWorker

DEFAULT_SAMPLING_RATE = 44100

class BlockingSpeakerOutput(BaseOutputDevice, ThreadAsyncWorker):
DEFAULT_SAMPLING_RATE = 44100

def __init__(
self,
device_info: dict,
sampling_rate: Optional[int] = None,
audio_encoding: AudioEncoding = AudioEncoding.LINEAR16,
):
class _PlaybackWorker(ThreadAsyncWorker[bytes]):

def __init__(self, *, device_info: dict, sampling_rate: int):
super().__init__(input_queue=asyncio.Queue())
self.sampling_rate = sampling_rate
self.device_info = device_info
sampling_rate = sampling_rate or int(
self.device_info.get("default_samplerate", self.DEFAULT_SAMPLING_RATE)
)
self.input_queue: asyncio.Queue[bytes] = asyncio.Queue()
BaseOutputDevice.__init__(self, sampling_rate, audio_encoding)
ThreadAsyncWorker.__init__(self, self.input_queue)
self.input_queue.put_nowait(self.sampling_rate * b"\x00")
self.stream = sd.OutputStream(
channels=1,
samplerate=self.sampling_rate,
dtype=np.int16,
device=int(self.device_info["index"]),
)
self._ended = False
self.input_queue.put_nowait(self.sampling_rate * b"\x00")
self.stream.start()

def start(self):
ThreadAsyncWorker.start(self)

def _run_loop(self):
while not self._ended:
try:
Expand All @@ -47,10 +38,42 @@ def _run_loop(self):
except queue.Empty:
continue

def consume_nonblocking(self, chunk):
ThreadAsyncWorker.consume_nonblocking(self, chunk)

def terminate(self):
self._ended = True
ThreadAsyncWorker.terminate(self)
super().terminate()
self.stream.close()


class BlockingSpeakerOutput(RateLimitInterruptionsOutputDevice):
DEFAULT_SAMPLING_RATE = 44100

def __init__(
self,
device_info: dict,
sampling_rate: Optional[int] = None,
audio_encoding: AudioEncoding = AudioEncoding.LINEAR16,
):
sampling_rate = sampling_rate or int(
device_info.get("default_samplerate", DEFAULT_SAMPLING_RATE)
)
super().__init__(sampling_rate=sampling_rate, audio_encoding=audio_encoding)
self.playback_worker = _PlaybackWorker(device_info=device_info, sampling_rate=sampling_rate)
self.input_queue: asyncio.Queue[bytes] = asyncio.Queue()

async def play(self, chunk):
self.playback_worker.consume_nonblocking(chunk)

def start(self) -> asyncio.Task:
self.playback_worker.start()
return super().start()

def terminate(self):
self.playback_worker.terminate()
super().terminate()

@classmethod
def from_default_device(
cls,
**kwargs,
):
return cls(sd.query_devices(kind="output"), **kwargs)
14 changes: 11 additions & 3 deletions vocode/streaming/output_device/file_output_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

import numpy as np

from .base_output_device import BaseOutputDevice
from vocode.streaming.output_device.rate_limit_interruptions_output_device import (
RateLimitInterruptionsOutputDevice,
)

from vocode.streaming.models.audio import AudioEncoding
from vocode.streaming.utils.worker import ThreadAsyncWorker

Expand All @@ -27,7 +30,7 @@ def terminate(self):
self.wav.close()


class FileOutputDevice(BaseOutputDevice):
class FileOutputDevice(RateLimitInterruptionsOutputDevice):
DEFAULT_SAMPLING_RATE = 44100

def __init__(
Expand All @@ -47,9 +50,13 @@ def __init__(
self.wav = wav

self.thread_worker = FileWriterWorker(self.queue, wav)

def start(self) -> asyncio.Task:
self.thread_worker.start()
return super().start()

def consume_nonblocking(self, chunk):
async def play(self, chunk: bytes):
# TODO (output device refactor): just dispatch out into a thread to write to the file per block, doesn't need a worker
chunk_arr = np.frombuffer(chunk, dtype=np.int16)
for i in range(0, chunk_arr.shape[0], self.blocksize):
block = np.zeros(self.blocksize, dtype=np.int16)
Expand All @@ -59,3 +66,4 @@ def consume_nonblocking(self, chunk):

def terminate(self):
self.thread_worker.terminate()
super().terminate()
Loading
Loading