streaming2.py
# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT
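
"""Stream microphone audio to Deepgram's live transcription websocket.

Prints Interim Results as you speak, collects is_final segments, and emits the
combined utterance on speech_final or UtteranceEnd. Requires DEEPGRAM_API_KEY
in the environment (or a .env file) and shuts down cleanly on SIGINT/SIGTERM.
"""
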
from signal import SIGINT, SIGTERM
import asyncio
import os
from dotenv import load_dotenv
import logging
from deepgram.utils import verboselogs
from time import sleep
from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
    Microphone,
)
load_dotenv()
DEEPGRAM_API_KEY = os.getenv('DEEPGRAM_API_KEY')
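# Note: DeepgramClient("") below is assumed to fall back to the DEEPGRAM_API_KEY
# environment variable when no key is passed explicitly.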
# We will collect the is_final=true messages here so we can use them when the person finishes speaking
is_finals = []


async def main():
    try:
        # We are already inside a running coroutine, so use the running loop
        loop = asyncio.get_running_loop()

        # Bind the signal as a default argument so each handler reports the signal
        # that actually fired; dg_connection and microphone are looked up when the
        # handler runs, after they have been created below.
        for sig in (SIGTERM, SIGINT):
            loop.add_signal_handler(
                sig,
                lambda s=sig: asyncio.create_task(
                    shutdown(s, loop, dg_connection, microphone)
                ),
            )
        # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
        config: DeepgramClientOptions = DeepgramClientOptions(
            options={"keepalive": "true"}
        )
        deepgram: DeepgramClient = DeepgramClient("", config)
        # otherwise, use default config
        # deepgram: DeepgramClient = DeepgramClient()

        dg_connection = deepgram.listen.asyncwebsocket.v("1")
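
        # Event handlers: the SDK invokes each registered handler with the websocket
        # client as the first argument, which is why these functions take a `self` parameter.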
        async def on_open(self, open, **kwargs):
            print("Connection Open")

        async def on_message(self, result, **kwargs):
            global is_finals
            sentence = result.channel.alternatives[0].transcript
            if len(sentence) == 0:
                return
            if result.is_final:
                # We need to collect these and concatenate them together when we get a speech_final=true
                # See docs: https://developers.deepgram.com/docs/understand-endpointing-interim-results
                is_finals.append(sentence)

                # Speech Final means we have detected sufficient silence to consider this the end of speech
                # Speech Final is the lowest-latency result, as it triggers as soon as the endpointing value has been reached
                if result.speech_final:
                    utterance = " ".join(is_finals)
                    print(f"Speech Final: {utterance}")
                    is_finals = []
                else:
                    # These are useful if you need real-time captioning and want to update what the Interim Results produced
                    print(f"Is Final: {sentence}")
            else:
                # These are useful if you need real-time captioning of what is being spoken
                print(f"Interim Results: {sentence}")

        async def on_metadata(self, metadata, **kwargs):
            print(f"Metadata: {metadata}")

        async def on_speech_started(self, speech_started, **kwargs):
            print("Speech Started")

        async def on_utterance_end(self, utterance_end, **kwargs):
            print("Utterance End")
            global is_finals
            if len(is_finals) > 0:
                utterance = " ".join(is_finals)
                print(f"Utterance End: {utterance}")
                is_finals = []

        async def on_close(self, close, **kwargs):
            print("Connection Closed")

        async def on_error(self, error, **kwargs):
            print(f"Handled Error: {error}")

        async def on_unhandled(self, unhandled, **kwargs):
            print(f"Unhandled Websocket Message: {unhandled}")
        dg_connection.on(LiveTranscriptionEvents.Open, on_open)
        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
        dg_connection.on(LiveTranscriptionEvents.SpeechStarted, on_speech_started)
        dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)
        dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled)

        # connect to websocket
        options: LiveOptions = LiveOptions(
            model="nova-2",
            language="en-US",
            # Apply smart formatting to the output
            smart_format=True,
            # Raw audio format details
            encoding="linear16",
            channels=1,
            sample_rate=16000,
            # To get UtteranceEnd, the following must be set:
            interim_results=True,
            utterance_end_ms="1000",
            vad_events=True,
            # Time in milliseconds of silence to wait for before finalizing speech
            endpointing=300,
        )
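
        # Note: endpointing drives speech_final, while utterance_end_ms (together
        # with interim_results) drives the separate UtteranceEnd event.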
        addons = {
            # Prevent waiting for additional numbers before finalizing results
            "no_delay": "true"
        }
print("\n\nStart talking! Press Ctrl+C to stop...\n")
if await dg_connection.start(options, addons=addons) is False:
print("Failed to connect to Deepgram")
return
# Open a microphone stream on the default input device
microphone = Microphone(dg_connection.send)
# start microphone
microphone.start()
# wait until cancelled
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
# This block will be executed when the shutdown coroutine cancels all tasks
pass
finally:
microphone.finish()
await dg_connection.finish()
print("Finished")
except Exception as e:
print(f"Could not open socket: {e}")
return
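

# Graceful shutdown: stop the microphone, close the Deepgram connection,
# cancel any outstanding tasks, and stop the event loop.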
async def shutdown(signal, loop, dg_connection, microphone):
    print(f"Received exit signal {signal.name}...")
    microphone.finish()
    await dg_connection.finish()
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()
    print("Shutdown complete.")


if __name__ == "__main__":
    asyncio.run(main())