Compare commits

..

42 Commits

Author SHA1 Message Date
384454b293 Bumped version to 0.6.2 2020-06-12 20:00:29 +02:00
34db6ae673 Made description mutexes non-recursive and fix deadlock 2020-06-12 19:59:44 +02:00
31c88b0783 Bumped version to v0.6.1 2020-06-12 18:13:49 +02:00
c502b1f207 Updated libjuice to v0.4.2 2020-06-12 18:12:27 +02:00
858e181be1 Merge branch 'dev' 2020-06-12 18:07:10 +02:00
d00c73e993 Changed error constants to defines 2020-06-12 17:49:29 +02:00
9403818a12 Fixed description 2020-06-12 17:46:04 +02:00
b233e655cc Updated libjuice 2020-06-12 17:43:33 +02:00
b625519c4a Merge pull request #90 from paullouisageneau/more-sctp-tuning
SCTP optimizations
2020-06-12 13:03:46 +00:00
82e604b869 Fixed benchmark 2020-06-12 14:42:48 +02:00
60169cc676 Prevent write lock contention in SCTP transport 2020-06-12 13:59:52 +02:00
ee0139402a Enable SCTP NR-SACKs and reduce SACK delay to 20ms 2020-06-12 13:59:43 +02:00
661d6827c6 Fixed typo 2020-06-11 22:02:26 +02:00
5a331f1087 Added JSEP 2020-06-11 21:26:57 +02:00
1aedbddc55 Merge pull request #88 from paullouisageneau/sctp-tuning
Tune SCTP
2020-06-11 15:10:56 +00:00
35d58bb4e5 Merge pull request #89 from paullouisageneau/fix-compilation-win32
Fix compilation for Windows
2020-06-11 13:08:34 +00:00
0da5985ef6 Fixed compilation for Windows 2020-06-11 14:57:08 +02:00
8440c085ca Fix set_target_properties wrong number of arguments error 2020-06-11 11:02:25 +02:00
b68ccb4d71 Lock remote description only when necessary 2020-06-11 10:44:47 +02:00
8e3ec73ca6 Lowered goodput test threshold 2020-06-11 09:46:52 +02:00
c29de9dd1e Optimized libjuice 2020-06-10 23:21:27 +02:00
2ca3b07938 Added GnuTLS explicit initialization 2020-06-10 19:24:21 +02:00
679263c9f7 Change SCTP congestion control to H-TCP 2020-06-10 18:55:18 +02:00
9d635feb30 Increase SCTP max chunks on queue 2020-06-10 18:29:19 +02:00
fc29073577 Increase the initial window size to 10 MTUs 2020-06-10 18:28:26 +02:00
83bb6878f7 Changed SCTP congestion control to HighSpeed TCP 2020-06-10 18:18:48 +02:00
672124aa29 Updated libjuice to increase UDP buffer size 2020-06-10 18:14:38 +02:00
1e734906d3 Merge pull request #87 from paullouisageneau/fix-capi
Properly catch exceptions in C API
2020-06-10 15:03:14 +00:00
d0695aa9cb Properly catch exceptions in C API 2020-06-10 16:54:32 +02:00
3a941367b8 Fixed dependabot.yml 2020-06-09 13:32:06 +02:00
52dcae6453 Create dependabot.yml 2020-06-09 13:30:48 +02:00
22a1c56863 Added optional preloading 2020-06-09 11:17:09 +02:00
74c5cbcf9f Some fixes for tests 2020-06-09 10:48:49 +02:00
5b2c0cbc08 Fixed compilation 2020-06-09 00:09:43 +02:00
d853bb59c3 Enhancements to tests 2020-06-09 00:04:23 +02:00
c30927b6fa Updated libjuice 2020-06-08 23:28:03 +02:00
3a72adf8c8 Removed cleanup 2020-06-08 16:03:28 +02:00
767d719563 Merge pull request #86 from paullouisageneau/benchmark
Added benchmark
2020-06-08 13:58:21 +00:00
af87e5a1b8 Fixed unsafe smart_ptr access from thread 2020-06-08 15:48:58 +02:00
c83196ee40 Lowered threshold for CI 2020-06-08 15:40:37 +02:00
dfcae8f4fd Added benchmark 2020-06-08 15:34:15 +02:00
64be7a62f4 Fixed potential deadlock when sending from buffer low callback 2020-06-08 15:29:47 +02:00
23 changed files with 813 additions and 507 deletions

6
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,6 @@
version: 2
updates:
- package-ecosystem: "npm"
directory: "examples/web"
schedule:
interval: "weekly"

View File

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.7)
project(libdatachannel
DESCRIPTION "WebRTC DataChannels Library"
VERSION 0.6.0
VERSION 0.6.2
LANGUAGES CXX)
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
@ -74,6 +74,7 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/websocket.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/benchmark.cpp
)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
@ -160,10 +161,10 @@ if (USE_GNUTLS)
if(NOT TARGET GnuTLS::GnuTLS)
add_library(GnuTLS::GnuTLS UNKNOWN IMPORTED)
set_target_properties(GnuTLS::GnuTLS PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${GNUTLS_INCLUDE_DIRS}
INTERFACE_COMPILE_DEFINITIONS ${GNUTLS_DEFINITIONS}
INTERFACE_INCLUDE_DIRECTORIES "${GNUTLS_INCLUDE_DIRS}"
INTERFACE_COMPILE_DEFINITIONS "${GNUTLS_DEFINITIONS}"
IMPORTED_LINK_INTERFACE_LANGUAGES C
IMPORTED_LOCATION ${GNUTLS_LIBRARIES})
IMPORTED_LOCATION "${GNUTLS_LIBRARIES}")
endif()
target_compile_definitions(datachannel PRIVATE USE_GNUTLS=1)
target_compile_definitions(datachannel-static PRIVATE USE_GNUTLS=1)
@ -206,6 +207,16 @@ set_target_properties(datachannel-tests PROPERTIES OUTPUT_NAME tests)
target_include_directories(datachannel-tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel-tests datachannel nlohmann_json::nlohmann_json)
# Benchmark
add_executable(datachannel-benchmark test/benchmark.cpp)
set_target_properties(datachannel-benchmark PROPERTIES
VERSION ${PROJECT_VERSION}
CXX_STANDARD 17)
set_target_properties(datachannel-benchmark PROPERTIES OUTPUT_NAME benchmark)
target_compile_definitions(datachannel-benchmark PRIVATE BENCHMARK_MAIN=1)
target_include_directories(datachannel-benchmark PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel-benchmark datachannel)
# Examples
set(JSON_BuildTests OFF CACHE INTERNAL "")
add_subdirectory(deps/json)

View File

