-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathLSL.py
164 lines (139 loc) · 6.57 KB
/
LSL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import os.path
import threading
import pandas as pd
import pylsl
import config
class LSL:
    """
    Static interface to a local-network Laboratory Streaming Layer (LSL) setup
    for EEG data collection: stream discovery, buffered collection on a worker
    thread, per-sample labelling, and CSV export.
    """

    streams = None            # dict[str, StreamInlet | None]: inlets keyed by stream type
    collected_data = None     # dict[str, list]: rows collected per stream between start/stop
    collecting = False        # True while the collection thread should keep polling
    collection_thread = None  # threading.Thread running _collect_data, if any
    collection_label = None   # Label applied to each collected row, or None for the default

    @staticmethod
    def init_lsl_stream():
        """
        MUST BE CALLED FIRST: resolves and initializes every stream type enabled
        in config.SUPPORTED_STREAMS and prepares the per-stream data buffers.
        """
        LSL.streams = {}
        LSL.collected_data = {}
        for stream_type, enabled in config.SUPPORTED_STREAMS.items():
            if enabled:
                LSL.streams[stream_type] = None
                LSL.collected_data[stream_type] = []

        # Resolve each enabled stream on the network (terminates on failure).
        for stream_type in LSL.streams.keys():
            LSL._find_and_initialize_stream(stream_type)

    @staticmethod
    def clear_stream_buffers():
        """
        Drains each LSL stream's buffer so no stale samples leak into a new collection.
        """
        for stream_type, stream in LSL.streams.items():
            if stream:
                print(f"Clearing buffer for {stream_type} stream...")
                # Pull non-blocking until the inlet returns no sample.
                while True:
                    sample, _ = stream.pull_sample(timeout=0.0)
                    if not sample:  # Empty pull => buffer is considered cleared
                        break
                print(f"{stream_type} stream buffer cleared.")

    @staticmethod
    def start_collection():
        """
        Starts data collection on a background thread after clearing stale buffers.
        """
        LSL.clear_stream_buffers()
        print("Started data collection.")
        LSL.collecting = True
        # Reset the per-stream buffers for this run.
        for stream_type in LSL.collected_data.keys():
            LSL.collected_data[stream_type] = []
        LSL.collection_thread = threading.Thread(target=LSL._collect_data)
        LSL.collection_thread.start()

    @staticmethod
    def stop_collection(path: str):
        """
        Stops data collection, joins the worker thread, and saves the run to CSV.
        :param path: Path to the FOLDER that the data should be saved to.
        """
        if LSL.collecting:
            LSL.collecting = False
            LSL.collection_thread.join()
            # Fix: drop the finished thread so class state is clean for the next run.
            LSL.collection_thread = None
            print("Data collection stopped. Saving collected data.")
            LSL._save_collected_data(path)

    @staticmethod
    def start_label(event: str):
        """
        Starts labelling each collected data row with `event` until stop_label().
        :param event: Label string appended to every row while active.
        """
        if event != LSL.collection_label:
            LSL.collection_label = event
            print(f"Labeling Data: {event}")

    @staticmethod
    def stop_label():
        """
        Stops labelling collected data rows and reverts to the default label.
        """
        if LSL.collection_label:
            print(f"Stopped Labeling Data: {LSL.collection_label}")
            LSL.collection_label = None

    #
    # HELPER METHODS
    #
    @staticmethod
    def _find_and_initialize_stream(stream_type: str):
        """
        Finds and initializes a specific LSL stream, terminating the process if
        it cannot be found.
        :param stream_type: The type of the LSL stream.
        :raises SystemExit: if no stream of the requested type is resolved.
        """
        print(f"Looking for a {stream_type} stream...")
        streams_info = pylsl.resolve_byprop('type', stream_type, 1, config.LSL_RESOLUTION_TIMEOUT)
        if len(streams_info) > 0:
            print(f"{stream_type} stream found.")
            LSL.streams[stream_type] = pylsl.StreamInlet(streams_info[0])
            # Prime the time-correction estimate so later calls return instantly.
            LSL.streams[stream_type].time_correction()
        else:
            print(f"No {stream_type} stream found. Exiting Data Collector.")
            # Fix: raise SystemExit directly — the `exit` builtin comes from the
            # `site` module and may be unavailable (e.g. under `python -S`).
            raise SystemExit(1)

    @staticmethod
    def _collect_data():
        """
        Collects data from all LSL streams on a separate thread.

        Continuously polls every stream with non-blocking pulls; a poll that
        returns no sample is simply skipped. The sample rate is as fast as
        possible with no buffering, for real-time processing. LSL timestamps are
        converted to system time via StreamInlet.time_correction(), whose offset
        estimate is continuously refined (empirically within +/-0.2 ms).
        """
        while LSL.collecting:
            for stream_type, stream in LSL.streams.items():
                data_row = {'Timestamp': None, 'Label': config.DEFAULT_LABEL if not LSL.collection_label else LSL.collection_label}
                if stream:
                    sample, timestamp = stream.pull_sample(timeout=0.0)  # Non-blocking pull
                    if sample:
                        # Fix: actually apply the time-correction offset — the
                        # original logged the raw LSL clock despite documenting
                        # conversion to system time.
                        data_row['Timestamp'] = timestamp + stream.time_correction()
                        # Flatten the row into [timestamp, label, *channels] and store it.
                        flattened_data_row = [data_row['Timestamp']] + [data_row['Label']] + sample
                        LSL.collected_data[stream_type] += [flattened_data_row]

    @staticmethod
    def _save_collected_data(path: str):
        """
        Saves the data collected since start_collection() to one CSV per stream.
        :param path: Path to the FOLDER that the data should be saved to.
        """
        if LSL.collected_data:
            for stream_type in LSL.streams.keys():
                channel_count = LSL.streams[stream_type].info().channel_count() if LSL.streams[stream_type] else 0
                # Column headers: timestamp, label, then one column per channel.
                columns = ['Timestamp'] + ['Label'] + [f'{stream_type}_{i + 1}' for i in range(channel_count)]
                # Sort by time (streams are polled round-robin) and write to CSV.
                df = pd.DataFrame(LSL.collected_data[stream_type], columns=columns)
                df = df.sort_values(by='Timestamp')
                df.to_csv(os.path.join(path, f"{stream_type}_data.csv"), index=False)
                print(f"Collected {stream_type} data saved.")
        else:
            print("No data to save.")