Skip to content

Commit 6b14a1f

Browse files
committed
gather-write directly to asio when using sync mode and pushing a sample of buffers.
1 parent 89252cd commit 6b14a1f

10 files changed

+226
-122
lines changed

examples/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ addlslexample(ReceiveDataInChunks cpp)
3636
addlslexample(ReceiveDataSimple cpp)
3737
addlslexample(ReceiveStringMarkers cpp)
3838
addlslexample(ReceiveStringMarkersC c)
39+
addlslexample(SendData cpp)
3940
addlslexample(SendDataC c)
4041
addlslexample(SendDataInChunks cpp)
4142
addlslexample(SendDataSimple cpp)

examples/ReceiveDataInChunks.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@
88
int main(int argc, char **argv) {
99
std::cout << "ReceiveDataInChunks" << std::endl;
1010
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
11-
std::cout << "- max_buffered -- duration in msec to buffer" << std::endl;
11+
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl;
1212
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;
1313

1414
try {
1515

1616
std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
17-
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
17+
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
1818
bool flush = argc > 3;
1919
// resolve the stream of interest & make an inlet
20-
int32_t buf_samples = (int32_t)(max_buflen * 1000);
21-
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen,
22-
transp_bufsize_thousandths);
20+
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
21+
lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000),
22+
0, true, transp_bufsize_thousandths);
2323

2424
// Use set_postprocessing to get the timestamps in a common base clock.
2525
// Do not use if this application will record timestamps to disk -- it is better to

examples/SendData.cpp

+95-69
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,95 @@
1-
#include "lsl_cpp.h"
2-
#include <iostream>
3-
#include <stdlib.h>
4-
#include <time.h>
5-
using namespace std;
6-
7-
/**
8-
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
9-
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
10-
* of the stream information object.
11-
*
12-
* Note that the timer used in the send loop of this program is not particularly accurate.
13-
*/
14-
15-
16-
const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};
17-
18-
int main(int argc, char *argv[]) {
19-
string name, type;
20-
if (argc != 3) {
21-
cout << "This opens a stream under some user-defined name and with a user-defined content "
22-
"type."
23-
<< endl;
24-
cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
25-
"the quotes)):"
26-
<< endl;
27-
cin >> name >> type;
28-
} else {
29-
name = argv[1];
30-
type = argv[2];
31-
}
32-
33-
try {
34-
35-
// make a new stream_info (100 Hz)
36-
lsl::stream_info info(name, type, 8, 100, lsl::cf_float32, string(name) += type);
37-
38-
// add some description fields
39-
info.desc().append_child_value("manufacturer", "BioSemi");
40-
lsl::xml_element chns = info.desc().append_child("channels");
41-
for (int k = 0; k < 8; k++)
42-
chns.append_child("channel")
43-
.append_child_value("label", channels[k])
44-
.append_child_value("unit", "microvolts")
45-
.append_child_value("type", "EEG");
46-
47-
// make a new outlet
48-
lsl::stream_outlet outlet(info);
49-
50-
// send data forever
51-
cout << "Now sending data... " << endl;
52-
double starttime = ((double)clock()) / CLOCKS_PER_SEC;
53-
for (unsigned t = 0;; t++) {
54-
55-
// wait a bit and create random data
56-
while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01)
57-
;
58-
float sample[8];
59-
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
60-
61-
// send the sample
62-
outlet.push_sample(sample);
63-
}
64-
65-
} catch (std::exception &e) { cerr << "Got an exception: " << e.what() << endl; }
66-
cout << "Press any key to exit. " << endl;
67-
cin.get();
68-
return 0;
69-
}
1+
#include "lsl_cpp.h"
2+
#include <iostream>
3+
#include <stdlib.h>
4+
#include <time.h>
5+
#include <array>
6+
using namespace std;
7+
8+
/**
9+
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
10+
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
11+
* of the stream information object.
12+
*
13+
* Note that the timer used in the send loop of this program is not particularly accurate.
14+
*/
15+
16+
17+
const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};
18+
19+
int main(int argc, char *argv[]) {
20+
string name, type;
21+
if (argc < 3) {
22+
cout << "This opens a stream under some user-defined name and with a user-defined content "
23+
"type." << endl;
24+
cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << endl;
25+
cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
26+
"the quotes)):"
27+
<< endl;
28+
cin >> name >> type;
29+
} else {
30+
name = argv[1];
31+
type = argv[2];
32+
}
33+
int n_channels = argc > 3 ? std::stol(argv[3]) : 8;
34+
n_channels = n_channels < 8 ? 8 : n_channels;
35+
int samplingrate = argc > 4 ? std::stol(argv[4]) : 100;
36+
int max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
37+
bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false;
38+
bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true;
39+
40+
try {
41+
42+
// make a new stream_info (100 Hz)
43+
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, string(name) += type);
44+
45+
// add some description fields
46+
info.desc().append_child_value("manufacturer", "LSL");
47+
lsl::xml_element chns = info.desc().append_child("channels");
48+
for (int k = 0; k < n_channels; k++)
49+
chns.append_child("channel")
50+
.append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k+1))
51+
.append_child_value("unit", "microvolts")
52+
.append_child_value("type", type);
53+
54+
// make a new outlet
55+
lsl::stream_outlet outlet(info, 0, max_buffered, sync ? transp_sync_blocking : transp_default);
56+
57+
// send data forever
58+
cout << "Now sending data... " << endl;
59+
double starttime = ((double)clock()) / CLOCKS_PER_SEC;
60+
61+
// Initialize 2 discontiguous data arrays.
62+
vector<float> sample(8, 0.0);
63+
vector<float> extra(n_channels - 8, 0.0);
64+
if (contig) {
65+
// If this is contiguous mode (default) then we combine the arrays.
66+
sample.insert(
67+
sample.end(),
68+
make_move_iterator(extra.begin()),
69+
make_move_iterator(extra.end()));
70+
}
71+
// bytes is used in !contig mode because we need to know how big each buffer is.
72+
array<uint32_t, 2> bytes = {8 * sizeof(float), static_cast<uint32_t>((n_channels - 8) * sizeof(float))};
73+
for (unsigned t = 0;; t++) {
74+
75+
// wait a bit and create random data
76+
while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01)
77+
;
78+
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
79+
80+
// send the sample
81+
if (contig)
82+
outlet.push_sample(sample);
83+
else {
84+
// Advanced: Push set of discontiguous buffers.
85+
array<float *, 2> bufs = {sample.data(), extra.data()};
86+
outlet.push_numeric_bufs(reinterpret_cast<const char **>(const_cast<const float**>(bufs.data())),
87+
bytes.data(), 2, lsl::local_clock(), true);
88+
}
89+
}
90+
91+
} catch (exception &e) { cerr << "Got an exception: " << e.what() << endl; }
92+
cout << "Press any key to exit. " << endl;
93+
cin.get();
94+
return 0;
95+
}