@ -1,7 +1,6 @@
# libdatachannel - C/C++ WebRTC Data Channels
libdatachannel is a standalone implementation of WebRTC Data Channels and WebSockets in C++17 with C bindings for POSIX platforms (including Linux and Apple macOS) and Microsoft Windows. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. Its API is modelled as a simplified version of the JavaScript WebRTC and WebSocket browser API, in order to ease the design of cross-environment applications.
libdatachannel is a standalone implementation of WebRTC Data Channels and WebSockets in C++17 with C bindings for POSIX platforms (including Linux and Apple macOS) and Microsoft Windows. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. The interface consists of simplified versions of the JavaScript WebRTC and WebSocket APIs present in browsers, in order to ease the design of cross-environment applications.
It can be compiled with multiple backends:
- The security layer can be provided through [GnuTLS](https://www.gnutls.org/) or [OpenSSL](https://www.openssl.org/).
- The connectivity for WebRTC can be provided through my ad-hoc ICE library [libjuice](https://github.com/paullouisageneau/libjuice) as submodule or through [libnice](https://github.com/libnice/libnice).
@ -26,6 +25,7 @@ Protocol stack:
Features:
- Full IPv6 support
- Trickle ICE ([draft-ietf-ice-trickle-21](https://tools.ietf.org/html/draft-ietf-ice-trickle-21))
- JSEP compatible ([draft-ietf-rtcweb-jsep-26](https://tools.ietf.org/html/draft-ietf-rtcweb-jsep-26))
- Multicast DNS candidates ([draft-ietf-rtcweb-mdns-ice-candidates-04](https://tools.ietf.org/html/draft-ietf-rtcweb-mdns-ice-candidates-04))
- TURN relaying ([RFC5766](https://tools.ietf.org/html/rfc5766)) with [libnice](https://github.com/libnice/libnice) as ICE backend
- SRTP media transport ([RFC3711](https://tools.ietf.org/html/rfc3711)) with [libSRTP](https://github.com/cisco/libsrtp)

2
deps/libjuice vendored

View File

@ -19,12 +19,19 @@
#include <rtc/rtc.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _WIN32
#include "getline.h"
#include <windows.h>
static void sleep(unsigned int secs) { Sleep(secs * 1000); }
#else
#include <unistd.h> // for sleep
#include <ctype.h>
#endif
typedef struct {
rtcState state;
@ -65,11 +72,9 @@ int main(int argc, char **argv) {
Peer *peer = (Peer *)malloc(sizeof(Peer));
if (!peer) {
printf("Error allocating memory for peer\n");
deletePeer(peer);
}
fprintf(stderr, "Error allocating memory for peer\n");
return -1;
}
memset(peer, 0, sizeof(Peer));
printf("Peer created\n");
@ -86,17 +91,12 @@ int main(int argc, char **argv) {
rtcSetUserPointer(peer->dc, NULL);
rtcSetDataChannelCallback(peer->pc, dataChannelCallback);
sleep(1);
bool exit = false;
bool exit = false;
while (!exit) {
printf("\n");
printf("***************************************************************************************\n");
// << endl
printf("* 0: Exit /"
" 1: Enter remote description /"
" 2: Enter remote candidate /"
@ -106,16 +106,16 @@ int main(int argc, char **argv) {
int command = -1;
int c;
// int check_scan
if (scanf("%d", &command)) {
}else {
break;
}
while ((c = getchar()) != '\n' && c != EOF) { }
if (!scanf("%d", &command)) {
break;
}
fflush(stdin);
switch (command) {
while ((c = getchar()) != '\n' && c != EOF) {
}
fflush(stdin);
switch (command) {
case 0: {
exit = true;
break;
@ -123,8 +123,6 @@ int main(int argc, char **argv) {
case 1: {
// Parse Description
printf("[Description]: ");
char *line = NULL;
size_t len = 0;
size_t read = 0;
@ -151,12 +149,12 @@ int main(int argc, char **argv) {
rtcAddRemoteCandidate(peer->pc, candidate, "0");
free(candidate);
}else {
printf("Error reading line\n");
break;
}
} else {
printf("Error reading line\n");
break;
}
break;
break;
}
case 3: {
// Send Message
@ -171,11 +169,11 @@ int main(int argc, char **argv) {
if(getline(&message, &message_size, stdin)) {
rtcSendMessage(peer->dc, message, -1);
free(message);
}else {
printf("Error reading line\n");
break;
}
break;
} else {
printf("Error reading line\n");
break;
}
break;
}
case 4: {
// Connection Info
@ -200,12 +198,10 @@ int main(int argc, char **argv) {
}
}
deletePeer(peer);
return 0;
}
static void descriptionCallback(const char *sdp, const char *type, void *ptr) {
// Peer *peer = (Peer *)ptr;
printf("Description %s:\n%s\n", "answerer", sdp);
@ -229,22 +225,19 @@ static void gatheringStateCallback(rtcGatheringState state, void *ptr) {
printf("Gathering state %s: %s\n", "answerer", rtcGatheringState_print(state));
}
static void closedCallback(void *ptr) {
Peer *peer = (Peer *)ptr;
peer->connected = false;
}
static void messageCallback(const char *message, int size, void *ptr) {
// Peer *peer = (Peer *)ptr;
if (size < 0) { // negative size indicates a null-terminated string
printf("Message %s: %s\n", "answerer", message);
} else {
printf("Message %s: [binary of size %d]\n", "answerer", size);
}
}
static void deletePeer(Peer *peer) {
if (peer) {
if (peer->dc)
@ -264,16 +257,16 @@ static void dataChannelCallback(int dc, void *ptr) {
char buffer[256];
if (rtcGetDataChannelLabel(dc, buffer, 256) >= 0)
printf("DataChannel %s: Received with label \"%s\"\n", "answerer", buffer);
}
int all_space(const char *str) {
while (*str) {
if (!isspace(*str++)) {
return 0;
}
}
return 1;
return 1;
}
char* state_print(rtcState state) {
@ -302,7 +295,6 @@ char* state_print(rtcState state) {
}
return str;
}
char* rtcGatheringState_print(rtcState state) {
@ -322,5 +314,4 @@ char* rtcGatheringState_print(rtcState state) {
}
return str;
}

View File

@ -0,0 +1,48 @@
// Simple POSIX getline() implementation
// This code is public domain
#include "malloc.h"
#include "stdio.h"
ssize_t getline(char **lineptr, size_t *n, FILE *stream) {
if (!lineptr || !stream || !n)
return -1;
int c = getc(stream);
if (c == EOF)
return -1;
if (!*lineptr) {
*lineptr = malloc(128);
if (!*lineptr)
return -1;
*n = 128;
}
size_t pos = 0;
while(c != EOF) {
if (pos + 1 >= *n) {
size_t new_size = *n + (*n >> 2);
if (new_size < 128)
new_size = 128;
char *new_ptr = realloc(*lineptr, new_size);
if (!new_ptr)
return -1;
*n = new_size;
*lineptr = new_ptr;
}
((unsigned char *)(*lineptr))[pos ++] = c;
if (c == '\n')
break;
c = getc(stream);
}
(*lineptr)[pos] = '\0';
return pos;
}

View File

@ -19,12 +19,19 @@
#include <rtc/rtc.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _WIN32
#include "getline.h"
#include <windows.h>
static void sleep(unsigned int secs) { Sleep(secs * 1000); }
#else
#include <unistd.h> // for sleep
#include <ctype.h>
#endif
char* state_print(rtcState state);
char* rtcGatheringState_print(rtcState state);
@ -65,13 +72,11 @@ int main(int argc, char **argv){
memset(&config, 0, sizeof(config));
Peer *peer = (Peer *)malloc(sizeof(Peer));
if (!peer) {
printf("Error allocating memory for peer\n");
deletePeer(peer);
}
memset(peer, 0, sizeof(Peer));
if (!peer) {
fprintf(stderr, "Error allocating memory for peer\n");
return -1;
}
memset(peer, 0, sizeof(Peer));
printf("Peer created\n");
@ -85,25 +90,17 @@ int main(int argc, char **argv){
// Since this is the offere, we will create a datachannel
peer->dc = rtcCreateDataChannel(peer->pc, "test");
rtcSetOpenCallback(peer->dc, openCallback);
rtcSetClosedCallback(peer->dc, closedCallback);
rtcSetMessageCallback(peer->dc, messageCallback);
sleep(1);
sleep(1);
bool exit = false;
bool exit = false;
while (!exit) {
printf("\n");
printf("***************************************************************************************\n");
// << endl
printf("* 0: Exit /"
" 1: Enter remote description /"
" 2: Enter remote candidate /"
@ -113,13 +110,14 @@ int main(int argc, char **argv){
int command = -1;
int c;
if (scanf("%d", &command)) {
}else {
break;
}
while ((c = getchar()) != '\n' && c != EOF) { }
fflush(stdin);
if (!scanf("%d", &command)) {
break;
}
while ((c = getchar()) != '\n' && c != EOF) {
}
fflush(stdin);
switch (command) {
case 0: {
@ -210,11 +208,6 @@ int main(int argc, char **argv){
return 0;
}
static void descriptionCallback(const char *sdp, const char *type, void *ptr) {
// Peer *peer = (Peer *)ptr;
printf("Description %s:\n%s\n", "offerer", sdp);
@ -238,22 +231,17 @@ static void gatheringStateCallback(rtcGatheringState state, void *ptr) {
printf("Gathering state %s: %s\n", "offerer", rtcGatheringState_print(state));
}
static void openCallback(void *ptr) {
Peer *peer = (Peer *)ptr;
peer->connected = true;
char buffer[256];
if (rtcGetDataChannelLabel(peer->dc, buffer, 256) >= 0)
printf("DataChannel %s: Received with label \"%s\"\n","offerer", buffer);
}
static void closedCallback(void *ptr) {
Peer *peer = (Peer *)ptr;
peer->connected = false;
}
static void messageCallback(const char *message, int size, void *ptr) {
@ -264,6 +252,7 @@ static void messageCallback(const char *message, int size, void *ptr) {
printf("Message %s: [binary of size %d]\n", "offerer", size);
}
}
static void deletePeer(Peer *peer) {
if (peer) {
if (peer->dc)
@ -274,14 +263,14 @@ static void deletePeer(Peer *peer) {
}
}
int all_space(const char *str) {
while (*str) {
if (!isspace(*str++)) {
return 0;
}
}
return 1;
return 1;
}
char* state_print(rtcState state) {
@ -330,5 +319,4 @@ char* rtcGatheringState_print(rtcState state) {
}
return str;
}

View File

@ -31,6 +31,7 @@ using init_token = std::shared_ptr<Init>;
class Init {
public:
static init_token Token();
static void Preload();
static void Cleanup();
~Init();
@ -43,6 +44,7 @@ private:
static std::mutex Mutex;
};
inline void Preload() { Init::Preload(); }
inline void Cleanup() { Init::Cleanup(); }
} // namespace rtc

View File

@ -145,7 +145,7 @@ private:
init_token mInitToken = Init::Token();
std::optional<Description> mLocalDescription, mRemoteDescription;
mutable std::recursive_mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
std::shared_ptr<IceTransport> mIceTransport;
std::shared_ptr<DtlsTransport> mDtlsTransport;

View File

@ -60,6 +60,10 @@ typedef enum { // Don't change, it must match plog severity
RTC_LOG_VERBOSE = 6
} rtcLogLevel;
#define RTC_ERR_SUCCESS 0
#define RTC_ERR_INVALID -1 // invalid argument
#define RTC_ERR_FAILURE -2 // runtime error
typedef struct {
const char **iceServers;
int iceServersCount;
@ -129,7 +133,8 @@ int rtcGetAvailableAmount(int id); // total size available to receive
int rtcSetAvailableCallback(int id, availableCallbackFunc cb);
int rtcReceiveMessage(int id, char *buffer, int *size);
// Cleanup
// Optional preload and cleanup
void rtcPreload();
void rtcCleanup();
#ifdef __cplusplus

View File

@ -235,7 +235,7 @@ namespace {
template <class F, class... Args>
std::future<std::result_of_t<std::decay_t<F>(std::decay_t<Args>...)>> thread_call(F &&f,
Args &&... args) {
using R = std::result_of_t<std::decay_t<F>(std::decay_t<Args>...)>;
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
std::packaged_task<R()> task(std::bind(f, std::forward<Args>(args)...));
std::future<R> future = task.get_future();
std::thread t(std::move(task));

View File

@ -61,7 +61,7 @@ string make_fingerprint(X509 *x509);
using certificate_ptr = std::shared_ptr<Certificate>;
using future_certificate_ptr = std::shared_future<certificate_ptr>;
future_certificate_ptr make_certificate(string commonName); // cached
future_certificate_ptr make_certificate(string commonName = "libdatachannel"); // cached
void CleanupCertificateCache();

View File

@ -36,12 +36,10 @@ namespace rtc {
#if USE_GNUTLS
void DtlsTransport::Init() {
// Nothing to do
gnutls_global_init(); // optional
}
void DtlsTransport::Cleanup() {
// Nothing to do
}
void DtlsTransport::Cleanup() { gnutls_global_deinit(); }
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback)

View File

@ -55,6 +55,11 @@ init_token Init::Token() {
return Global;
}
void Init::Preload() {
Token(); // pre-init
make_certificate().wait(); // preload certificate
}
void Init::Cleanup() { Global.reset(); }
Init::Init() {

View File

@ -39,9 +39,12 @@ using std::weak_ptr;
PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
PeerConnection::PeerConnection(const Configuration &config)
: mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New),
: mConfig(config), mCertificate(make_certificate()), mState(State::New),
mGatheringState(GatheringState::New) {
PLOG_VERBOSE << "Creating PeerConnection";
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
throw std::invalid_argument("Invalid port range");
}
PeerConnection::~PeerConnection() {
@ -89,18 +92,20 @@ void PeerConnection::setLocalDescription(std::optional<Description> description)
void PeerConnection::setRemoteDescription(Description description) {
description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
auto remoteCandidates = description.extractCandidates();
std::lock_guard lock(mRemoteDescriptionMutex);
mRemoteDescription.emplace(std::move(description));
auto type = description.type();
auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
auto iceTransport = std::atomic_load(&mIceTransport);
if (!iceTransport)
iceTransport = initIceTransport(Description::Role::ActPass);
iceTransport->setRemoteDescription(description);
iceTransport->setRemoteDescription(*mRemoteDescription);
{
std::lock_guard lock(mRemoteDescriptionMutex);
mRemoteDescription.emplace(std::move(description));
}
if (mRemoteDescription->type() == Description::Type::Offer) {
if (type == Description::Type::Offer) {
// This is an offer and we are the answerer.
Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
localDescription.addMedia(description); // blindly accept media
@ -131,14 +136,11 @@ void PeerConnection::setRemoteDescription(Description description) {
}
void PeerConnection::addRemoteCandidate(Candidate candidate) {
std::lock_guard lock(mRemoteDescriptionMutex);
auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport)
throw std::logic_error("Remote candidate set without remote description");
mRemoteDescription->addCandidate(candidate);
if (candidate.resolve(Candidate::ResolveMode::Simple)) {
iceTransport->addRemoteCandidate(candidate);
} else {
@ -151,6 +153,9 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
});
t.detach();
}
std::lock_guard lock(mRemoteDescriptionMutex);
mRemoteDescription->addCandidate(candidate);
}
std::optional<string> PeerConnection::localAddress() const {
@ -594,11 +599,13 @@ void PeerConnection::processLocalDescription(Description description) {
auto certificate = mCertificate.get(); // wait for certificate if not ready
std::lock_guard lock(mLocalDescriptionMutex);
mLocalDescription.emplace(std::move(description));
mLocalDescription->setFingerprint(certificate->fingerprint());
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
{
std::lock_guard lock(mLocalDescriptionMutex);
mLocalDescription.emplace(std::move(description));
mLocalDescription->setFingerprint(certificate->fingerprint());
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
}
mLocalDescriptionCallback(*mLocalDescription);
}

View File

@ -29,6 +29,7 @@
#include <exception>
#include <mutex>
#include <type_traits>
#include <unordered_map>
#include <utility>
@ -36,14 +37,6 @@ using namespace rtc;
using std::shared_ptr;
using std::string;
#define CATCH(statement) \
try { \
statement; \
} catch (const std::exception &e) { \
PLOG_ERROR << e.what(); \
return -1; \
}
namespace {
std::unordered_map<int, shared_ptr<PeerConnection>> peerConnectionMap;
@ -71,14 +64,18 @@ void setUserPointer(int i, void *ptr) {
shared_ptr<PeerConnection> getPeerConnection(int id) {
std::lock_guard lock(mutex);
auto it = peerConnectionMap.find(id);
return it != peerConnectionMap.end() ? it->second : nullptr;
if (auto it = peerConnectionMap.find(id); it != peerConnectionMap.end())
return it->second;
else
throw std::invalid_argument("PeerConnection ID does not exist");
}
shared_ptr<DataChannel> getDataChannel(int id) {
std::lock_guard lock(mutex);
auto it = dataChannelMap.find(id);
return it != dataChannelMap.end() ? it->second : nullptr;
if (auto it = dataChannelMap.find(id); it != dataChannelMap.end())
return it->second;
else
throw std::invalid_argument("DataChannel ID does not exist");
}
int emplacePeerConnection(shared_ptr<PeerConnection> ptr) {
@ -95,27 +92,27 @@ int emplaceDataChannel(shared_ptr<DataChannel> ptr) {
return dc;
}
bool erasePeerConnection(int pc) {
void erasePeerConnection(int pc) {
std::lock_guard lock(mutex);
if (peerConnectionMap.erase(pc) == 0)
return false;
throw std::invalid_argument("PeerConnection ID does not exist");
userPointerMap.erase(pc);
return true;
}
bool eraseDataChannel(int dc) {
void eraseDataChannel(int dc) {
std::lock_guard lock(mutex);
if (dataChannelMap.erase(dc) == 0)
return false;
throw std::invalid_argument("DataChannel ID does not exist");
userPointerMap.erase(dc);
return true;
}
#if RTC_ENABLE_WEBSOCKET
shared_ptr<WebSocket> getWebSocket(int id) {
std::lock_guard lock(mutex);
auto it = webSocketMap.find(id);
return it != webSocketMap.end() ? it->second : nullptr;
if (auto it = webSocketMap.find(id); it != webSocketMap.end())
return it->second;
else
throw std::invalid_argument("WebSocket ID does not exist");
}
int emplaceWebSocket(shared_ptr<WebSocket> ptr) {
@ -125,12 +122,11 @@ int emplaceWebSocket(shared_ptr<WebSocket> ptr) {
return ws;
}
bool eraseWebSocket(int ws) {
void eraseWebSocket(int ws) {
std::lock_guard lock(mutex);
if (webSocketMap.erase(ws) == 0)
return false;
throw std::invalid_argument("WebSocket ID does not exist");
userPointerMap.erase(ws);
return true;
}
#endif
@ -142,9 +138,28 @@ shared_ptr<Channel> getChannel(int id) {
if (auto it = webSocketMap.find(id); it != webSocketMap.end())
return it->second;
#endif
return nullptr;
throw std::invalid_argument("DataChannel or WebSocket ID does not exist");
}
template <typename F> int wrap(F func) {
try {
return func();
} catch (const std::invalid_argument &e) {
PLOG_ERROR << e.what();
return RTC_ERR_INVALID;
} catch (const std::exception &e) {
PLOG_ERROR << e.what();
return RTC_ERR_FAILURE;
}
}
#define WRAP(statement) \
wrap([&]() { \
statement; \
return RTC_ERR_SUCCESS; \
})
} // namespace
void rtcInitLogger(rtcLogLevel level) { InitLogger(static_cast<LogLevel>(level)); }
@ -152,370 +167,349 @@ void rtcInitLogger(rtcLogLevel level) { InitLogger(static_cast<LogLevel>(level))
void rtcSetUserPointer(int i, void *ptr) { setUserPointer(i, ptr); }
int rtcCreatePeerConnection(const rtcConfiguration *config) {
Configuration c;
for (int i = 0; i < config->iceServersCount; ++i)
c.iceServers.emplace_back(string(config->iceServers[i]));
return WRAP({
Configuration c;
for (int i = 0; i < config->iceServersCount; ++i)
c.iceServers.emplace_back(string(config->iceServers[i]));
if (config->portRangeBegin || config->portRangeEnd) {
c.portRangeBegin = config->portRangeBegin;
c.portRangeEnd = config->portRangeEnd;
}
if (config->portRangeBegin || config->portRangeEnd) {
c.portRangeBegin = config->portRangeBegin;
c.portRangeEnd = config->portRangeEnd;
}
return emplacePeerConnection(std::make_shared<PeerConnection>(c));
return emplacePeerConnection(std::make_shared<PeerConnection>(c));
});
}
int rtcDeletePeerConnection(int pc) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
return WRAP({
auto peerConnection = getPeerConnection(pc);
peerConnection->onDataChannel(nullptr);
peerConnection->onLocalDescription(nullptr);
peerConnection->onLocalCandidate(nullptr);
peerConnection->onStateChange(nullptr);
peerConnection->onGatheringStateChange(nullptr);
peerConnection->onDataChannel(nullptr);
peerConnection->onLocalDescription(nullptr);
peerConnection->onLocalCandidate(nullptr);
peerConnection->onStateChange(nullptr);
peerConnection->onGatheringStateChange(nullptr);
erasePeerConnection(pc);
return 0;
erasePeerConnection(pc);
});
}
int rtcCreateDataChannel(int pc, const char *label) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
void *ptr = getUserPointer(pc);
rtcSetUserPointer(dc, ptr);
return dc;
return WRAP({
auto peerConnection = getPeerConnection(pc);
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
rtcSetUserPointer(dc, getUserPointer(pc));
return dc;
});
}
int rtcDeleteDataChannel(int dc) {
auto dataChannel = getDataChannel(dc);
if (!dataChannel)
return -1;
return WRAP({
auto dataChannel = getDataChannel(dc);
dataChannel->onOpen(nullptr);
dataChannel->onClosed(nullptr);
dataChannel->onError(nullptr);
dataChannel->onMessage(nullptr);
dataChannel->onBufferedAmountLow(nullptr);
dataChannel->onAvailable(nullptr);
dataChannel->onOpen(nullptr);
dataChannel->onClosed(nullptr);
dataChannel->onError(nullptr);
dataChannel->onMessage(nullptr);
dataChannel->onBufferedAmountLow(nullptr);
dataChannel->onAvailable(nullptr);
eraseDataChannel(dc);
return 0;
eraseDataChannel(dc);
});
}
#if RTC_ENABLE_WEBSOCKET
int rtcCreateWebSocket(const char *url) {
auto ws = std::make_shared<WebSocket>();
ws->open(url);
return emplaceWebSocket(ws);
}
int rtcDeleteWebsocket(int ws) {
auto webSocket = getWebSocket(ws);
if (!webSocket)
return -1;
webSocket->onOpen(nullptr);
webSocket->onClosed(nullptr);
webSocket->onError(nullptr);
webSocket->onMessage(nullptr);
webSocket->onBufferedAmountLow(nullptr);
webSocket->onAvailable(nullptr);
eraseWebSocket(ws);
return 0;
}
#endif
int rtcSetDataChannelCallback(int pc, dataChannelCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (cb)
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
int dc = emplaceDataChannel(dataChannel);
void *ptr = getUserPointer(pc);
rtcSetUserPointer(dc, ptr);
cb(dc, ptr);
});
else
peerConnection->onDataChannel(nullptr);
return 0;
}
int rtcSetLocalDescriptionCallback(int pc, descriptionCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (cb)
peerConnection->onLocalDescription([pc, cb](const Description &desc) {
cb(string(desc).c_str(), desc.typeString().c_str(), getUserPointer(pc));
});
else
peerConnection->onLocalDescription(nullptr);
return 0;
}
int rtcSetLocalCandidateCallback(int pc, candidateCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (cb)
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
cb(cand.candidate().c_str(), cand.mid().c_str(), getUserPointer(pc));
});
else
peerConnection->onLocalCandidate(nullptr);
return 0;
}
int rtcSetStateChangeCallback(int pc, stateChangeCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (cb)
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
cb(static_cast<rtcState>(state), getUserPointer(pc));
});
else
peerConnection->onStateChange(nullptr);
return 0;
}
int rtcSetGatheringStateChangeCallback(int pc, gatheringStateCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (cb)
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
cb(static_cast<rtcGatheringState>(state), getUserPointer(pc));
});
else
peerConnection->onGatheringStateChange(nullptr);
return 0;
}
int rtcSetRemoteDescription(int pc, const char *sdp, const char *type) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
CATCH(peerConnection->setRemoteDescription({string(sdp), type ? string(type) : ""}));
return 0;
}
int rtcAddRemoteCandidate(int pc, const char *cand, const char *mid) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
CATCH(peerConnection->addRemoteCandidate({string(cand), mid ? string(mid) : ""}))
return 0;
}
int rtcGetLocalAddress(int pc, char *buffer, int size) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (auto addr = peerConnection->localAddress()) {
size = std::min(size_t(size - 1), addr->size());
std::copy(addr->data(), addr->data() + size, buffer);
buffer[size] = '\0';
return size + 1;
}
return -1;
}
int rtcGetRemoteAddress(int pc, char *buffer, int size) {
auto peerConnection = getPeerConnection(pc);
if (!peerConnection)
return -1;
if (auto addr = peerConnection->remoteAddress()) {
size = std::min(size_t(size - 1), addr->size());
std::copy(addr->data(), addr->data() + size, buffer);
buffer[size] = '\0';
return size + 1;
}
return -1;
}
int rtcGetDataChannelLabel(int dc, char *buffer, int size) {
auto dataChannel = getDataChannel(dc);
if (!dataChannel)
return -1;
if (!size)
return 0;
string label = dataChannel->label();
size = std::min(size_t(size - 1), label.size());
std::copy(label.data(), label.data() + size, buffer);
buffer[size] = '\0';
return size + 1;
}
int rtcSetOpenCallback(int id, openCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
else
channel->onOpen(nullptr);
return 0;
}
int rtcSetClosedCallback(int id, closedCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onClosed([id, cb]() { cb(getUserPointer(id)); });
else
channel->onClosed(nullptr);
return 0;
}
int rtcSetErrorCallback(int id, errorCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onError([id, cb](const string &error) { cb(error.c_str(), getUserPointer(id)); });
else
channel->onError(nullptr);
return 0;
}
int rtcSetMessageCallback(int id, messageCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onMessage(
[id, cb](const binary &b) {
cb(reinterpret_cast<const char *>(b.data()), b.size(), getUserPointer(id));
},
[id, cb](const string &s) { cb(s.c_str(), -1, getUserPointer(id)); });
else
channel->onMessage(nullptr);
return 0;
}
int rtcSendMessage(int id, const char *data, int size) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (size >= 0) {
auto b = reinterpret_cast<const byte *>(data);
CATCH(channel->send(binary(b, b + size)));
return size;
} else {
string str(data);
int len = str.size();
CATCH(channel->send(std::move(str)));
return len;
}
}
int rtcGetBufferedAmount(int id) {
auto channel = getChannel(id);
if (!channel)
return -1;
CATCH(return int(channel->bufferedAmount()));
}
int rtcSetBufferedAmountLowThreshold(int id, int amount) {
auto channel = getChannel(id);
if (!channel)
return -1;
CATCH(channel->setBufferedAmountLowThreshold(size_t(amount)));
return 0;
}
int rtcSetBufferedAmountLowCallback(int id, bufferedAmountLowCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onBufferedAmountLow([id, cb]() { cb(getUserPointer(id)); });
else
channel->onBufferedAmountLow(nullptr);
return 0;
}
int rtcGetAvailableAmount(int id) {
auto channel = getChannel(id);
if (!channel)
return -1;
CATCH(return int(channel->availableAmount()));
}
int rtcSetAvailableCallback(int id, availableCallbackFunc cb) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
else
channel->onOpen(nullptr);
return 0;
}
int rtcReceiveMessage(int id, char *buffer, int *size) {
auto channel = getChannel(id);
if (!channel)
return -1;
if (!size)
return -1;
CATCH({
auto message = channel->receive();
if (!message)
return 0;
return std::visit( //
overloaded{ //
[&](const binary &b) {
*size = std::min(*size, int(b.size()));
auto data = reinterpret_cast<const char *>(b.data());
std::copy(data, data + *size, buffer);
return *size;
},
[&](const string &s) {
int len = std::min(*size - 1, int(s.size()));
if (len >= 0) {
std::copy(s.data(), s.data() + len, buffer);
buffer[len] = '\0';
}
*size = -(len + 1);
return len + 1;
}},
*message);
return WRAP({
auto ws = std::make_shared<WebSocket>();
ws->open(url);
return emplaceWebSocket(ws);
});
}
int rtcDeleteWebsocket(int ws) {
return WRAP({
auto webSocket = getWebSocket(ws);
webSocket->onOpen(nullptr);
webSocket->onClosed(nullptr);
webSocket->onError(nullptr);
webSocket->onMessage(nullptr);
webSocket->onBufferedAmountLow(nullptr);
webSocket->onAvailable(nullptr);
eraseWebSocket(ws);
});
}
#endif
int rtcSetDataChannelCallback(int pc, dataChannelCallbackFunc cb) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
int dc = emplaceDataChannel(dataChannel);
void *ptr = getUserPointer(pc);
rtcSetUserPointer(dc, ptr);
cb(dc, ptr);
});
else
peerConnection->onDataChannel(nullptr);
});
}
int rtcSetLocalDescriptionCallback(int pc, descriptionCallbackFunc cb) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onLocalDescription([pc, cb](const Description &desc) {
cb(string(desc).c_str(), desc.typeString().c_str(), getUserPointer(pc));
});
else
peerConnection->onLocalDescription(nullptr);
});
}
int rtcSetLocalCandidateCallback(int pc, candidateCallbackFunc cb) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
cb(cand.candidate().c_str(), cand.mid().c_str(), getUserPointer(pc));
});
else
peerConnection->onLocalCandidate(nullptr);
});
}
int rtcSetStateChangeCallback(int pc, stateChangeCallbackFunc cb) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
cb(static_cast<rtcState>(state), getUserPointer(pc));
});
else
peerConnection->onStateChange(nullptr);
});
}
int rtcSetGatheringStateChangeCallback(int pc, gatheringStateCallbackFunc cb) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
cb(static_cast<rtcGatheringState>(state), getUserPointer(pc));
});
else
peerConnection->onGatheringStateChange(nullptr);
});
}
int rtcSetRemoteDescription(int pc, const char *sdp, const char *type) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (!sdp)
throw std::invalid_argument("Unexpected null pointer");
peerConnection->setRemoteDescription({string(sdp), type ? string(type) : ""});
});
}
int rtcAddRemoteCandidate(int pc, const char *cand, const char *mid) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (!cand)
throw std::invalid_argument("Unexpected null pointer");
peerConnection->addRemoteCandidate({string(cand), mid ? string(mid) : ""});
});
}
int rtcGetLocalAddress(int pc, char *buffer, int size) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (!buffer)
throw std::invalid_argument("Unexpected null pointer");
if (auto addr = peerConnection->localAddress()) {
size = std::min(size_t(size - 1), addr->size());
std::copy(addr->data(), addr->data() + size, buffer);
buffer[size] = '\0';
return size + 1;
}
});
}
int rtcGetRemoteAddress(int pc, char *buffer, int size) {
return WRAP({
auto peerConnection = getPeerConnection(pc);
if (!buffer)
throw std::invalid_argument("Unexpected null pointer");
if (auto addr = peerConnection->remoteAddress()) {
size = std::min(size_t(size - 1), addr->size());
std::copy(addr->data(), addr->data() + size, buffer);
buffer[size] = '\0';
return size + 1;
}
});
}
int rtcGetDataChannelLabel(int dc, char *buffer, int size) {
return WRAP({
auto dataChannel = getDataChannel(dc);
if (!buffer)
throw std::invalid_argument("Unexpected null pointer");
if (size >= 0) {
string label = dataChannel->label();
size = std::min(size_t(size - 1), label.size());
std::copy(label.data(), label.data() + size, buffer);
buffer[size] = '\0';
return size + 1;
} else {
return 0;
}
});
}
int rtcSetOpenCallback(int id, openCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
else
channel->onOpen(nullptr);
});
}
int rtcSetClosedCallback(int id, closedCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onClosed([id, cb]() { cb(getUserPointer(id)); });
else
channel->onClosed(nullptr);
});
}
int rtcSetErrorCallback(int id, errorCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onError(
[id, cb](const string &error) { cb(error.c_str(), getUserPointer(id)); });
else
channel->onError(nullptr);
});
}
int rtcSetMessageCallback(int id, messageCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onMessage(
[id, cb](const binary &b) {
cb(reinterpret_cast<const char *>(b.data()), b.size(), getUserPointer(id));
},
[id, cb](const string &s) { cb(s.c_str(), -1, getUserPointer(id)); });
else
channel->onMessage(nullptr);
});
}
int rtcSendMessage(int id, const char *data, int size) {
return WRAP({
auto channel = getChannel(id);
if (!data)
throw std::invalid_argument("Unexpected null pointer");
if (size >= 0) {
auto b = reinterpret_cast<const byte *>(data);
channel->send(binary(b, b + size));
return size;
} else {
string str(data);
int len = str.size();
channel->send(std::move(str));
return len;
}
});
}
int rtcGetBufferedAmount(int id) {
return WRAP({
auto channel = getChannel(id);
return int(channel->bufferedAmount());
});
}
int rtcSetBufferedAmountLowThreshold(int id, int amount) {
return WRAP({
auto channel = getChannel(id);
channel->setBufferedAmountLowThreshold(size_t(amount));
});
}
int rtcSetBufferedAmountLowCallback(int id, bufferedAmountLowCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onBufferedAmountLow([id, cb]() { cb(getUserPointer(id)); });
else
channel->onBufferedAmountLow(nullptr);
});
}
int rtcGetAvailableAmount(int id) {
return WRAP({ return int(getChannel(id)->availableAmount()); });
}
int rtcSetAvailableCallback(int id, availableCallbackFunc cb) {
return WRAP({
auto channel = getChannel(id);
if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
else
channel->onOpen(nullptr);
});
}
int rtcReceiveMessage(int id, char *buffer, int *size) {
return WRAP({
auto channel = getChannel(id);
if (!buffer || !size)
throw std::invalid_argument("Unexpected null pointer");
if (auto message = channel->receive())
return std::visit( //
overloaded{ //
[&](const binary &b) {
*size = std::min(*size, int(b.size()));
auto data = reinterpret_cast<const char *>(b.data());
std::copy(data, data + *size, buffer);
return 1;
},
[&](const string &s) {
int len = std::min(*size - 1, int(s.size()));
if (len >= 0) {
std::copy(s.data(), s.data() + len, buffer);
buffer[len] = '\0';
}
*size = -(len + 1);
return 1;
}},
*message);
else
return 0;
});
}
void rtcPreload() { rtc::Preload(); }
void rtcCleanup() { rtc::Cleanup(); }

