Skip to content

Commit

Permalink
endless streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
saltlas committed Jan 22, 2024
1 parent 18235f6 commit f790aa0
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 54 deletions.
Binary file modified commands/__pycache__/rotatecommand.cpython-311.pyc
Binary file not shown.
208 changes: 162 additions & 46 deletions googlething.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,49 +40,83 @@


# Audio recording parameters
STREAMING_LIMIT = 300000  # 5 minutes, in milliseconds — elapsed time past this triggers a stream restart in listen_print_loop
RATE = 16000  # microphone sampling rate in Hz, passed to the PyAudio stream
CHUNK = int(RATE / 10)  # 100ms of audio per buffer (frames_per_buffer for PyAudio)


class MicrophoneStream:
class ResumableMicrophoneStream:
"""Opens a recording stream as a generator yielding the audio chunks."""

def __init__(self: object, rate: int = RATE, chunk: int = CHUNK) -> None:
"""The audio -- and generator -- is guaranteed to be on the main thread."""
self._rate = rate
self._chunk = chunk
def __init__(
self: object,
rate: int,
chunk_size: int,
) -> None:
"""Creates a resumable microphone stream.
Args:
self: The class instance.
rate: The audio file's sampling rate.
chunk_size: The audio file's chunk size.
# Create a thread-safe buffer of audio data
returns: None
"""
self._rate = rate
self.chunk_size = chunk_size
self._num_channels = 1
self._buff = queue.Queue()
self.closed = True

def __enter__(self: object) -> object:
self.start_time = time_utils.get_time_milliseconds()
self.restart_counter = 0
self.audio_input = []
self.last_audio_input = []
self.result_end_time = 0
self.is_final_end_time = 0
self.final_request_end_time = 0
self.bridging_offset = 0
self.last_transcript_was_final = False
self.new_stream = True
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
# The API currently only supports 1-channel (mono) audio
# https://goo.gl/z757pE
channels=1,
channels=self._num_channels,
rate=self._rate,
input=True,
frames_per_buffer=self._chunk,
frames_per_buffer=self.chunk_size,
# Run the audio stream asynchronously to fill the buffer object.
# This is necessary so that the input device's buffer doesn't
# overflow while the calling thread makes network requests, etc.
stream_callback=self._fill_buffer,
)

self.closed = False
def __enter__(self: object) -> object:
"""Opens the stream.
Args:
self: The class instance.
returns: None
"""
self.closed = False
return self