examples/SendDataInChunks.cpp

+18-8
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ int main(int argc, char **argv) {
104104
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
105105
bool nodata = argc > 7;
106106
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
107+
bool b_contig = true && do_sync; // Set true to test gather-write operations.
107108

108109
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
109110
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
@@ -118,7 +119,7 @@ int main(int argc, char **argv) {
118119
lsl::xml_element chn = chns.append_child("channel");
119120
chn.append_child_value("label", "Chan-" + std::to_string(c));
120121
chn.append_child_value("unit", "microvolts");
121-
chn.append_child_value("type", "EEG");
122+
chn.append_child_value("type", type);
122123
}
123124
int32_t buf_samples = max_buffered * samplingrate;
124125
lsl::stream_outlet outlet(info, chunk_samples, buf_samples,
@@ -127,32 +128,41 @@ int main(int argc, char **argv) {
127128
std::cout << "Stream UID: " << info.uid() << std::endl;
128129

129130
// Create a connection to our device.
130-
fake_device my_device(n_channels, (float)samplingrate);
131+
int dev_chans = b_contig ? n_channels : n_channels + 1;
132+
fake_device my_device(dev_chans, (float)samplingrate);
131133

132134
// Prepare buffer to get data from 'device'.
133135
// The buffer should be larger than you think you need. Here we make it 4x as large.
134-
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
135-
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);
136+
std::vector<int16_t> dev_buffer(4 * chunk_samples * dev_chans);
137+
std::fill(dev_buffer.begin(), dev_buffer.end(), 0);
136138

137139
std::cout << "Now sending data..." << std::endl;
138140

139141
// Your device might have its own timer. Or you can decide how often to poll
140142
// your device, as we do here.
141-
auto next_chunk_time = std::chrono::high_resolution_clock::now();
143+
auto t_start = std::chrono::high_resolution_clock::now();
144+
auto next_chunk_time = t_start;
142145
for (unsigned c = 0;; c++) {
143146
// wait a bit
144147
next_chunk_time += std::chrono::milliseconds(chunk_duration);
145148
std::this_thread::sleep_until(next_chunk_time);
146149

147150
// Get data from device
148-
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);
151+
std::size_t returned_samples = my_device.get_data(dev_buffer, nodata);
149152

150153
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
151154
// other push_chunk methods are easier but slightly slower.
152155
double ts = lsl::local_clock();
153-
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true);
156+
if (b_contig) {
157+
// Push a chunk of a contiguous buffer.
158+
outlet.push_chunk_multiplexed(dev_buffer.data(), returned_samples * n_channels, ts, true);
159+
} else {
160+
std::cout << "Discontiguous push_chunk not yet supported." << std::endl;
161+
std::cout << "See SendData.cpp for discontiguous push_sample, then set " << std::endl;
162+
std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false " << std::endl;
163+
std::cout << "for all samples except the the first or last in a chunk." << std::endl;
164+
}
154165
}
155-
156166
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
157167
std::cout << "Press any key to exit. " << std::endl;
158168
std::cin.get();