View File

@ -61,6 +61,20 @@ void SctpTransport::Init() {
usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
// Change congestion control from the default TCP Reno (RFC 2581) to H-TCP
usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP);
// Enable Non-Renegable Selective Acknowledgments (NR-SACKs)
usrsctp_sysctl_set_sctp_nrsack_enable(1);
// Increase the initial window size to 10 MTUs (RFC 6928)
usrsctp_sysctl_set_sctp_initial_cwnd(10);
// Reduce SACK delay from the default 200ms to 20ms
usrsctp_sysctl_set_sctp_delayed_sack_time_default(20); // ms
}
void SctpTransport::Cleanup() {
@ -158,11 +172,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
// The default send and receive window size of usrsctp is 256KiB, which is too small for
// realistic RTTs, therefore we increase it to 1MiB for better performance.
// See https://bugzilla.mozilla.org/show_bug.cgi?id=1051685
int bufSize = 1024 * 1024;
if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufSize, sizeof(bufSize)))
int bufferSize = 1024 * 1024;
if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)))
throw std::runtime_error("Could not set SCTP recv buffer size, errno=" +
std::to_string(errno));
if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize)))
if (usrsctp_setsockopt(mSock, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)))
throw std::runtime_error("Could not set SCTP send buffer size, errno=" +
std::to_string(errno));
@ -265,7 +279,7 @@ void SctpTransport::incoming(message_ptr message) {
// There could be a race condition here where we receive the remote INIT before the local one is
// sent, which would result in the connection being aborted. Therefore, we need to wait for data
// to be sent on our side (i.e. the local INIT) before proceeding.
{
if (!mWrittenOnce) { // test the atomic boolean is not set first to prevent a lock contention
std::unique_lock lock(mWriteMutex);
mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || state() != State::Connected; });
}
@ -362,18 +376,20 @@ bool SctpTransport::trySendMessage(message_ptr message) {
ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
}
if (ret >= 0) {
PLOG_VERBOSE << "SCTP sent size=" << message->size();
if (message->type == Message::Type::Binary || message->type == Message::Type::String)
mBytesSent += message->size();
return true;
} else if (errno == EWOULDBLOCK || errno == EAGAIN) {
PLOG_VERBOSE << "SCTP sending not possible";
return false;
} else {
if (ret < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
PLOG_VERBOSE << "SCTP sending not possible";
return false;
}
PLOG_ERROR << "SCTP sending failed, errno=" << errno;
throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
}
PLOG_VERBOSE << "SCTP sent size=" << message->size();
if (message->type == Message::Type::Binary || message->type == Message::Type::String)
mBytesSent += message->size();
return true;
}
void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
@ -385,7 +401,13 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
else
it->second = amount;
mBufferedAmountCallback(streamId, amount);
mSendMutex.unlock();
try {
mBufferedAmountCallback(streamId, amount);
} catch (const std::exception &e) {
PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
}
mSendMutex.lock();
}
void SctpTransport::sendReset(uint16_t streamId) {
@ -421,7 +443,7 @@ bool SctpTransport::safeFlush() {
return true;
} catch (const std::exception &e) {
PLOG_ERROR << "SCTP flush: " << e.what();
PLOG_WARNING << "SCTP flush: " << e.what();
return false;
}
}
@ -471,6 +493,7 @@ int SctpTransport::handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_
std::unique_lock lock(mWriteMutex);
if (!outgoing(make_message(data, data + len)))
return -1;
mWritten = true;
mWrittenOnce = true;
mWrittenCondition.notify_all();