def __exit__(
self: object,
type: object,
value: object,
traceback: object,
) -> None:
"""Closes the stream, regardless of whether the connection was lost or not."""
) -> object:
"""Closes the stream and releases resources.
Args:
self: The class instance.
type: The exception type.
value: The exception value.
traceback: The exception traceback.
returns: None
"""
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
Expand All @@ -94,56 +128,85 @@ def __exit__(
def _fill_buffer(
self: object,
in_data: object,
frame_count: int,
time_info: object,
status_flags: object,
*args: object,
**kwargs: object,
) -> object:
"""Continuously collect data from the audio stream, into the buffer.
Args:
in_data: The audio data as a bytes object
frame_count: The number of frames captured
time_info: The time information
status_flags: The status flags
self: The class instance.
in_data: The audio data as a bytes object.
args: Additional arguments.
kwargs: Additional arguments.
Returns:
The audio data as a bytes object
returns: None
"""
self._buff.put(in_data)
return None, pyaudio.paContinue

def generator(self: object) -> object:
"""Generates audio chunks from the stream of audio data in chunks.
"""Stream Audio from microphone to API and to local buffer
Args:
self: The MicrophoneStream object
self: The class instance.
Returns:
A generator that outputs audio chunks.
returns:
The data from the audio stream.
"""
while not self.closed:
data = []

if self.new_stream and self.last_audio_input:
chunk_time = STREAMING_LIMIT / len(self.last_audio_input)

if chunk_time != 0:
if self.bridging_offset < 0:
self.bridging_offset = 0

if self.bridging_offset > self.final_request_end_time:
self.bridging_offset = self.final_request_end_time

chunks_from_ms = round(
(self.final_request_end_time - self.bridging_offset)
/ chunk_time
)

self.bridging_offset = round(
(len(self.last_audio_input) - chunks_from_ms) * chunk_time
)

for i in range(chunks_from_ms, len(self.last_audio_input)):
data.append(self.last_audio_input[i])

self.new_stream = False

# Use a blocking get() to ensure there's at least one chunk of
# data, and stop iteration if the chunk is None, indicating the
# end of the audio stream.
chunk = self._buff.get()
self.audio_input.append(chunk)

if chunk is None:
return
data = [chunk]

data.append(chunk)
# Now consume whatever other data's still buffered.
while True:
try:
chunk = self._buff.get(block=False)

if chunk is None:
return
data.append(chunk)
self.audio_input.append(chunk)

except queue.Empty:
break

yield b"".join(data)


def listen_print_loop(processor: object, mode: object, responses: object) -> str:

def listen_print_loop(processor: object, mode: object, responses: object, stream: object) -> str:
"""Iterates through server responses and prints them.
The responses passed is a generator that will block until a response
Expand All @@ -168,6 +231,12 @@ def listen_print_loop(processor: object, mode: object, responses: object) -> str


for response in responses:
if time_utils.get_time_milliseconds() - stream.start_time > STREAMING_LIMIT:
stream.start_time = time_utils.get_time_milliseconds()
num_chars_printed = 0
print("RESTARTING")
break

if not response.results:
continue

Expand All @@ -181,6 +250,7 @@ def listen_print_loop(processor: object, mode: object, responses: object) -> str
# Display the transcription of the top alternative.
transcript = result.alternatives[0].transcript


if mode == "interim":
if (result.stability == 0.0 or result.stability > 0.7) and num_chars_printed < len(transcript):

Expand All @@ -191,7 +261,16 @@ def listen_print_loop(processor: object, mode: object, responses: object) -> str
num_chars_printed = len(transcript)


result_seconds = 0
result_micros = 0

if result.result_end_time.seconds:
result_seconds = result.result_end_time.seconds

if result.result_end_time.microseconds:
result_micros = result.result_end_time.microseconds

stream.result_end_time = int((result_seconds * 1000) + (result_micros / 1000))

# Display interim results, but with a carriage return at the end of the
# line, so subsequent lines will overwrite them.
Expand All @@ -205,17 +284,26 @@ def listen_print_loop(processor: object, mode: object, responses: object) -> str
#sys.stdout.flush()

#num_chars_printed = len(transcript)
pass
if mode == "interim" and result.stability > 0.7:
stream.is_final_end_time = stream.result_end_time
stream.last_transcript_was_final = True
else:
stream.last_transcript_was_final = False

else:
#print(transcript + overwrite_chars)
if mode == "stable":

for word_info in result.alternatives[0].words:
end_time = time_utils.convert_timedelta_to_milliseconds(word_info.end_time)
timestamp = time_utils.add_offset(end_time, processor.init_time)

print(word_info.word)
processor.process_commands(word_info.word, timestamp)


stream.is_final_end_time = stream.result_end_time
stream.last_transcript_was_final = True

# Exit recognition if any of the transcribed phrases could be
# one of our keywords.

Expand Down Expand Up @@ -266,22 +354,50 @@ def main() -> None:
config=config, interim_results=True,
)

with MicrophoneStream(RATE, CHUNK) as stream:
audio_generator = stream.generator()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
mic_manager = ResumableMicrophoneStream(RATE, CHUNK)


command_processor = None

with mic_manager as stream:

while not stream.closed:


stream.audio_input = []
audio_generator = stream.generator()

requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)