include/lsl/outlet.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data
9999
* @see lsl_push_sample_ftp
100100
* @param out The lsl_outlet object through which to push the data.
101101
* @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample.
102-
* @param lengths A pointer the number of elements to push for each channel (string lengths).
102+
* @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes).
103103
*/
104104
extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths);
105105
/** @copydoc lsl_push_sample_buf
@@ -108,6 +108,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da
108108
/** @copydoc lsl_push_sample_buft
109109
* @param pushthrough @see lsl_push_sample_ftp */
110110
extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough);
111+
/** @copydoc lsl_push_sample_buftp
112+
* @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer
113+
* per channel but each array in data must be longer than each item in lengths.
114+
*/
115+
extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs);
111116

112117
/** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided.
113118
*

include/lsl_cpp.h

+15-1
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ class stream_outlet {
499499
}
500500

501501
/** Push a pointer to raw numeric data as one sample into the outlet.
502-
* This is the lowest-level function; performns no checking whatsoever. Can not be used for
502+
* This is the lowest-level function; performs no checking whatsoever. Cannot be used for
503503
* variable-size / string-formatted channels.
504504
* @param sample A pointer to the raw sample data to push.
505505
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
@@ -512,6 +512,20 @@ class stream_outlet {
512512
lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough);
513513
}
514514

515+
/**
516+
* Push a pointer to an array of buffers of variable size as one sample into the outlet.
517+
*
518+
* @param bufs A pointer to an array of data buffers.
519+
* @param bytes An array of sizes (number of bytes) of buffers in bufs.
520+
* @param nbufs Total number of buffers.
521+
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
522+
* @param pushthrough Whether to push the sample through to the receivers immediately instead of
523+
* concatenating with subsequent samples.
524+
*/
525+
void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) {
526+
lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs);
527+
}
528+
515529

516530
// ===================================================
517531
// === Pushing an chunk of samples into the outlet ===

src/lsl_outlet_c.cpp

+24-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ LIBLSL_C_API lsl_outlet lsl_create_outlet_ex(
2828
buf_samples /= 1000;
2929
buf_samples = (buf_samples > 0) ? buf_samples : 1;
3030
return create_object_noexcept<stream_outlet_impl>(
31-
*info, chunk_size, buf_samples);
31+
*info, chunk_size, buf_samples, flags);
3232
}
3333

3434
LIBLSL_C_API lsl_outlet lsl_create_outlet(
@@ -171,14 +171,32 @@ LIBLSL_C_API int32_t lsl_push_sample_buft(
171171

172172
LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data,
173173
const uint32_t *lengths, double timestamp, int32_t pushthrough) {
174+
stream_outlet_impl *outimpl = out;
175+
return lsl_push_sample_buftpn(out, data, lengths, timestamp, pushthrough,
176+
(uint32_t)outimpl->info().channel_count());
177+
}
178+
179+
LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data,
180+
const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs) {
181+
stream_outlet_impl *outimpl = out;
174182
try {
175-
stream_outlet_impl *outimpl = out;
176-
std::vector<std::string> tmp;
177-
for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++)
178-
tmp.emplace_back(data[k], lengths[k]);
179-
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
183+
if (outimpl->is_sync_blocking()) {
184+
// Convert input to a vector of asio buffers
185+
std::vector<asio::const_buffer> buffs;
186+
for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) {
187+
buffs.push_back(asio::buffer(data[buf_ix], lengths[buf_ix]));
188+
}
189+
return outimpl->push_sample_gather(buffs, timestamp, pushthrough);
190+
} else {
191+
std::vector<std::string> tmp;
192+
for (uint32_t k = 0; k < nbufs; k++)
193+
tmp.emplace_back(data[k], lengths[k]);
194+
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
195+
}
180196
} catch (std::exception &e) {
181197
LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what());
198+
if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string)
199+
LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet is using sync writes.");
182200
return lsl_internal_error;
183201
}
184202
}

0 commit comments

Comments
 (0)