View File

@ -97,7 +97,7 @@ private:
std::mutex mWriteMutex;
std::condition_variable mWrittenCondition;
std::atomic<bool> mWritten = false; // written outside lock
bool mWrittenOnce = false;
std::atomic<bool> mWrittenOnce = false; // same
binary mPartialRecv, mPartialStringData, mPartialBinaryData;

View File

@ -59,7 +59,7 @@ int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
if (mDummySock == INVALID_SOCKET)
mDummySock = ::socket(AF_INET, SOCK_DGRAM, 0);
FD_SET(mDummySock, &readfds);
return SOCK_TO_INT(mDummySock) + 1;
return SOCKET_TO_INT(mDummySock) + 1;
#else
int ret;
do {
@ -226,7 +226,7 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
int error = 0;
socklen_t errorlen = sizeof(error);
if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, &error, &errorlen) != 0)
if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&error, &errorlen) != 0)
throw std::runtime_error("Failed to get socket error code");
if (error != 0) {
@ -271,7 +271,7 @@ bool TcpTransport::trySendMessage(message_ptr &message) {
auto data = reinterpret_cast<const char *>(message->data());
auto size = message->size();
while (size) {
#ifdef __APPLE__
#if defined(__APPLE__) or defined(_WIN32)
int flags = 0;
#else
int flags = MSG_NOSIGNAL;

View File

@ -26,7 +26,6 @@
#if RTC_ENABLE_WEBSOCKET
#include <mutex>
#include <thread>
namespace rtc {

199
test/benchmark.cpp Normal file
View File

@ -0,0 +1,199 @@
/**
* Copyright (c) 2019 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "rtc/rtc.hpp"
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
using namespace rtc;
using namespace std;
using namespace chrono_literals;
using chrono::duration_cast;
using chrono::milliseconds;
using chrono::steady_clock;
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
size_t benchmark(milliseconds duration) {
rtc::InitLogger(LogLevel::Warning);
rtc::Preload();
Configuration config1;
// config1.iceServers.emplace_back("stun:stun.l.google.com:19302");
auto pc1 = std::make_shared<PeerConnection>(config1);
Configuration config2;
// config2.iceServers.emplace_back("stun:stun.l.google.com:19302");
auto pc2 = std::make_shared<PeerConnection>(config2);
pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](const Description &sdp) {
auto pc2 = wpc2.lock();
if (!pc2)
return;
cout << "Description 1: " << sdp << endl;
pc2->setRemoteDescription(sdp);
});
pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](const Candidate &candidate) {
auto pc2 = wpc2.lock();
if (!pc2)
return;
cout << "Candidate 1: " << candidate << endl;
pc2->addRemoteCandidate(candidate);
});
pc1->onStateChange([](PeerConnection::State state) { cout << "State 1: " << state << endl; });
pc1->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "Gathering state 1: " << state << endl;
});
pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](const Description &sdp) {
auto pc1 = wpc1.lock();
if (!pc1)
return;
cout << "Description 2: " << sdp << endl;
pc1->setRemoteDescription(sdp);
});
pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](const Candidate &candidate) {
auto pc1 = wpc1.lock();
if (!pc1)
return;
cout << "Candidate 2: " << candidate << endl;
pc1->addRemoteCandidate(candidate);
});
pc2->onStateChange([](PeerConnection::State state) { cout << "State 2: " << state << endl; });
pc2->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "Gathering state 2: " << state << endl;
});
const size_t messageSize = 65535;
binary messageData(messageSize);
fill(messageData.begin(), messageData.end(), byte(0xFF));
atomic<size_t> receivedSize = 0;
atomic<bool> finished = false;
steady_clock::time_point startTime, openTime, receivedTime, endTime;
shared_ptr<DataChannel> dc2;
pc2->onDataChannel(
[&dc2, &finished, &receivedSize, &receivedTime, &endTime](shared_ptr<DataChannel> dc) {
dc->onMessage([&receivedTime, &receivedSize](const variant<binary, string> &message) {
if (holds_alternative<binary>(message)) {
const auto &bin = get<binary>(message);
if (receivedSize == 0)
receivedTime = steady_clock::now();
receivedSize += bin.size();
}
});
dc->onClosed([&finished, &endTime]() {
cout << "DataChannel closed." << endl;
endTime = steady_clock::now();
finished = true;
});
std::atomic_store(&dc2, dc);
});
startTime = steady_clock::now();
auto dc1 = pc1->createDataChannel("benchmark");
dc1->onOpen([wdc1 = make_weak_ptr(dc1), &messageData, &openTime]() {
auto dc1 = wdc1.lock();
if (!dc1)
return;
openTime = steady_clock::now();
cout << "DataChannel open, sending data..." << endl;
while (dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
// When sent data is buffered in the DataChannel,
// wait for onBufferedAmountLow callback to continue
});
dc1->onBufferedAmountLow([wdc1 = make_weak_ptr(dc1), &messageData]() {
auto dc1 = wdc1.lock();
if (!dc1)
return;
// Continue sending
while (dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
});
const int steps = 10;
const auto stepDuration = duration / 10;
for (int i = 0; i < steps; ++i) {
this_thread::sleep_for(stepDuration);
cout << "Received: " << receivedSize.load() / 1000 << " KB" << endl;
}
if (auto adc2 = std::atomic_load(&dc2)) {
dc1->close();
while (!finished && adc2->isOpen())
this_thread::sleep_for(100ms);
}
auto connectDuration = duration_cast<milliseconds>(openTime - startTime);
auto transferDuration = duration_cast<milliseconds>(endTime - receivedTime);
cout << "Test duration: " << duration.count() << " ms" << endl;
cout << "Connect duration: " << connectDuration.count() << " ms" << endl;
size_t received = receivedSize.load();
size_t goodput = transferDuration.count() > 0 ? received / transferDuration.count() : 0;
cout << "Goodput: " << goodput * 0.001 << " MB/s"
<< " (" << goodput * 0.001 * 8 << " Mbit/s)" << endl;
pc1->close();
pc2->close();
this_thread::sleep_for(1s);
rtc::Cleanup();
return goodput;
}
#ifdef BENCHMARK_MAIN
int main(int argc, char **argv) {
try {
size_t goodput = benchmark(30s);
if (goodput == 0)
throw runtime_error("No data received");
return 0;
} catch (const std::exception &e) {
cerr << "Benchmark failed: " << e.what() << endl;
return -1;
}
}
#endif

View File

@ -18,6 +18,7 @@
#include "rtc/rtc.hpp"
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
@ -63,6 +64,7 @@ void test_connectivity() {
});
pc1->onStateChange([](PeerConnection::State state) { cout << "State 1: " << state << endl; });
pc1->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "Gathering state 1: " << state << endl;
});
@ -84,6 +86,7 @@ void test_connectivity() {
});
pc2->onStateChange([](PeerConnection::State state) { cout << "State 2: " << state << endl; });
pc2->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "Gathering state 2: " << state << endl;
});
@ -91,13 +94,16 @@ void test_connectivity() {
shared_ptr<DataChannel> dc2;
pc2->onDataChannel([&dc2](shared_ptr<DataChannel> dc) {
cout << "DataChannel 2: Received with label \"" << dc->label() << "\"" << endl;
dc2 = dc;
dc2->onMessage([](const variant<binary, string> &message) {
dc->onMessage([](const variant<binary, string> &message) {
if (holds_alternative<string>(message)) {
cout << "Message 2: " << get<string>(message) << endl;
}
});
dc2->send("Hello from 2");
dc->send("Hello from 2");
std::atomic_store(&dc2, dc);
});
auto dc1 = pc1->createDataChannel("test");
@ -105,6 +111,7 @@ void test_connectivity() {
auto dc1 = wdc1.lock();
if (!dc1)
return;
cout << "DataChannel 1: Open" << endl;
dc1->send("Hello from 1");
});
@ -115,14 +122,15 @@ void test_connectivity() {
});
int attempts = 10;
while ((!dc2 || !dc2->isOpen() || !dc1->isOpen()) && attempts--)
shared_ptr<DataChannel> adc2;
while ((!(adc2 = std::atomic_load(&dc2)) || !adc2->isOpen() || !dc1->isOpen()) && attempts--)
this_thread::sleep_for(1s);
if (pc1->state() != PeerConnection::State::Connected &&
pc2->state() != PeerConnection::State::Connected)
throw runtime_error("PeerConnection is not connected");
if (!dc1->isOpen() || !dc2->isOpen())
if (!adc2 || !adc2->isOpen() || !dc1->isOpen())
throw runtime_error("DataChannel is not open");
if (auto addr = pc1->localAddress())

View File

@ -16,13 +16,27 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <chrono>
#include <iostream>
using namespace std;
using namespace chrono_literals;
void test_connectivity();
void test_capi();
void test_websocket();
size_t benchmark(chrono::milliseconds duration);
void test_benchmark() {
size_t goodput = benchmark(10s);
if (goodput == 0)
throw runtime_error("No data received");
const size_t threshold = 1000; // 1 MB/s;
if (goodput < threshold)
throw runtime_error("Goodput is too low");
}
int main(int argc, char **argv) {
try {
@ -51,5 +65,13 @@ int main(int argc, char **argv) {
return -1;
}
#endif
try {
cout << endl << "*** Running WebRTC benchmark..." << endl;
test_benchmark();
cout << "*** Finished WebRTC benchmark" << endl;
} catch (const exception &e) {
cerr << "WebRTC benchmark failed: " << e.what() << endl;
return -1;
}
return 0;
}