init_time = time_utils.get_time()
print("!!", init_time)
if not command_processor:
command_processor = processcommands.CommandProcessor(init_time)
else:
command_processor.init_time = init_time

responses = client.streaming_recognize(streaming_config, requests)

# Now, put the transcription responses to use.
listen_print_loop(command_processor, transcription_mode, responses, stream)

if stream.result_end_time > 0:
stream.final_request_end_time = stream.is_final_end_time
stream.result_end_time = 0
stream.last_audio_input = []
stream.last_audio_input = stream.audio_input
stream.audio_input = []
stream.restart_counter = stream.restart_counter + 1

stream.new_stream = True




init_time = time_utils.get_time()
print("!!", init_time)
command_processor = processcommands.CommandProcessor(init_time)
responses = client.streaming_recognize(streaming_config, requests)
print(init_time)

# Now, put the transcription responses to use.
listen_print_loop(command_processor, transcription_mode, responses)


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions processcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def process_commands(self, word_str, time):
if cmd.check_current_keyword(word_str):
msg = cmd.action(time, word_str)
if msg:
print(self.active_command)
self.websocketclient.send_message(msg)
if cmd.finished:
self.active_command = None
Expand All @@ -31,6 +32,7 @@ def process_commands(self, word_str, time):




def close(self):
    """Shut down the websocket client held by this processor."""
    websocket = self.websocketclient
    websocket.close()

Expand Down
Binary file modified utils/__pycache__/phrase_utils.cpython-311.pyc
Binary file not shown.
Binary file modified utils/__pycache__/time_utils.cpython-311.pyc
Binary file not shown.
18 changes: 12 additions & 6 deletions utils/phrase_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from wildcards import wildcardsdict
def init_PhraseSet(client, phrases, project_number, project_id, wildcards):
phraseset_id = "-".join(["-".join(phrase.split(" ")) for phrase in sorted(phrases)]).lower()
from utils import wildcards
import hashlib
def init_PhraseSet(client, phrases, project_number, project_id, wildcards_config):
wildcards_config = ["select_word", "move_word"]
wildcard_terms = sum([wildcards.wildcardsdict[wildcard] for wildcard in wildcards_config], [])
phraseset_string = "-".join(["-".join(phrase.split(" ")) for phrase in sorted(phrases + wildcard_terms)]).lower()
phraseset_id = str(int(hashlib.sha256(phraseset_string.encode()).hexdigest(), 16) % 10**8)
print(phraseset_id)
print(phraseset_string)

try:
response = client.get_phrase_set({"name": f"projects/{project_number}/locations/global/phraseSets/{phraseset_id}"})
print("existing phrase set found, loading...")
Expand All @@ -14,9 +21,8 @@ def init_PhraseSet(client, phrases, project_number, project_id, wildcards):
for word in range(len(split_phrase)):
values.add(split_phrase[word])
values.add(" ".join(split_phrase[:word + 1]))
for wildcard in wildcards:
for term in wildcardsdict[wildcard]:
values.add(term)
for term in wildcard_terms:
values.add(term)

print(values)
json_phrases = [{"value": term} for term in values]
Expand Down
7 changes: 5 additions & 2 deletions utils/time_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta, datetime

import time


def add_offset(offset, init_time, unit="milliseconds"):
Expand All @@ -14,4 +14,7 @@ def convert_timedelta_to_milliseconds(time_to_convert):
return time_to_convert / timedelta(milliseconds = 1)

def get_time():
    """Return the current local date and time as a datetime object.

    Used as the init_time baseline that add_offset() shifts by a
    millisecond offset.
    """
    # The scraped diff left a duplicated, unreachable second
    # `return datetime.now()` here; a function body needs exactly one.
    return datetime.now()

def get_time_milliseconds():
    """Return the current wall-clock time in milliseconds since the epoch."""
    seconds_since_epoch = time.time()
    return seconds_since_epoch * 1000

0 comments on commit f790aa0

Please sign in to comment.