-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodem.py
344 lines (282 loc) · 11.5 KB
/
modem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""
Python 3 interface for Conexant CX93001 chipset based voice modems.
Forked from https://github.com/havocsec/cx93001
"""
__author__ = "samhinshaw"
__version__ = "0.0.4"
__license__ = "GPLv3"
import sys
import os
import time
import wave
import subprocess
import fcntl
from datetime import datetime
import serial
from pydub import AudioSegment
from threading import Timer
import reactivex as rx
from reactivex.observable import Observable
from reactivex.subject import Subject
# from reactivex.observer import ObserverBase
from reactivex import operators as ops
class CouldNotInitializeException(Exception):
pass
class CX93001:
"""Main modem class
Main class to interface with Conexant CX93001 chipset based voicemodems.
"""
__con: serial.Serial
__modem_name: str
__listening: bool = True
__output: Subject[str] = Subject()
# output: ObserverBase[str]
def __init__(self, modem_name: str, port="/dev/ttyACM0", baudrate=115200):
"""Constructor
Class constructor that accepts a custom serial port and baudrate.
By default, parameters are set to 8N1 comm @ 115200 bauds, no
XONXOFF, no RTS/CTS, no DST/DTR and no write timeout. Then, the
serial connection is open and the modem configuration is reset.
Finally, the verbosity, echoing and caller ID are enabled through
AT commands.
"""
self.output = self.__output.as_observer()
self.__modem_name = modem_name
self.__con = serial.Serial(
port=port,
baudrate=57600, # 57600 # 9600 #115200
bytesize=serial.EIGHTBITS, # number of bits per bytes
parity=serial.PARITY_NONE, # set parity check: no parity
stopbits=serial.STOPBITS_ONE, # number of stop bits
timeout=3, # non-block read
xonxoff=False, # disable software flow control
rtscts=False, # disable hardware (RTS/CTS) flow control
dsrdtr=False, # disable hardware (DSR/DTR) flow control
writeTimeout=3, # timeout for write
)
# self.__con = serial.Serial(
# # Set the port and the baudrate
# port=port,
# baudrate=baudrate,
# # Set 8N1
# bytesize=serial.EIGHTBITS,
# parity=serial.PARITY_NONE,
# stopbits=serial.STOPBITS_ONE,
# timeout=3,
# xonxoff=False,
# rtscts=False,
# dsrdtr=False,
# writeTimeout=False,
# )
# Try to open the connection
try:
self.__con.flushInput()
self.__con.flushOutput()
if not self.__con.isOpen():
self.__con.open()
except serial.SerialException:
raise CouldNotInitializeException("Could not open a serial connection")
# Try to initialize
if not self.__at("ATE1"):
raise CouldNotInitializeException("Could not enable command echoing")
if not self.__at("AT"):
raise CouldNotInitializeException("Could not execute AT commands")
if not self.__at("AT&F0"):
raise CouldNotInitializeException("Could not reset to default state")
if not self.__at("ATV1"):
raise CouldNotInitializeException("Could not enable verbose reporting")
if not self.__at("ATE1"):
raise CouldNotInitializeException("Could not enable command echoing")
if not self.__at("AT+VCID=1"):
raise CouldNotInitializeException("Could not enable caller ID")
# Timer(1.0, self.read_input).start()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.__del__()
def __del__(self):
"""Destructor
Closes the serial connection.
"""
if self.__con.isOpen():
self.__con.close()
print("Serial Port closed...")
def self_test(self):
"""Returns True if the modem is working correctly
Returns True if the modem is working correctly, False otherwise
"""
return (
self.__at("AT")
and self.__at("AT&F0")
and self.__at("ATV1")
and self.__at("ATE1")
and self.__at("AT+VCID=1")
and self.__at("ATI1")
and self.__at("ATI2")
)
# and self.__at('ATI3', 'CX93001-EIS_V0.2013-V92') and self.__at('ATI0', '56000')
def __at(self, cmd: str, expected="OK"):
"""Execute AT command, returns True if the expected response is received, False otherwise.
Executes an AT command and waits for the expected response, which is 'OK' by default. If the
expected response is received, True is returned. In any other case, False is returned.
"""
self.__con.write((cmd + "\r").encode())
resp = self.__con.readline()
while resp == b"\r\n" or resp == b"OK\r\n":
resp = self.__con.readline()
if resp != (cmd + "\r\r\n").encode():
return False
resp = self.__con.readline()
if resp == (expected + "\r\n").encode():
# print('OK:', cmd)
return True
else:
# print('NOK:', cmd)
return False
def __detect_end(self, data):
"""Detects the end of an ongoing call
Returns if <DLE>s (silence), <DLE>b (busy tone) or <DLE><ETX> (End of TX) are detected in the
provided data.
"""
return b"\x10s" in data or b"\x10b" in data or b"\x10\x03" in data
def push(self, data: str) -> None:
self.__output.on_next(data)
# def read_input(self) -> None:
# while True:
# data = self.__con.readline().decode().replace("\r\n", "")
# self.__output.on_next(data)
def read_input(self) -> None:
while True:
data = self.__con.readline().decode().replace("\r\n", "")
if data != "":
print(data)
def wait_call(self, max_rings_ignore_cid=4):
"""Waits until an incoming call is detected, then returns its caller ID data
Waits until an incoming call is detected, then returns its caller ID data if possible (date, number). If after
max_rings_ignore_cid rings no caller ID data is detected, then returns the tuple (date, '').
"""
rings = 0
while rings <= max_rings_ignore_cid:
data = self.__con.readline().decode().replace("\r\n", "")
if data != "":
if "NMBR" in data:
return datetime.now(), data.replace("NMBR = ", "")
if "RING" in data:
# Just in case Caller ID isn't working
rings += 1
if rings >= max_rings_ignore_cid:
return datetime.now(), ""
def accept_call(self):
"""Accept an incoming call
Sets voice mode, voice sampling mode to 8-bit PCM mono @ 8000Hz, enables transmitting operating mode
and answers the call.
"""
self.__at("AT+FCLASS=8")
self.__at("AT+VSM=1,8000,0,0")
self.__at("AT+VLS=1")
# Pick up
self.__at("ATA")
def play_audio_obj(self, wavobj, timeout=0):
"""Transmits a wave audio object over an ongoing call
Transmits a wave audio object over an ongoing call. Enables voice transmit mode and the audio is
played until it's finished if the timeout is 0 or until the timeout is reached.
"""
if timeout == 0:
timeout = wavobj.getnframes() / wavobj.getframerate()
self.__at("AT+VTX")
# print(timeout)
chunksize = 1024
start_time = time.time()
data = wavobj.readframes(chunksize)
while data != "":
self.__con.write(data)
data = wavobj.readframes(chunksize)
time.sleep(0.06)
if time.time() - start_time >= timeout:
break
def play_audio_file(self, wavfile, timeout=0):
"""Transmits a wave 8-bit PCM mono @ 8000Hz audio file over an ongoing call
Transmits a wave 8-bit PCM mono @ 8000Hz audio file over an ongoing call. Enables voice transmit mode
and the audio is played until it finished if the timeout is 0 or until the timeout is reached.
"""
wavobj = wave.open(wavfile, "rb")
self.play_audio_obj(wavobj, timeout=timeout)
wavobj.close()
def tts_say(self, phrase, lang="english"):
"""Transmits a TTS phrase over an ongoing call
Uses espeak and ffmpeg to generate a wav file of the phrase. Then, it's transmitted over the ongoing call.
"""
os.system(
"espeak -w temp.wav -v"
+ lang
+ ' "'
+ phrase
+ '" ; ffmpeg -i temp.wav -ar 8000 -acodec pcm_u8 '
" -ac 1 phrase.wav"
)
os.remove("temp.wav")
self.play_audio_file("phrase.wav")
os.remove("../phrase.wav")
def play_tones(self, sequence):
"""Plays a sequence of DTMF tones
Plays a sequence of DTMF tones over an ongoing call.
"""
self.__at("AT+VTS=" + ",".join(sequence))
time.sleep(len(sequence))
def reject_call(self):
"""Rejects an incoming call
Answers the call and immediately hangs up in order to correctly terminate the incoming call.
"""
self.__at("ATA")
self.hang_up()
def hang_up(self):
"""Terminates an ongoing call
Terminates the currently ongoing call
"""
self.__at("AT+FCLASS=8")
self.__at("ATH")
def dial(self, number):
"""Initiate a call with the desired number
Sets the modem to voice mode, sets the sampling mode to 8-bit PCM mono @ 8000 Hz, enables transmitting
operating mode, silence detection over a period of 5 seconds and dials to the desired number.
"""
self.__at("AT+FCLASS=8")
self.__at("AT+VSM=1,8000,0,0")
self.__at("AT+VLS=1")
self.__at("AT+VSD=128,50")
self.__at("ATD" + number)
def record_call(self, date=datetime.now(), number="unknown", timeout=7200):
"""Records an ongoing call until it's finished or the timeout is reached
Sets the modem to voice mode, sets the sampling mode to 8-bit PCM mono @ 8000 Hz, enables transmitting
operating mode, silence detection over a period of 5 seconds and voice reception mode. Then, a mp3 file
is written until the end of the call or until the timeout is reached.
"""
self.__at("AT+FCLASS=8")
self.__at("AT+VSM=1,8000,0,0")
self.__at("AT+VLS=1")
self.__at("AT+VSD=128,50")
self.__at("AT+VRX", "CONNECT")
chunksize = 1024
frames = []
start = time.time()
while True:
chunk = self.__con.read(chunksize)
if self.__detect_end(chunk):
break
if time.time() - start >= timeout:
# print('Timeout reached')
break
frames.append(chunk)
self.hang_up()
# Merge frames and save temporarily as .wav
wav_path = date.strftime("%d-%m-%Y_%H:%M:%S_") + number + ".wav"
wav_file = wave.open(wav_path, "wb")
wav_file.setnchannels(1)
wav_file.setsampwidth(1)
wav_file.setframerate(8000)
wav_file.writeframes(b"".join(frames))
wav_file.close()
# Convert from .wav to .mp3 in order to save space
segment = AudioSegment.from_wav(wav_path)
segment.export(wav_path[:-3] + "mp3", format="mp3")
os.remove(wav_path)