Skip to content

Synchronous Outlet for zero-copying socket writing #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ addlslexample(ReceiveDataInChunks cpp)
addlslexample(ReceiveDataSimple cpp)
addlslexample(ReceiveStringMarkers cpp)
addlslexample(ReceiveStringMarkersC c)
addlslexample(SendData cpp)
addlslexample(SendDataC c)
addlslexample(SendDataInChunks cpp)
addlslexample(SendDataSimple cpp)
Expand Down
15 changes: 12 additions & 3 deletions examples/ReceiveDataC.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@
* Example program that demonstrates how to resolve a specific stream on the lab network and how to
* connect to it in order to receive data.
*/
#define NCHANS 8


int main(int argc, char *argv[]) {

unsigned k, t; /* channel index */
lsl_streaminfo info; /* the streaminfo returned by the resolve call */
lsl_inlet inlet; /* a stream inlet to get samples from */
int errcode; /* error code (lsl_lost_error or timeouts) */
float cursample[8]; /* array to hold our current sample */
float cursample[NCHANS]; /* array to hold our current sample */
double timestamp; /* time stamp of the current sample (in sender time) */

/* resolve the stream of interest (result array: info, array capacity: 1 element, type shall be
* EEG, resolve at least 1 stream, wait forever if necessary) */
printf("Now waiting for an EEG stream...\n");
lsl_resolve_byprop(&info, 1, "type", "EEG", 1, LSL_FOREVER);

/* These next two variables aren't used for anything in this example.
* They simply demonstrate how to use streaminfo getters. */
lsl_channel_format_t fmt = lsl_get_channel_format(info);
double srate = lsl_get_nominal_srate(info);

/* make an inlet to read data from the stream (buffer max. 300 seconds of data, no preference
* regarding chunking, automatic recovery enabled) */
inlet = lsl_create_inlet(info, 300, LSL_NO_PREFERENCE, 1);
Expand All @@ -33,10 +40,12 @@ int main(int argc, char *argv[]) {
for (t = 0; t < 100000000; t++) {
/* get the next sample form the inlet (read into cursample, 8 values, wait forever if
* necessary) and return the timestamp if we got something */
timestamp = lsl_pull_sample_f(inlet, cursample, 8, LSL_FOREVER, &errcode);
timestamp = lsl_pull_sample_f(inlet, cursample, NCHANS, LSL_FOREVER, &errcode);

/* print the data */
for (k = 0; k < 8; ++k) printf("\t%.2f", cursample[k]);
printf("%.2f", timestamp);
for (k = 0; k < 8; ++k)
printf("\t%.2f", cursample[k]);
printf("\n");
}

Expand Down
8 changes: 6 additions & 2 deletions examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
int main(int argc, char **argv) {
std::cout << "ReceiveDataInChunks" << std::endl;
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl;
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;

try {

std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000),
0, true, transp_bufsize_thousandths);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
Expand Down
6 changes: 5 additions & 1 deletion examples/ReceiveDataSimple.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <lsl_cpp.h>
#include <vector>
#include <iostream>

/**
* This is a minimal example that demonstrates how a multi-channel stream (here 128ch) of a
Expand All @@ -16,7 +17,10 @@ int main(int argc, char **argv) {

// receive data & time stamps forever (not displaying them here)
std::vector<float> sample;
while (true) inlet.pull_sample(sample);
while (true) {
double timestamp = inlet.pull_sample(sample);
std::cout << timestamp << "\t" << sample[0] << "\t" << sample[1] << "..." << std::endl;
}

return 0;
}
178 changes: 109 additions & 69 deletions examples/SendData.cpp
Original file line number Diff line number Diff line change
@@ -1,69 +1,109 @@
#include "lsl_cpp.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>
using namespace std;

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
string name, type;
if (argc != 3) {
cout << "This opens a stream under some user-defined name and with a user-defined content "
"type."
<< endl;
cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< endl;
cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}

try {

// make a new stream_info (100 Hz)
lsl::stream_info info(name, type, 8, 100, lsl::cf_float32, string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "BioSemi");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < 8; k++)
chns.append_child("channel")
.append_child_value("label", channels[k])
.append_child_value("unit", "microvolts")
.append_child_value("type", "EEG");

// make a new outlet
lsl::stream_outlet outlet(info);

// send data forever
cout << "Now sending data... " << endl;
double starttime = ((double)clock()) / CLOCKS_PER_SEC;
for (unsigned t = 0;; t++) {

// wait a bit and create random data
while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01)
;
float sample[8];
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);

// send the sample
outlet.push_sample(sample);
}

} catch (std::exception &e) { cerr << "Got an exception: " << e.what() << endl; }
cout << "Press any key to exit. " << endl;
cin.get();
return 0;
}
#include "lsl_cpp.h"
#include <iostream>
#include <stdlib.h>
#include <time.h>
#include <array>
#include <thread>

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
std::string name, type;
if (argc < 3) {
std::cout << "This opens a stream under some user-defined name and with a user-defined content "
"type." << std::endl;
std::cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << std::endl;
std::cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< std::endl;
std::cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}
int n_channels = argc > 3 ? std::stol(argv[3]) : 8;
n_channels = n_channels < 8 ? 8 : n_channels;
int samplingrate = argc > 4 ? std::stol(argv[4]) : 100;
int max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false;
bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true;

try {
// if (!sync && !contig) {
// throw std::invalid_argument( "async is incompatible with discontig push_numeric_bufs (except for strings, not used here)." );
// }

// make a new stream_info (100 Hz)
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "LSL");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < n_channels; k++)
chns.append_child("channel")
.append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k+1))
.append_child_value("unit", "microvolts")
.append_child_value("type", type);

// make a new outlet
lsl::stream_outlet outlet(info, 0, max_buffered, sync ? transp_sync_blocking : transp_default);

// Initialize 2 discontiguous data arrays.
std::vector<float> sample(8, 0.0);
std::vector<float> extra(n_channels - 8, 0.0);
// If this is contiguous mode (default) then we combine the arrays.
if (contig)
sample.insert(sample.end(), extra.begin(), extra.end());

// bytes is used in !contig mode because we need to know how big each buffer is.
std::array<uint32_t, 2> bytes = {8 * sizeof(float), static_cast<uint32_t>((n_channels - 8) * sizeof(float))};

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100);
auto t_start = std::chrono::high_resolution_clock::now();
auto next_sample_time = t_start;

// send data forever
std::cout << "Now sending data... " << std::endl;
for (unsigned t = 0;; t++) {

// Create random data for the first 8 channels.
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
// For the remaining channels, fill them with a sample counter (wraps at 1M).
if (contig)
std::fill(sample.begin()+8, sample.end(), t % 1000000);
else
std::fill(extra.begin(), extra.end(), t % 1000000);

// Wait until the next expected sample time.
next_sample_time += std::chrono::microseconds(sample_dur_us);
std::this_thread::sleep_until(next_sample_time);

// send the sample
if (contig) {
std::cout << sample[0] << "\t" << sample[8] << std::endl;
outlet.push_sample(sample);
}
else {
// Advanced: Push set of discontiguous buffers.
std::array<float *, 2> bufs = {sample.data(), extra.data()};
outlet.push_numeric_bufs((void **)bufs.data(),
bytes.data(), 2, lsl::local_clock(), true);
}
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
return 0;
}
Loading