Skip to content

Commit

Permalink
Reliability tests (#134)
Browse files Browse the repository at this point in the history
* Add cpp-data-channel-client-reliability-tests

* Update the readme and the room.

* Fix cpp-data-channel-client-reliability-tests.

* Add the return value for datachannel.send.

* Reduce delays.

* Release the GIL for data channel send methods.
  • Loading branch information
mamaheux authored Dec 19, 2023
1 parent db0d96c commit 816c00e
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 23 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,6 @@ add_subdirectory(signaling-server)

if(OPENTERA_WEBRTC_ENABLE_EXAMPLES)
add_subdirectory(examples/cpp-data-channel-client)
add_subdirectory(examples/cpp-data-channel-client-reliability-tests)
add_subdirectory(examples/cpp-stream-client)
endif()
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.6
0.6.7
4 changes: 4 additions & 0 deletions examples/cpp-data-channel-client-reliability-tests/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cmake-build-debug
cmake-build-release
build
.idea
28 changes: 28 additions & 0 deletions examples/cpp-data-channel-client-reliability-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
cmake_minimum_required(VERSION 3.14.0)

include_directories(${CMAKE_CURRENT_BINARY_DIR})

project(CppDataChannelReliabilityTests)

set(LIBRARY_OUTPUT_PATH bin/${CMAKE_BUILD_TYPE})

include_directories(${OpenCV_INCLUDE_DIRS})
include_directories(BEFORE SYSTEM ${webrtc_native_INCLUDE})
include_directories(../../opentera-webrtc-native-client/3rdParty/socket.io-client-cpp/src)
include_directories(../../opentera-webrtc-native-client/3rdParty/socket.io-client-cpp/lib/rapidjson/include)
include_directories(../../opentera-webrtc-native-client/3rdParty/cpp-httplib)
include_directories(../../opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include)

add_executable(CppDataChannelReliabilityTests main.cpp)

target_link_libraries(CppDataChannelReliabilityTests
OpenteraWebrtcNativeClient
)

if (NOT WIN32)
target_link_libraries(CppDataChannelReliabilityTests
pthread
)
endif()

set_property(TARGET CppDataChannelReliabilityTests PROPERTY CXX_STANDARD 17)
31 changes: 31 additions & 0 deletions examples/cpp-data-channel-client-reliability-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# cpp-data-channel-client-reliability-tests

This example tests the reliability of the data channel.

## How to use

1. Build the example.
```bash
cd ../..
mkdir build
cd build
cmake ..
cmake --build . --config Release|Debug
```

2. Start the signaling server.
```bash
./start_server.bash
```

3. Start a master client.
```bash
cd ../../build/bin/Release
./CppDataChannelReliabilityTests http://localhost:8080 master abc true
```

4. Start a slave client.
```bash
cd ../../build/bin/Release
./CppDataChannelReliabilityTests http://localhost:8080 slave abc false
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
{
"urls": "stun:stun.l.google.com:19302"
}
]
210 changes: 210 additions & 0 deletions examples/cpp-data-channel-client-reliability-tests/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#include <OpenteraWebrtcNativeClient/DataChannelClient.h>

#include <iostream>
#include <csignal>
#include <atomic>
#include <chrono>
#include <thread>

using namespace opentera;
using namespace std;

constexpr chrono::milliseconds CLOSING_CONNECTION_DELAY(1000);

constexpr int MESSAGE_COUNT = 100;

atomic_bool isRunning = true;

void sigintSigtermCallbackHandler(int signum)
{
isRunning = false;
}

template <class F>
bool waitFor(F f)
{
constexpr chrono::milliseconds SLEEP_TIME(100);
constexpr chrono::milliseconds TIMEOUT(10000);

auto start = chrono::steady_clock::now();
while (!f())
{
this_thread::sleep_for(SLEEP_TIME);

if (std::chrono::duration_cast<std::chrono::milliseconds>(chrono::steady_clock::now() - start) > TIMEOUT)
{
return false;
}
else if (!isRunning)
{
exit(-1);
}
}

return true;
}

int main(int argc, char* argv[])
{
if (argc != 5)
{
cout << "Usage: CppDataChannelReliabilityTests base_url name password master(true|false)" << endl;
return -1;
}

string baseUrl = argv[1];
string name = argv[2];
string password = argv[3];
bool isMaster = string(argv[4]) != "false";

signal(SIGINT, sigintSigtermCallbackHandler);
signal(SIGTERM, sigintSigtermCallbackHandler);

vector<IceServer> iceServers;
if (!IceServer::fetchFromServer(baseUrl + "/iceservers", password, iceServers))
{
cout << "IceServer::fetchFromServer failed" << endl;
iceServers.clear();
}

cout << "Ice servers=" << endl;
for (auto s : iceServers)
{
cout << "\turls=" << endl;
for (auto u : s.urls())
{
cout << "\t\t" << u << endl;
}
cout << "\tusername=" << s.username() << endl;
cout << "\tcredential=" << s.credential() << endl;
}
cout << endl;

auto signalingServerConfiguration =
SignalingServerConfiguration::create(baseUrl, name, "reliability", password);
auto webrtcConfiguration = WebrtcConfiguration::create(iceServers);
auto dataChannelConfiguration = DataChannelConfiguration::create();
DataChannelClient client(signalingServerConfiguration, webrtcConfiguration, dataChannelConfiguration);

atomic_bool hasAnotherClient = false;
atomic_bool isDataChannelOpened = false;
atomic_int currentMessageId = 0;

int successfulConnectionCount = 0;
int failedConnectionCount = 0;
int successfulMessageGroupCount = 0;
int failedMessageGroupCount = 0;

client.setOnSignalingConnectionError(
[](const string& error)
{
cout << "OnSignalingConnectionClosed:" << endl << "\t" << error;
});

client.setOnRoomClientsChanged(
[&](const vector<RoomClient>& roomClients)
{
hasAnotherClient = roomClients.size() > 1;
});

client.setOnError(
[](const string& error)
{
cout << "error:" << endl;
cout << "\t" << error << endl;
});

client.setOnDataChannelOpened(
[&](const Client& client)
{
isDataChannelOpened = true;
});
client.setOnDataChannelError(
[](const Client& client, const string& error)
{
cout << "OnDataChannelError:" << endl;
cout << "\tid=" << client.id() << ", name=" << client.name() << endl;
cout << "\t" << error << endl;
});
client.setOnDataChannelMessageString(
[&](const Client& _, const string& message)
{
int receivedId = stoi(message);
currentMessageId = receivedId + 1;
if (!client.sendToAll(to_string(currentMessageId.load())))
{
cout << "sendToAll failed" << endl;
}

cout << "receivedId=" << receivedId << endl;
});

client.connect();

cout << "Connecting to the signaling server." << endl;
if (!waitFor([&](){ return client.isConnected(); }))
{
cout << "Signaling server connection failed." << endl;
return -1;
}

cout << "Waiting for another client." << endl;
if (!waitFor([&](){ return hasAnotherClient.load(); }))
{
cout << "No other client." << endl;
return -1;
}

while (isRunning)
{
isDataChannelOpened = false;
if (isMaster)
{
client.callAll();
}

if (waitFor([&](){ return isDataChannelOpened.load(); }))
{
successfulConnectionCount++;
currentMessageId = 0;
if (isMaster)
{
if (!client.sendToAll(to_string(currentMessageId.load())))
{
cout << "sendToAll failed" << endl;
}
}

if (waitFor([&](){ return currentMessageId.load() >= MESSAGE_COUNT; }))
{
successfulMessageGroupCount++;
}
else
{
failedMessageGroupCount++;
}
}
else
{
failedConnectionCount++;
}

if (isMaster)
{
client.closeAllRoomPeerConnections();
}

cout << endl << "************ Stats ************" << endl;
cout << "\t successfulConnectionCount=" << successfulConnectionCount << endl;
cout << "\t failedConnectionCount=" << failedConnectionCount << endl;
cout << "\t successfulMessageGroupCount=" << successfulMessageGroupCount << endl;
cout << "\t failedMessageGroupCount=" << failedMessageGroupCount << endl;
cout << "************ Stats ************" << endl << endl;

this_thread::sleep_for(CLOSING_CONNECTION_DELAY);
}

client.closeSync();

return 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

SCRIPT=`realpath $0`
SCRIPT_PATH=`dirname $SCRIPT`

cd $SCRIPT_PATH/../../signaling-server
python3 opentera-signaling-server --port 8080 --password abc --ice_servers $SCRIPT_PATH/iceServers.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ namespace opentera
DECLARE_NOT_COPYABLE(DataChannelClient);
DECLARE_NOT_MOVABLE(DataChannelClient);

void sendTo(const uint8_t* data, std::size_t size, const std::vector<std::string>& ids);
void sendTo(const std::string& message, const std::vector<std::string>& ids);
void sendToAll(const uint8_t* data, std::size_t size);
void sendToAll(const std::string& message);
bool sendTo(const uint8_t* data, std::size_t size, const std::vector<std::string>& ids);
bool sendTo(const std::string& message, const std::vector<std::string>& ids);
bool sendToAll(const uint8_t* data, std::size_t size);
bool sendToAll(const std::string& message);

void setOnDataChannelOpened(const std::function<void(const Client&)>& callback);
void setOnDataChannelClosed(const std::function<void(const Client&)>& callback);
Expand All @@ -45,8 +45,8 @@ namespace opentera
void setOnDataChannelMessageString(const std::function<void(const Client&, const std::string&)>& callback);

protected:
void sendTo(const webrtc::DataBuffer& buffer, const std::vector<std::string>& ids);
void sendToAll(const webrtc::DataBuffer& buffer);
bool sendTo(const webrtc::DataBuffer& buffer, const std::vector<std::string>& ids);
bool sendToAll(const webrtc::DataBuffer& buffer);

std::unique_ptr<PeerConnectionHandler>
createPeerConnectionHandler(const std::string& id, const Client& peerClient, bool isCaller) override;
Expand All @@ -59,9 +59,9 @@ namespace opentera
* @param size The binary data size
* @param ids The client ids
*/
inline void DataChannelClient::sendTo(const uint8_t* data, size_t size, const std::vector<std::string>& ids)
inline bool DataChannelClient::sendTo(const uint8_t* data, size_t size, const std::vector<std::string>& ids)
{
sendTo(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true), ids);
return sendTo(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true), ids);
}

/**
Expand All @@ -70,9 +70,9 @@ namespace opentera
* @param message The string message
* @param ids The client ids
*/
inline void DataChannelClient::sendTo(const std::string& message, const std::vector<std::string>& ids)
inline bool DataChannelClient::sendTo(const std::string& message, const std::vector<std::string>& ids)
{
sendTo(webrtc::DataBuffer(message), ids);
return sendTo(webrtc::DataBuffer(message), ids);
}

/**
Expand All @@ -81,17 +81,17 @@ namespace opentera
* @param data The binary data
* @param size The binary data size
*/
inline void DataChannelClient::sendToAll(const uint8_t* data, size_t size)
inline bool DataChannelClient::sendToAll(const uint8_t* data, size_t size)
{
sendToAll(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true));
return sendToAll(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true));
}

/**
* @brief Sends a string message to all clients.
*
* @param message The string message
*/
inline void DataChannelClient::sendToAll(const std::string& message) { sendToAll(webrtc::DataBuffer(message)); }
inline bool DataChannelClient::sendToAll(const std::string& message) { return sendToAll(webrtc::DataBuffer(message)); }

/**
* @brief Sets the callback that is called when a data channel opens.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace opentera

void setPeerConnection(const rtc::scoped_refptr<webrtc::PeerConnectionInterface>& peerConnection) override;

void send(const webrtc::DataBuffer& buffer);
bool send(const webrtc::DataBuffer& buffer);

// Observer methods
void OnDataChannel(rtc::scoped_refptr<webrtc::DataChannelInterface> dataChannel) override;
Expand Down
Loading

0 comments on commit 816c00e

Please sign in to comment.