-
Notifications
You must be signed in to change notification settings - Fork 0
/
DTMFdetector.py
288 lines (222 loc) · 7.49 KB
/
DTMFdetector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import chunk
import wave
import struct
import math
###############################################
## This class is used to detect DTMF tones in a WAV file
##
## Currently this only works with uncompressed .WAV files
## encoded 16 bits per sample, one channel, sampled at the detector's
## SAMPLING_RATE (set to 16000 hertz in the constructor).
## It could be made to work with other sampling rates and sample
## formats, but the input format was originally chosen to match how
## PyS60 records audio and nobody has generalized it yet. If you want
## to add this functionality, please go ahead and do it.
class DTMFdetector(object):
    """Detect DTMF (touch-tone) key presses in a WAV file.

    Samples are fed one at a time through eight Goertzel filters, one per
    DTMF frequency.  After every ``GOERTZEL_N`` samples the filter energies
    are examined (``post_testing``) and, if they look like a valid key, the
    key and its time offset are recorded.  ``clean_up_processing`` then
    collapses the per-block detections into distinct key presses.

    Only uncompressed, mono, 16-bit WAV input is supported.  The sample rate
    of the file is NOT checked against ``SAMPLING_RATE``; feeding audio
    recorded at a different rate gives unreliable results.
    """

    # Keypad layout indexed as [row frequency index][column frequency index].
    _KEYPAD = (
        ("1", "2", "3", "A"),
        ("4", "5", "6", "B"),
        ("7", "8", "9", "C"),
        ("*", "0", "#", "D"),
    )

    # Minimum filter energy for both the row and the column tone.
    # NOTE(review): this is an absolute value and is not rescaled when
    # goertzel_n changes, although the filter energy grows with block length.
    _MIN_ENERGY = 4.0e5
    # Weakest allowed row/column energy ratio when the column tone is stronger.
    _NORMAL_TWIST = 0.398
    # Weakest allowed column/row energy ratio when the row tone is stronger.
    _REVERSE_TWIST = 0.158
    # Peak-counting threshold, as a fraction of the strongest tone's energy:
    # _STRONG_PEAK_RATIO above _STRONG_SIGNAL, _WEAK_PEAK_RATIO otherwise.
    _STRONG_SIGNAL = 1.0e9
    _STRONG_PEAK_RATIO = 0.158
    _WEAK_PEAK_RATIO = 0.010

    # Number of frames pulled from the WAV file per read call.
    _FRAMES_PER_READ = 4096

    def __init__(self, sampling_rate=16000, goertzel_n=210):
        """Initialize the detector state and pre-calculate the coefficients.

        Args:
            sampling_rate: Sample rate, in hertz, of the audio to analyse.
            goertzel_n: Number of samples per Goertzel analysis block.

        Raises:
            ValueError: If either argument is not positive.
        """
        if sampling_rate <= 0 or goertzel_n <= 0:
            raise ValueError("sampling_rate and goertzel_n must be positive")

        # Constants for the Goertzel algorithm.
        self.MAX_BINS = 8
        self.GOERTZEL_N = goertzel_n
        self.SAMPLING_RATE = sampling_rate

        # The frequencies we're looking for: four row tones, then four
        # column tones.
        self.freqs = [697, 770, 852, 941, 1209, 1336, 1477, 1633]

        # The per-frequency coefficients, filled in by calc_coeffs().
        self.coefs = [0] * self.MAX_BINS

        self.reset()
        self.calc_coeffs()

    def reset(self):
        """Reset all of the state of the detector."""
        # The index of the current sample being looked at.
        self.sample_index = 0
        # The count of samples seen in the current analysis block.
        self.sample_count = 0
        # Filter state: previous output (q1) and the one before it (q2).
        self.q1 = [0] * self.MAX_BINS
        self.q2 = [0] * self.MAX_BINS
        # Energy of each frequency for the last completed block.
        self.r = [0] * self.MAX_BINS
        # (character, time in seconds) for every block in which a key was
        # seen; consumed by clean_up_processing().
        self.characters = []
        # The final string of characters we believe the audio contains.
        self.charStr = ""

    def post_testing(self):
        """Decide whether the last block's energies form a valid DTMF key.

        Reads ``self.r``.  When the block passes the energy, twist and
        peak-count tests, appends ``(key, time_in_seconds)`` to
        ``self.characters``; otherwise does nothing.
        """
        energies = self.r

        # Strongest tone in the row group and in the column group.  Ties go
        # to the lowest index.
        row = max(range(4), key=lambda idx: energies[idx])
        col = max(range(4, 8), key=lambda idx: energies[idx])

        # Check for minimum energy.
        if energies[row] < self._MIN_ENERGY or energies[col] < self._MIN_ENERGY:
            return

        if energies[col] > energies[row]:
            # Normal twist
            strongest = col
            twist_ok = not (energies[row] < energies[col] * self._NORMAL_TWIST)
        else:
            # Reverse twist
            strongest = row
            twist_ok = not (energies[col] < energies[row] * self._REVERSE_TWIST)

        # Signal to noise test.
        # AT&T states that the noise must be 16dB down from the signal.
        # Here we count the number of signals above the threshold and
        # there ought to be only two.
        if energies[strongest] > self._STRONG_SIGNAL:
            threshold = energies[strongest] * self._STRONG_PEAK_RATIO
        else:
            threshold = energies[strongest] * self._WEAK_PEAK_RATIO
        peak_count = sum(1 for energy in energies[:8] if energy > threshold)

        if not twist_ok or peak_count > 2:
            return

        # Store the character found and the time, in seconds from the start
        # of the stream, at which its block ended.
        seen_at = float(self.sample_index) / float(self.SAMPLING_RATE)
        self.characters.append((self._KEYPAD[row][col - 4], seen_at))

    def clean_up_processing(self):
        """Collapse per-block detections into distinct key presses.

        Say you pressed 5,3,2,1,1: the detector sees something like
        555553333332222221111111111111, and cleaning up gives 5,3,2,1,1.
        Reads ``self.characters`` and stores the result in ``self.charStr``.
        """
        # This is nothing but a fancy state machine.  To get a valid key
        # press we need MIN_CONSECUTIVE identical characters in a row, with
        # no more than MAX_GAP seconds between consecutive characters;
        # a bigger gap is treated as the same key pressed twice.
        MIN_CONSECUTIVE = 2
        MAX_GAP = 0.3000
        # Once a run is longer than this, further identical characters take
        # the "different character" branch below.
        # NOTE(review): the intent of this cap is not documented; confirm
        # before relying on how very long key presses are reported.
        MAX_RUN = 30

        detections = self.characters
        presses = []
        run_length = 0
        last_char = ""
        last_time = 0

        for index, detection in enumerate(detections):
            current_char = detection[0]
            current_time = detection[1]
            time_delta = current_time - last_time

            if last_char == current_char and run_length <= MAX_RUN:
                run_length += 1
            else:
                # Sometimes a stream of good input is interrupted by a
                # single erroneous detection.  Peek ahead: if the next two
                # detections go back to the previous character, ignore
                # this one.
                if len(detections) > index + 2:
                    if (detections[index + 1][0] == last_char
                            and detections[index + 2][0] == last_char):
                        # Forget this ever happened.
                        last_time = current_time
                        continue

                # The previous run is over; emit it if it was long enough.
                if run_length >= MIN_CONSECUTIVE:
                    presses.append(last_char)
                run_length = 1
                last_char = current_char
                last_time = current_time
                continue

            # Same character again: a big enough gap means a new key press.
            if time_delta > MAX_GAP:
                # Do we have enough counts (excluding this one) for the
                # earlier press to be valid?
                if (run_length - 1) >= MIN_CONSECUTIVE:
                    presses.append(last_char)
                run_length = 1

            last_char = current_char
            last_time = current_time

        # Emit the run still open at the end of the characters.
        if run_length >= MIN_CONSECUTIVE:
            presses.append(last_char)

        self.charStr = "".join(presses)

    def goertzel(self, sample):
        """Feed one 16-bit signed sample through the Goertzel filters.

        Every ``GOERTZEL_N`` samples the per-frequency energies are stored
        in ``self.r``, the filters are cleared and ``post_testing`` runs.
        """
        self.sample_count += 1
        self.sample_index += 1

        for n in range(self.MAX_BINS):
            q0 = self.coefs[n] * self.q1[n] - self.q2[n] + sample
            self.q2[n] = self.q1[n]
            self.q1[n] = q0

        if self.sample_count == self.GOERTZEL_N:
            for n in range(self.MAX_BINS):
                q1 = self.q1[n]
                q2 = self.q2[n]
                self.r[n] = (q1 * q1) + (q2 * q2) - (self.coefs[n] * q1 * q2)
                self.q1[n] = 0
                self.q2[n] = 0
            self.post_testing()
            self.sample_count = 0

    def calc_coeffs(self):
        """Calculate the Goertzel coefficients ahead of time."""
        for n in range(self.MAX_BINS):
            self.coefs[n] = 2.0 * math.cos(
                2.0 * math.pi * self.freqs[n] / self.SAMPLING_RATE)

    def getDTMFfromWAV(self, filename):
        """Return the string of DTMF characters detected in a WAV file.

        If the WAV file has the DTMFs for 5,5,5,3 then the string returned
        is "5553".  The detector is reset first, so an instance can be
        reused for several files.

        Args:
            filename: Path of an uncompressed, mono, 16-bit WAV file.

        Returns:
            The detected characters, in order (also left in ``charStr``).

        Raises:
            struct.error: If the file is not mono with 16-bit samples.
            wave.Error: If the file is not a WAV file ``wave`` can read.
        """
        self.reset()

        wav_file = wave.open(filename, "rb")
        try:
            if wav_file.getnchannels() != 1 or wav_file.getsampwidth() != 2:
                # struct.error is what unpacking such frames has always
                # raised here; keep the type, but say what is wrong.
                raise struct.error(
                    "unsupported WAV format: need 1 channel of 16-bit "
                    "samples, got %d channel(s) of %d-bit samples"
                    % (wav_file.getnchannels(), wav_file.getsampwidth() * 8))

            # Read in chunks and stop at the end of the data, so a header
            # that overstates the frame count cannot cause a failure.
            while True:
                raw = wav_file.readframes(self._FRAMES_PER_READ)
                count = len(raw) // 2
                if count == 0:
                    break
                # WAV PCM samples are little-endian regardless of the host.
                for sample in struct.unpack("<%dh" % count, raw[:count * 2]):
                    self.goertzel(sample)
        finally:
            wav_file.close()

        self.clean_up_processing()
        return self.charStr