-
Notifications
You must be signed in to change notification settings - Fork 0
/
bnkreader.py
191 lines (177 loc) · 6.37 KB
/
bnkreader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
27 JUL 2020
bnkreader.py
Nathan Zimmerberg ([email protected])
Helper functions for reading the BNK chip
"""
import time
import serial
from serial.tools import list_ports
import numpy as np
import binascii
def findteensy():
"""returns the port name to connect to a teensy over usb, returns None if no teensy is found."""
port_list=serial.tools.list_ports.comports()
for port in port_list:
if port.manufacturer=='Teensyduino':
return port.device
def nop(ser):
ser.reset_input_buffer()
ser.write(b'a\n')
#time.sleep(0.1)# this prevents python from busy waiting.
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
def dac_set_voltage(ser,voltage):
ser.reset_input_buffer()
ser.write(b'd%f\n'%(voltage))
#time.sleep(0.1)# this prevents python from busy waiting.
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
def recording_start(ser,framerate,framechunks,auxchannel,adcrange,userdata0,userdata1):
ser.reset_input_buffer()
print(b'r%d,%d,%d,%d,%d,%d\n'%(framerate,framechunks,auxchannel,adcrange,userdata0,userdata1))
ser.write(b'r%d,%d,%d,%d,%d,%d\n'%(framerate,framechunks,auxchannel,adcrange,userdata0,userdata1))
line = ser.readline().strip() # read a '\n' terminated line
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
return float(line)
def status(ser):
ser.reset_input_buffer()
ser.write(b's\n')
recording = int(ser.read_until(b',').strip(b','))
framechunks = int(ser.read_until(b',').strip(b','))
frameskipped = int(ser.read_until(b',').strip(b','))
framedata= ser.read(256)
ser.readline()
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
return (recording,framechunks,frameskipped,framedata)
def eject_card(ser):
ser.reset_input_buffer()
ser.write(b'e\n')
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
def frame_chunk_readout(ser,chunkid):
ser.write(b'f%d\n'%(chunkid))
chunkdata= ser.read(256*32)
ser.readline()
if(ser.readline().strip()!=b'a'):
raise IOError("ack failed")
return chunkdata
def testskippedframes():
portname=findteensy()
if (not portname):
print('no teensy found')
return
with serial.Serial(portname, 4000000, timeout=10) as ser:
framerates=[40000,20000,10000]
frameskips=[]
for framerate in framerates:
#100s recording
chunklen= int(framerate*100/32)
frameskip=0
for i in range(36):
recording_start(ser,framerate,chunklen,2,1,0,0)
time.sleep(10)
statusmessage= status(ser)
while(statusmessage[0]==1):
#print("checking status again")
#print(statusmessage)
time.sleep(10)
statusmessage= status(ser)
frameskip+= status(ser)[2]
print(frameskip)
frameskips.append(frameskip)
print(frameskips)
print(framerates)
return frameskips
def decodeframe(framedatas):
""" Returns the decoded frame info.
Specifically a tuple of
a numpy array of frame numbers,
this will show if any frame was skipped because the sd card was too slow
a numpy array of the voltages in volts, size (rows,adcs*2,numframes)
a numpy array of the aux channel, size (rows,numframes)
a float of the max range in volts,
a numpy array of userdata0,
a numpy array of userdata1,
framedatas is of type bytes of size a multiple of 256
"""
framesize=256;
rows= 10
adcs= 6
numframes= len(framedatas)//framesize
#put the data into a numpy array
data= np.frombuffer(framedatas,dtype='<u4')
framenums= np.copy(data[0::64])
userdata0s = np.copy(data[61::64])
userdata1s = np.copy(data[62::64])
crcs= np.copy(data[63::64])
#delete extra data
adcdata= np.delete(data,np.s_[63::64])
adcdata= np.delete(adcdata,np.s_[62::63])
adcdata= np.delete(adcdata,np.s_[61::62])
adcdata= np.delete(adcdata,np.s_[0::61])
for i in range(numframes):
framedata= framedatas[i*framesize:(i+1)*framesize]
#checksum
if(binascii.crc32(framedata[:-4]) != crcs[i]):
raise ValueError("crc mismatch")
leadingzeroH= (adcdata>>31)&1
leadingzeroL= (adcdata>>15)&1
rangeH= (adcdata>>30)&1
rangeL= (adcdata>>14)&1
channelH = (adcdata>>29)&1
channelL = (adcdata>>13)&1
BboolH = (adcdata>>28)&1
BboolL = (adcdata>>12)&1
dataH = (adcdata>>16)&0xFFF
dataL = (adcdata)&0xFFF
#validity checks
if (not all(leadingzeroH==0)):
raise ValueError("leading zero high error")
if (not all(leadingzeroL==0)):
raise ValueError("leading zero low error")
if (not all(rangeH==rangeL)):
raise ValueError("range high and low unequal")
if (not all(rangeH==rangeH[0])):
raise ValueError("range changes during recording")
if (not all(channelH==channelL)):
raise ValueError("channel high and low not equal")
if (not all(BboolH==BboolH[0])):
raise ValueError("BboolH changes")
if (not all(BboolL==BboolL[0])):
raise ValueError("BboolL changes")
if (BboolL[0]==BboolH[0]):
raise ValueError("Bbool high and low equal")
#convert to float
if(rangeH[0]==0):
#0v to Vref, straight binary
vH= dataH*(2.5/4096.0)
vL= dataL*(2.5/4096.0)
maxvoltage= 0xFFF*(2.5/4096.0)
else:
#2s complement Vref+-Vref
vH = (dataH ^ (1<<11))*(5.0/4096.0)
vL = (dataL ^ (1<<11))*(5.0/4096.0)
maxvoltage= 0xFFF*(5.0/4096.0)
if(BboolH[0]==1):
voltageB= vH
voltageA= vL
else:
voltageA= vH
voltageB= vL
channels= channelH[adcs-1::adcs]
channels= np.reshape(channels,(numframes,rows))
channels= np.transpose(channels)
#reshape voltages to build a 3d array
volts= np.zeros((rows,adcs*2,numframes))
voltageB= np.reshape(voltageB,(numframes,rows,adcs))
voltageA= np.reshape(voltageA,(numframes,rows,adcs))
for row in range(rows):
for adc in range(adcs):
volts[row,adc*2]= voltageB[:,row,adc]
volts[row,adc*2+1]= voltageA[:,row,adc]
return (framenums,volts,channels+1,maxvoltage,userdata0s,userdata1s)
if __name__ == '__main__':
testskippedframes()