-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreaming_test.py
76 lines (53 loc) · 2.13 KB
/
streaming_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import time
import os
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds, BrainFlowPresets
import brainflow
"""
Demo code for streaming and recording data from a Unicorn board with BrainFlow.

See https://brainflow.readthedocs.io/en/stable/Examples.html#python for more details.

- Leonardo
"""
def parse_config(config_file):
    """
    Reads the local config file and returns its contents as a dictionary.

    Each useful line has the form "KEY: value". A line is split at its first
    colon, and surrounding whitespace (including the trailing newline) is
    stripped from both the key and the value. Blank lines, lines without a
    colon, and lines with an empty key are skipped.

    config_file: path of the config file; resolved against the current working
        directory (an absolute path is used as-is)
    returns: dict mapping each key to its value, both as strings
    raises: OSError (e.g. FileNotFoundError) if the file cannot be opened
    """
    config_path = os.path.join(os.getcwd(), config_file)
    config_dict = {}
    with open(config_path, "r") as f:
        for line in f:
            # partition() splits on the FIRST colon only, so values that
            # themselves contain ": " are kept intact.
            key, separator, value = line.partition(":")
            key = key.strip()
            if not separator or not key:
                # Blank or malformed line: nothing to store.
                continue
            # strip() drops the trailing newline that would otherwise end up
            # inside the stored value.
            config_dict[key] = value.strip()
    return config_dict
def run(runtime=10, needsPort=False):
    """
    Records data from the Unicorn Board and saves it to a CSV file.

    runtime: how long to record for, in seconds (passed to time.sleep)
    needsPort: whether we need a serial port or not, the bluetooth dongle
        functions as bluetooth so likely unneeded. When True, the stored port
        is read from the "PORT" entry of config.dat and the user may override
        it interactively.

    The user is prompted for the output file name after the recording ends;
    the data is written to "<name>.csv".
    """
    BoardShim.enable_dev_board_logger()  # Enable Internal Brainflow Logger
    params = BrainFlowInputParams()  # Container for the board connection parameters

    # Only useful if port is needed, may need to remove.
    # The config file is read only in this branch so that a missing config.dat
    # cannot break a run that never uses it.
    if needsPort:
        config = parse_config("config.dat")
        # strip() guards against a trailing newline left in the stored value.
        port = config["PORT"].strip()
        new_config = input(f"Use new serial port, or used stored serial port? Stored: {port}")
        if new_config != "":
            port = new_config
        params.serial_port = port

    board = BoardShim(BoardIds.UNICORN_BOARD.value, params)
    board.prepare_session()
    try:
        board.start_stream()
        try:
            time.sleep(runtime)
            # get_board_data() returns all data and removes it from the
            # internal buffer; board.get_current_board_data(256) would instead
            # return the latest 256 packages or less without removing them.
            data = board.get_board_data()
        finally:
            # Stop streaming even if the wait or the read was interrupted.
            board.stop_stream()
    finally:
        # Always release the session once it has been prepared, so the board
        # is not left held after an error or Ctrl+C.
        board.release_session()

    filename = input("What do you want to name your output file?")
    brainflow.DataFilter.write_file(data, f'{filename}.csv', 'w')  # use 'a' for append mode
    print("Data has been saved. Closing.")
# Script entry point: when executed directly, record for 10 seconds without
# configuring a serial port.
if __name__ == "__main__":
    run(needsPort=False, runtime=10)