mirror of
https://github.com/mii443/libdatachannel.git
synced 2025-08-23 15:48:03 +00:00
Compare commits
63 Commits
Author | SHA1 | Date | |
---|---|---|---|
c93f44f132 | |||
517b69043f | |||
b04db3a744 | |||
36090a24e4 | |||
6b75b3e227 | |||
afed83f5f0 | |||
75c42592bf | |||
b35bfbeb0a | |||
e481e896cb | |||
202467928a | |||
71650ce163 | |||
706a8b7160 | |||
6f419a32ea | |||
f5ff042d62 | |||
822b2e6558 | |||
5b251af1d7 | |||
5b56291b67 | |||
777f5a8dfe | |||
971e6e8b91 | |||
a3fb52c173 | |||
db00253c18 | |||
dd2967b0e1 | |||
cc4e215067 | |||
bbeed01eb0 | |||
aecc2b8fda | |||
d60e18d963 | |||
e41019a1f0 | |||
5825e44fc8 | |||
dc9a8114bc | |||
3e827f9798 | |||
0a6b263bc3 | |||
e02c30027b | |||
c4380ebcc4 | |||
add0649335 | |||
cd28340de3 | |||
c675aedb83 | |||
e32d139056 | |||
a790161168 | |||
2697ef0d76 | |||
5044aedbec | |||
44c90c1cb4 | |||
be79c68540 | |||
226a927df1 | |||
4e1b9bb3c2 | |||
8bc016cc08 | |||
5afbe10d01 | |||
8df07ca68d | |||
884bd2316e | |||
dadecce709 | |||
103935bdd5 | |||
62e6954949 | |||
3ac2d155cc | |||
6d8788c2a1 | |||
603dd01b87 | |||
8091508428 | |||
b38f63f077 | |||
d87539937e | |||
eb09cadded | |||
79e0c62321 | |||
ef38777129 | |||
313f081061 | |||
6108b05e0d | |||
f68601b45f |
2
.github/workflows/build-openssl.yml
vendored
2
.github/workflows/build-openssl.yml
vendored
@ -43,6 +43,8 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: ilammy/msvc-dev-cmd@v1
|
||||
- name: install packages
|
||||
run: choco install openssl
|
||||
- name: submodules
|
||||
run: git submodule update --init --recursive
|
||||
- name: cmake
|
||||
|
@ -1,12 +1,14 @@
|
||||
cmake_minimum_required(VERSION 3.7)
|
||||
project(libdatachannel
|
||||
DESCRIPTION "WebRTC Data Channels Library"
|
||||
VERSION 0.6.3
|
||||
VERSION 0.6.5
|
||||
LANGUAGES CXX)
|
||||
|
||||
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
|
||||
option(USE_JUICE "Use libjuice instead of libnice" OFF)
|
||||
option(RTC_ENABLE_WEBSOCKET "Build WebSocket support" ON)
|
||||
option(USE_SRTP "Enable SRTP for media support" OFF)
|
||||
option(NO_WEBSOCKET "Disable WebSocket support" OFF)
|
||||
option(NO_EXAMPLES "Disable examples" OFF)
|
||||
|
||||
if(USE_GNUTLS)
|
||||
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
|
||||
@ -42,6 +44,8 @@ set(LIBDATACHANNEL_SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
|
||||
)
|
||||
|
||||
set(LIBDATACHANNEL_WEBSOCKET_SOURCES
|
||||
@ -82,7 +86,6 @@ set(TESTS_SOURCES
|
||||
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
|
||||
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
|
||||
find_package(Threads REQUIRED)
|
||||
find_package(SRTP)
|
||||
|
||||
set(CMAKE_POLICY_DEFAULT_CMP0048 NEW)
|
||||
add_subdirectory(deps/plog)
|
||||
@ -99,7 +102,14 @@ endif()
|
||||
add_library(Usrsctp::Usrsctp ALIAS usrsctp)
|
||||
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
|
||||
|
||||
if (RTC_ENABLE_WEBSOCKET)
|
||||
if (NO_WEBSOCKET)
|
||||
add_library(datachannel SHARED
|
||||
${LIBDATACHANNEL_SOURCES})
|
||||
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
|
||||
${LIBDATACHANNEL_SOURCES})
|
||||
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||
else()
|
||||
add_library(datachannel SHARED
|
||||
${LIBDATACHANNEL_SOURCES}
|
||||
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
||||
@ -108,13 +118,6 @@ if (RTC_ENABLE_WEBSOCKET)
|
||||
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
||||
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
||||
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
||||
else()
|
||||
add_library(datachannel SHARED
|
||||
${LIBDATACHANNEL_SOURCES})
|
||||
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
|
||||
${LIBDATACHANNEL_SOURCES})
|
||||
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||
endif()
|
||||
|
||||
set_target_properties(datachannel PROPERTIES
|
||||
@ -141,7 +144,8 @@ if(WIN32)
|
||||
target_link_libraries(datachannel-static PRIVATE wsock32 ws2_32) # winsock2
|
||||
endif()
|
||||
|
||||
if(SRTP_FOUND)
|
||||
if(USE_SRTP)
|
||||
find_package(SRTP REQUIRED)
|
||||
if(NOT TARGET SRTP::SRTP)
|
||||
add_library(SRTP::SRTP UNKNOWN IMPORTED)
|
||||
set_target_properties(SRTP::SRTP PROPERTIES
|
||||
@ -228,9 +232,11 @@ else()
|
||||
endif()
|
||||
|
||||
# Examples
|
||||
if(NOT NO_EXAMPLES)
|
||||
set(JSON_BuildTests OFF CACHE INTERNAL "")
|
||||
add_subdirectory(deps/json)
|
||||
add_subdirectory(examples/client)
|
||||
add_subdirectory(examples/copy-paste)
|
||||
add_subdirectory(examples/copy-paste-capi)
|
||||
endif()
|
||||
|
||||
|
58
Jamfile
58
Jamfile
@ -25,7 +25,7 @@ lib libdatachannel
|
||||
<library>/libdatachannel//usrsctp
|
||||
<library>/libdatachannel//juice
|
||||
<library>/libdatachannel//plog
|
||||
<gnutls>on:<library>gnutls
|
||||
<gnutls>on:<library>gnutls/<link>shared
|
||||
<gnutls>off:<library>ssl
|
||||
<gnutls>off:<library>crypto
|
||||
: # default build
|
||||
@ -85,20 +85,37 @@ alias juice
|
||||
make libusrsctp.a : : @make_libusrsctp ;
|
||||
make usrsctp.lib : : @make_libusrsctp_msvc ;
|
||||
|
||||
rule make_libusrsctp ( targets * : sources * : properties * )
|
||||
{
|
||||
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||
VARIANT on $(targets) = $(VARIANT) ;
|
||||
if <gnutls>on in $(properties)
|
||||
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
|
||||
else
|
||||
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
|
||||
}
|
||||
actions make_libusrsctp
|
||||
{
|
||||
(cd $(CWD)/deps/usrsctp && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static)
|
||||
cp $(CWD)/deps/usrsctp/build/usrsctplib/libusrsctp.a $(<)
|
||||
(cd $(CWD)/deps/usrsctp && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_BUILD_TYPE=$(VARIANT) -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static)
|
||||
cp $(CWD)/deps/usrsctp/$(BUILD_DIR)/usrsctplib/libusrsctp.a $(<)
|
||||
}
|
||||
rule make_libusrsctp_msvc ( targets * : sources * : properties * )
|
||||
{
|
||||
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||
VARIANT on $(targets) = $(VARIANT) ;
|
||||
if <gnutls>on in $(properties)
|
||||
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
|
||||
else
|
||||
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
|
||||
}
|
||||
actions make_libusrsctp_msvc
|
||||
{
|
||||
SET OLDD=%CD%
|
||||
cd $(CWD)/deps/usrsctp
|
||||
mkdir build
|
||||
cd build
|
||||
mkdir $(BUILD_SIR)
|
||||
cd $(BUILD_DIR)
|
||||
cmake -G "Visual Studio 16 2019" ..
|
||||
cd build
|
||||
msbuild usrsctplib.sln /property:Configuration=Release
|
||||
msbuild usrsctplib.sln /property:Configuration=$(VARIANT)
|
||||
cd %OLDD%
|
||||
cp $(CWD)/deps/usrsctp/build/usrsctplib/Release/usrsctp.lib $(<)
|
||||
}
|
||||
@ -108,11 +125,15 @@ make juice-static.lib : : @make_libjuice_msvc ;
|
||||
|
||||
rule make_libjuice ( targets * : sources * : properties * )
|
||||
{
|
||||
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||
VARIANT on $(targets) = $(VARIANT) ;
|
||||
if <gnutls>on in $(properties)
|
||||
{
|
||||
BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ;
|
||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
||||
}
|
||||
else {
|
||||
else
|
||||
{
|
||||
local OPENSSL_INCLUDE = [ feature.get-values <openssl-include> : $(properties) ] ;
|
||||
|
||||
if <target-os>darwin in $(properties) && $(OPENSSL_INCLUDE) = ""
|
||||
@ -122,26 +143,29 @@ rule make_libjuice ( targets * : sources * : properties * )
|
||||
OPENSSL_INCLUDE = /usr/local/opt/openssl/include ;
|
||||
}
|
||||
|
||||
BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ;
|
||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
||||
if $(OPENSSL_INCLUDE) != ""
|
||||
{
|
||||
CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ;
|
||||
}
|
||||
{ CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ; }
|
||||
}
|
||||
}
|
||||
actions make_libjuice
|
||||
{
|
||||
(cd $(CWD)/deps/libjuice && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" $(CMAKEOPTS) .. && make -j2 juice-static)
|
||||
cp $(CWD)/deps/libjuice/build/libjuice-static.a $(<)
|
||||
(cd $(CWD)/deps/libjuice && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_C_FLAGS="-fPIC" -DCMAKE_BUILD_TYPE=$(VARIANT) $(CMAKEOPTS) .. && make -j2 juice-static)
|
||||
cp $(CWD)/deps/libjuice/$(BUILD_DIR)/libjuice-static.a $(<)
|
||||
}
|
||||
rule make_libjuice_msvc ( targets * : sources * : properties * )
|
||||
{
|
||||
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||
VARIANT on $(targets) = $(VARIANT) ;
|
||||
if <gnutls>on in $(properties)
|
||||
{
|
||||
BUILD_DIR on $(targets) += "build-gnutls-$(VARIANT)" ;
|
||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
||||
}
|
||||
else
|
||||
{
|
||||
BUILD_DIR on $(targets) += "build-openssl-$(VARIANT)" ;
|
||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
||||
}
|
||||
}
|
||||
@ -149,12 +173,12 @@ actions make_libjuice_msvc
|
||||
{
|
||||
SET OLDD=%CD%
|
||||
cd $(CWD)/deps/libjuice
|
||||
mkdir build
|
||||
cd build
|
||||
mkdir $(BUILD_DIR)
|
||||
cd $(BUILD_DIR)
|
||||
cmake -G "Visual Studio 16 2019" $(CMAKEOPTS) ..
|
||||
msbuild libjuice.sln /property:Configuration=Release
|
||||
msbuild libjuice.sln /property:Configuration=$(VARIANT)
|
||||
cd %OLDD%
|
||||
cp $(CWD)/deps/libjuice/build/Release/juice-static.lib $(<)
|
||||
cp $(CWD)/deps/libjuice/$(BUILD_DIR)/Release/juice-static.lib $(<)
|
||||
}
|
||||
|
||||
# the search path to pick up the openssl libraries from. This is the <search>
|
||||
|
10
Makefile
10
Makefile
@ -38,22 +38,22 @@ else
|
||||
LIBS+=glib-2.0 gobject-2.0 nice
|
||||
endif
|
||||
|
||||
RTC_ENABLE_MEDIA ?= 0
|
||||
ifneq ($(RTC_ENABLE_MEDIA), 0)
|
||||
USE_SRTP ?= 0
|
||||
ifneq ($(USE_SRTP), 0)
|
||||
CPPFLAGS+=-DRTC_ENABLE_MEDIA=1
|
||||
LIBS+=srtp
|
||||
else
|
||||
CPPFLAGS+=-DRTC_ENABLE_MEDIA=0
|
||||
endif
|
||||
|
||||
RTC_ENABLE_WEBSOCKET ?= 1
|
||||
ifneq ($(RTC_ENABLE_WEBSOCKET), 0)
|
||||
|
||||
NO_WEBSOCKET ?= 0
|
||||
ifeq ($(NO_WEBSOCKET), 0)
|
||||
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=1
|
||||
else
|
||||
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=0
|
||||
endif
|
||||
|
||||
|
||||
INCLUDES+=$(shell pkg-config --cflags $(LIBS))
|
||||
LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS))
|
||||
|
||||
|
2
deps/libjuice
vendored
2
deps/libjuice
vendored
Submodule deps/libjuice updated: c6566d3c6f...92a2ed7d44
2
deps/usrsctp
vendored
2
deps/usrsctp
vendored
Submodule deps/usrsctp updated: fdc4d2b99b...ffed0925f2
@ -49,6 +49,7 @@ public:
|
||||
bool ended() const;
|
||||
|
||||
void hintType(Type type);
|
||||
void setDataMid(string mid);
|
||||
void setFingerprint(string fingerprint);
|
||||
void setSctpPort(uint16_t port);
|
||||
void setMaxMessageSize(size_t size);
|
||||
@ -86,7 +87,7 @@ private:
|
||||
string mid;
|
||||
std::vector<string> attributes;
|
||||
};
|
||||
std::map<string, Media> mMedia; // by mid
|
||||
std::map<int, Media> mMedia; // by m-line index
|
||||
|
||||
// Candidates
|
||||
std::vector<Candidate> mCandidates;
|
||||
|
@ -64,6 +64,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
|
||||
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
|
||||
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
|
||||
|
||||
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
|
||||
|
||||
// overloaded helper
|
||||
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
||||
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
|
||||
|
@ -25,8 +25,7 @@
|
||||
|
||||
namespace rtc {
|
||||
|
||||
class Init;
|
||||
using init_token = std::shared_ptr<Init>;
|
||||
using init_token = std::shared_ptr<void>;
|
||||
|
||||
class Init {
|
||||
public:
|
||||
@ -39,9 +38,10 @@ public:
|
||||
private:
|
||||
Init();
|
||||
|
||||
static std::weak_ptr<Init> Weak;
|
||||
static init_token Global;
|
||||
static std::mutex Mutex;
|
||||
static std::weak_ptr<void> Weak;
|
||||
static std::shared_ptr<void> *Global;
|
||||
static bool Initialized;
|
||||
static std::recursive_mutex Mutex;
|
||||
};
|
||||
|
||||
inline void Preload() { Init::Preload(); }
|
||||
|
@ -41,6 +41,7 @@
|
||||
namespace rtc {
|
||||
|
||||
class Certificate;
|
||||
class Processor;
|
||||
class IceTransport;
|
||||
class DtlsTransport;
|
||||
class SctpTransport;
|
||||
@ -101,7 +102,7 @@ public:
|
||||
// Media
|
||||
bool hasMedia() const;
|
||||
void sendMedia(const binary &packet);
|
||||
void send(const byte *packet, size_t size);
|
||||
void sendMedia(const byte *packet, size_t size);
|
||||
|
||||
void onMedia(std::function<void(const binary &packet)> callback);
|
||||
|
||||
@ -139,10 +140,10 @@ private:
|
||||
|
||||
void outgoingMedia(message_ptr message);
|
||||
|
||||
const init_token mInitToken = Init::Token();
|
||||
const Configuration mConfig;
|
||||
const future_certificate_ptr mCertificate;
|
||||
|
||||
init_token mInitToken = Init::Token();
|
||||
const std::unique_ptr<Processor> mProcessor;
|
||||
|
||||
std::optional<Description> mLocalDescription, mRemoteDescription;
|
||||
mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
|
||||
|
@ -28,6 +28,7 @@
|
||||
#else
|
||||
#include <netdb.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#endif
|
||||
|
||||
#include <sys/types.h>
|
||||
@ -107,8 +108,9 @@ bool Candidate::resolve(ResolveMode mode) {
|
||||
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
|
||||
oss << left;
|
||||
mCandidate = oss.str();
|
||||
mIsResolved = true;
|
||||
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
|
||||
return mIsResolved = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -116,7 +118,7 @@ bool Candidate::resolve(ResolveMode mode) {
|
||||
freeaddrinfo(result);
|
||||
}
|
||||
|
||||
return false;
|
||||
return mIsResolved;
|
||||
}
|
||||
|
||||
bool Candidate::isResolved() const { return mIsResolved; }
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
|
||||
#include "certificate.hpp"
|
||||
#include "threadpool.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <chrono>
|
||||
@ -201,8 +202,8 @@ certificate_ptr make_certificate_impl(string commonName) {
|
||||
auto *commonNameBytes =
|
||||
reinterpret_cast<unsigned char *>(const_cast<char *>(commonName.c_str()));
|
||||
|
||||
if (!X509_gmtime_adj(X509_get_notBefore(x509.get()), 3600 * -1) ||
|
||||
!X509_gmtime_adj(X509_get_notAfter(x509.get()), 3600 * 24 * 365) ||
|
||||
if (!X509_gmtime_adj(X509_getm_notBefore(x509.get()), 3600 * -1) ||
|
||||
!X509_gmtime_adj(X509_getm_notAfter(x509.get()), 3600 * 24 * 365) ||
|
||||
!X509_set_version(x509.get(), 1) || !X509_set_pubkey(x509.get(), pkey.get()) ||
|
||||
!BN_pseudo_rand(serial_number.get(), serialSize, 0, 0) ||
|
||||
!BN_to_ASN1_INTEGER(serial_number.get(), X509_get_serialNumber(x509.get())) ||
|
||||
@ -230,19 +231,6 @@ namespace rtc {
|
||||
|
||||
namespace {
|
||||
|
||||
// Helper function roughly equivalent to std::async with policy std::launch::async
|
||||
// since std::async might be unreliable on some platforms (e.g. Mingw32 on Windows)
|
||||
template <class F, class... Args>
|
||||
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>
|
||||
thread_call(F &&f, Args &&... args) {
|
||||
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
||||
std::packaged_task<R()> task(std::bind(f, std::forward<Args>(args)...));
|
||||
std::future<R> future = task.get_future();
|
||||
std::thread t(std::move(task));
|
||||
t.detach();
|
||||
return future;
|
||||
}
|
||||
|
||||
static std::unordered_map<string, future_certificate_ptr> CertificateCache;
|
||||
static std::mutex CertificateCacheMutex;
|
||||
|
||||
@ -254,7 +242,7 @@ future_certificate_ptr make_certificate(string commonName) {
|
||||
if (auto it = CertificateCache.find(commonName); it != CertificateCache.end())
|
||||
return it->second;
|
||||
|
||||
auto future = thread_call(make_certificate_impl, commonName);
|
||||
auto future = ThreadPool::Instance().enqueue(make_certificate_impl, commonName);
|
||||
auto shared = future.share();
|
||||
CertificateCache.emplace(std::move(commonName), shared);
|
||||
return shared;
|
||||
|
@ -96,7 +96,7 @@ void DataChannel::close() {
|
||||
mIsClosed = true;
|
||||
if (mIsOpen.exchange(false))
|
||||
if (auto transport = mSctpTransport.lock())
|
||||
transport->close(mStream);
|
||||
transport->closeStream(mStream);
|
||||
|
||||
mSctpTransport.reset();
|
||||
resetCallbacks();
|
||||
|
@ -63,6 +63,7 @@ Description::Description(const string &sdp, Type type, Role role)
|
||||
std::istringstream ss(sdp);
|
||||
std::optional<Media> currentMedia;
|
||||
|
||||
int mlineIndex = 0;
|
||||
bool finished;
|
||||
do {
|
||||
string line;
|
||||
@ -76,7 +77,9 @@ Description::Description(const string &sdp, Type type, Role role)
|
||||
if (currentMedia->type == "application")
|
||||
mData.mid = currentMedia->mid;
|
||||
else
|
||||
mMedia.emplace(currentMedia->mid, std::move(*currentMedia));
|
||||
mMedia.emplace(mlineIndex, std::move(*currentMedia));
|
||||
|
||||
++mlineIndex;
|
||||
|
||||
} else if (line.find(" ICE/SDP") != string::npos) {
|
||||
PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring";
|
||||
@ -163,6 +166,8 @@ void Description::hintType(Type type) {
|
||||
}
|
||||
}
|
||||
|
||||
void Description::setDataMid(string mid) { mData.mid = mid; }
|
||||
|
||||
void Description::setFingerprint(string fingerprint) {
|
||||
mFingerprint.emplace(std::move(fingerprint));
|
||||
}
|
||||
@ -188,10 +193,7 @@ bool Description::hasMedia() const { return !mMedia.empty(); }
|
||||
|
||||
void Description::addMedia(const Description &source) {
|
||||
for (auto p : source.mMedia)
|
||||
if (p.first != mData.mid)
|
||||
mMedia.emplace(std::move(p));
|
||||
else
|
||||
PLOG_WARNING << "Media mid \"" << p.first << "\" is the same as data mid, ignoring";
|
||||
mMedia.emplace(p);
|
||||
}
|
||||
|
||||
Description::operator string() const { return generateSdp("\r\n"); }
|
||||
@ -212,13 +214,40 @@ string Description::generateSdp(const string &eol) const {
|
||||
// see Negotiating Media Multiplexing Using the Session Description Protocol
|
||||
// https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54
|
||||
sdp << "a=group:BUNDLE";
|
||||
for (const auto &m : mMedia)
|
||||
sdp << " " << m.first; // mid
|
||||
sdp << " " << mData.mid << eol;
|
||||
for (int i = 0; i < int(mMedia.size() + 1); ++i)
|
||||
if (auto it = mMedia.find(i); it != mMedia.end())
|
||||
sdp << ' ' << it->second.mid;
|
||||
else
|
||||
sdp << ' ' << mData.mid;
|
||||
sdp << eol;
|
||||
|
||||
sdp << "a=msid-semantic: WMS" << eol;
|
||||
|
||||
// Non-data media
|
||||
if (!mMedia.empty()) {
|
||||
// Lip-sync
|
||||
sdp << "a=group:LS";
|
||||
for (const auto &p : mMedia)
|
||||
sdp << " " << p.second.mid;
|
||||
sdp << eol;
|
||||
}
|
||||
|
||||
// Descriptions and attributes
|
||||
for (int i = 0; i < int(mMedia.size() + 1); ++i) {
|
||||
if (auto it = mMedia.find(i); it != mMedia.end()) {
|
||||
// Non-data media
|
||||
const auto &media = it->second;
|
||||
sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
|
||||
sdp << "c=IN IP4 0.0.0.0" << eol;
|
||||
sdp << "a=bundle-only" << eol;
|
||||
sdp << "a=mid:" << media.mid << eol;
|
||||
for (const auto &attr : media.attributes)
|
||||
sdp << "a=" << attr << eol;
|
||||
|
||||
} else {
|
||||
// Data
|
||||
const string dataDescription = "UDP/DTLS/SCTP webrtc-datachannel";
|
||||
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << dataDescription << eol;
|
||||
const string description = "UDP/DTLS/SCTP webrtc-datachannel";
|
||||
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << description << eol;
|
||||
sdp << "c=IN IP4 0.0.0.0" << eol;
|
||||
if (!mMedia.empty())
|
||||
sdp << "a=bundle-only" << eol;
|
||||
@ -227,33 +256,18 @@ string Description::generateSdp(const string &eol) const {
|
||||
sdp << "a=sctp-port:" << *mData.sctpPort << eol;
|
||||
if (mData.maxMessageSize)
|
||||
sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
|
||||
|
||||
// Non-data media
|
||||
if (!mMedia.empty()) {
|
||||
// Lip-sync
|
||||
sdp << "a=group:LS";
|
||||
for (const auto &m : mMedia)
|
||||
sdp << " " << m.first; // mid
|
||||
sdp << eol;
|
||||
|
||||
// Descriptions and attributes
|
||||
for (const auto &m : mMedia) {
|
||||
const auto &media = m.second;
|
||||
sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
|
||||
sdp << "c=IN IP4 0.0.0.0" << eol;
|
||||
sdp << "a=bundle-only" << eol;
|
||||
sdp << "a=mid:" << media.mid << eol;
|
||||
for (const auto &attr : media.attributes)
|
||||
sdp << "a=" << attr << eol;
|
||||
}
|
||||
}
|
||||
|
||||
// Common
|
||||
if (!mEnded)
|
||||
sdp << "a=ice-options:trickle" << eol;
|
||||
|
||||
sdp << "a=ice-ufrag:" << mIceUfrag << eol;
|
||||
sdp << "a=ice-pwd:" << mIcePwd << eol;
|
||||
sdp << "a=setup:" << roleToString(mRole) << eol;
|
||||
sdp << "a=tls-id:1" << eol;
|
||||
|
||||
if (mFingerprint)
|
||||
sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;
|
||||
|
||||
|
@ -43,37 +43,65 @@ DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
|
||||
std::move(stateChangeCallback)),
|
||||
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
|
||||
|
||||
PLOG_DEBUG << "Initializing SRTP transport";
|
||||
PLOG_DEBUG << "Initializing DTLS-SRTP transport";
|
||||
|
||||
#if USE_GNUTLS
|
||||
PLOG_DEBUG << "Initializing DTLS-SRTP transport (GnuTLS)";
|
||||
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80),
|
||||
"Failed to set SRTP profile");
|
||||
#else
|
||||
PLOG_DEBUG << "Initializing DTLS-SRTP transport (OpenSSL)";
|
||||
openssl::check(SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"),
|
||||
"Failed to set SRTP profile");
|
||||
#endif
|
||||
if (srtp_err_status_t err = srtp_create(&mSrtpIn, nullptr)) {
|
||||
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
||||
}
|
||||
if (srtp_err_status_t err = srtp_create(&mSrtpOut, nullptr)) {
|
||||
srtp_dealloc(mSrtpIn);
|
||||
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
||||
}
|
||||
}
|
||||
|
||||
DtlsSrtpTransport::~DtlsSrtpTransport() {
|
||||
stop();
|
||||
|
||||
if (mCreated)
|
||||
srtp_dealloc(mSrtp);
|
||||
srtp_dealloc(mSrtpIn);
|
||||
srtp_dealloc(mSrtpOut);
|
||||
}
|
||||
|
||||
bool DtlsSrtpTransport::send(message_ptr message) {
|
||||
bool DtlsSrtpTransport::sendMedia(message_ptr message) {
|
||||
if (!message)
|
||||
return false;
|
||||
|
||||
if (!mInitDone) {
|
||||
PLOG_WARNING << "SRTP media sent before keys are derived";
|
||||
return false;
|
||||
}
|
||||
|
||||
int size = message->size();
|
||||
PLOG_VERBOSE << "Send size=" << size;
|
||||
|
||||
// srtp_protect() assumes that it can write SRTP_MAX_TRAILER_LEN (for the authentication tag)
|
||||
// into the location in memory immediately following the RTP packet.
|
||||
// The RTP header has a minimum size of 12 bytes
|
||||
if (size < 12)
|
||||
throw std::runtime_error("RTP/RTCP packet too short");
|
||||
|
||||
// srtp_protect() and srtp_protect_rtcp() assume that they can write SRTP_MAX_TRAILER_LEN (for
|
||||
// the authentication tag) into the location in memory immediately following the RTP packet.
|
||||
message->resize(size + SRTP_MAX_TRAILER_LEN);
|
||||
if (srtp_err_status_t err = srtp_protect(mSrtp, message->data(), &size)) {
|
||||
|
||||
uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
|
||||
PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
|
||||
<< unsigned(value2);
|
||||
|
||||
// RFC 5761 Multiplexing RTP and RTCP 4. Distinguishable RTP and RTCP Packets
|
||||
// It is RECOMMENDED to follow the guidelines in the RTP/AVP profile for the choice of RTP
|
||||
// payload type values, with the additional restriction that payload type values in the
|
||||
// range 64-95 MUST NOT be used. Specifically, dynamic RTP payload types SHOULD be chosen in
|
||||
// the range 96-127 where possible. Values below 64 MAY be used if that is insufficient
|
||||
// [...]
|
||||
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
|
||||
if (srtp_err_status_t err = srtp_protect_rtcp(mSrtpOut, message->data(), &size)) {
|
||||
if (err == srtp_err_status_replay_fail)
|
||||
throw std::runtime_error("SRTCP packet is a replay");
|
||||
else
|
||||
throw std::runtime_error("SRTCP protect error, status=" +
|
||||
to_string(static_cast<int>(err)));
|
||||
}
|
||||
PLOG_VERBOSE << "Protected SRTCP packet, size=" << size;
|
||||
} else {
|
||||
if (srtp_err_status_t err = srtp_protect(mSrtpOut, message->data(), &size)) {
|
||||
if (err == srtp_err_status_replay_fail)
|
||||
throw std::runtime_error("SRTP packet is a replay");
|
||||
else
|
||||
@ -81,12 +109,20 @@ bool DtlsSrtpTransport::send(message_ptr message) {
|
||||
to_string(static_cast<int>(err)));
|
||||
}
|
||||
PLOG_VERBOSE << "Protected SRTP packet, size=" << size;
|
||||
}
|
||||
|
||||
message->resize(size);
|
||||
outgoing(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
void DtlsSrtpTransport::incoming(message_ptr message) {
|
||||
if (!mInitDone) {
|
||||
// Bypas
|
||||
DtlsTransport::incoming(message);
|
||||
return;
|
||||
}
|
||||
|
||||
int size = message->size();
|
||||
if (size == 0)
|
||||
return;
|
||||
@ -95,15 +131,39 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
|
||||
// The process for demultiplexing a packet is as follows. The receiver looks at the first byte
|
||||
// of the packet. [...] If the value is in between 128 and 191 (inclusive), then the packet is
|
||||
// RTP (or RTCP [...]). If the value is between 20 and 63 (inclusive), the packet is DTLS.
|
||||
uint8_t value = to_integer<uint8_t>(*message->begin());
|
||||
uint8_t value1 = to_integer<uint8_t>(*message->begin());
|
||||
PLOG_VERBOSE << "Demultiplexing DTLS and SRTP/SRTCP with first byte, value="
|
||||
<< unsigned(value1);
|
||||
|
||||
if (value >= 128 && value <= 192) {
|
||||
if (value1 >= 20 && value1 <= 63) {
|
||||
PLOG_VERBOSE << "Incoming DTLS packet, size=" << size;
|
||||
DtlsTransport::incoming(message);
|
||||
} else if (value >= 20 && value <= 64) {
|
||||
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
|
||||
|
||||
if (srtp_err_status_t err = srtp_unprotect(mSrtp, message->data(), &size)) {
|
||||
} else if (value1 >= 128 && value1 <= 191) {
|
||||
// The RTP header has a minimum size of 12 bytes
|
||||
if (size < 12) {
|
||||
PLOG_WARNING << "Incoming SRTP/SRTCP packet too short, size=" << size;
|
||||
return;
|
||||
}
|
||||
|
||||
uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
|
||||
PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
|
||||
<< unsigned(value2);
|
||||
|
||||
// See RFC 5761 reference above
|
||||
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
|
||||
PLOG_VERBOSE << "Incoming SRTCP packet, size=" << size;
|
||||
if (srtp_err_status_t err = srtp_unprotect_rtcp(mSrtpIn, message->data(), &size)) {
|
||||
if (err == srtp_err_status_replay_fail)
|
||||
PLOG_WARNING << "Incoming SRTCP packet is a replay";
|
||||
else
|
||||
PLOG_WARNING << "SRTCP unprotect error, status=" << err;
|
||||
return;
|
||||
}
|
||||
PLOG_VERBOSE << "Unprotected SRTCP packet, size=" << size;
|
||||
} else {
|
||||
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
|
||||
if (srtp_err_status_t err = srtp_unprotect(mSrtpIn, message->data(), &size)) {
|
||||
if (err == srtp_err_status_replay_fail)
|
||||
PLOG_WARNING << "Incoming SRTP packet is a replay";
|
||||
else
|
||||
@ -111,33 +171,40 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
|
||||
return;
|
||||
}
|
||||
PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
|
||||
}
|
||||
|
||||
message->resize(size);
|
||||
mSrtpRecvCallback(message);
|
||||
|
||||
} else {
|
||||
PLOG_WARNING << "Unknown packet type, value=" << value << ", size=" << size;
|
||||
PLOG_WARNING << "Unknown packet type, value=" << unsigned(value1) << ", size=" << size;
|
||||
}
|
||||
}
|
||||
|
||||
void DtlsSrtpTransport::postCreation() {
|
||||
#if USE_GNUTLS
|
||||
PLOG_DEBUG << "Setting SRTP profile (GnuTLS)";
|
||||
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80),
|
||||
"Failed to set SRTP profile");
|
||||
#else
|
||||
PLOG_DEBUG << "Setting SRTP profile (OpenSSL)";
|
||||
// returns 0 on success, 1 on error
|
||||
if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"), "Failed to set SRTP profile")
|
||||
throw std::runtime_error("Failed to set SRTP profile: " + openssl::error_string(ERR_get_error()));
|
||||
#endif
|
||||
}
|
||||
|
||||
void DtlsSrtpTransport::postHandshake() {
|
||||
if (mCreated)
|
||||
if (mInitDone)
|
||||
return;
|
||||
|
||||
srtp_policy_t inbound = {};
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
|
||||
inbound.ssrc.type = ssrc_any_inbound;
|
||||
|
||||
srtp_policy_t outbound = {};
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
|
||||
outbound.ssrc.type = ssrc_any_outbound;
|
||||
|
||||
const size_t materialLen = SRTP_AES_ICM_128_KEY_LEN_WSALT * 2;
|
||||
unsigned char material[materialLen];
|
||||
const unsigned char *clientKey, *clientSalt, *serverKey, *serverSalt;
|
||||
|
||||
#if USE_GNUTLS
|
||||
PLOG_INFO << "Deriving SRTP keying material (GnuTLS)";
|
||||
|
||||
gnutls_datum_t clientKeyDatum, clientSaltDatum, serverKeyDatum, serverSaltDatum;
|
||||
gnutls::check(gnutls_srtp_get_keys(mSession, material, materialLen, &clientKeyDatum,
|
||||
&clientSaltDatum, &serverKeyDatum, &serverSaltDatum),
|
||||
@ -160,18 +227,23 @@ void DtlsSrtpTransport::postHandshake() {
|
||||
serverKey = reinterpret_cast<const unsigned char *>(serverKeyDatum.data);
|
||||
serverSalt = reinterpret_cast<const unsigned char *>(serverSaltDatum.data);
|
||||
#else
|
||||
// This provides the client write master key, the server write master key, the client write
|
||||
// master salt and the server write master salt in that order.
|
||||
PLOG_INFO << "Deriving SRTP keying material (OpenSSL)";
|
||||
|
||||
// The extractor provides the client write master key, the server write master key, the client
|
||||
// write master salt and the server write master salt in that order.
|
||||
const string label = "EXTRACTOR-dtls_srtp";
|
||||
openssl::check(SSL_export_keying_material(mSsl, material, materialLen, label.c_str(),
|
||||
label.size(), nullptr, 0, 0),
|
||||
"Failed to derive SRTP keys");
|
||||
|
||||
// returns 1 on success, 0 or -1 on failure (OpenSSL API is a complete mess...)
|
||||
if (SSL_export_keying_material(mSsl, material, materialLen, label.c_str(), label.size(),
|
||||
nullptr, 0, 0) <= 0)
|
||||
throw std::runtime_error("Failed to derive SRTP keys: " +
|
||||
openssl::error_string(ERR_get_error()));
|
||||
|
||||
clientKey = material;
|
||||
clientSalt = clientKey + SRTP_AES_128_KEY_LEN;
|
||||
|
||||
serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT;
|
||||
serverSalt = serverSalt + SRTP_AES_128_KEY_LEN;
|
||||
serverSalt = serverKey + SRTP_AES_128_KEY_LEN;
|
||||
#endif
|
||||
|
||||
unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT];
|
||||
@ -182,22 +254,31 @@ void DtlsSrtpTransport::postHandshake() {
|
||||
std::memcpy(serverSessionKey, serverKey, SRTP_AES_128_KEY_LEN);
|
||||
std::memcpy(serverSessionKey + SRTP_AES_128_KEY_LEN, serverSalt, SRTP_SALT_LEN);
|
||||
|
||||
if (mIsClient) {
|
||||
inbound.key = serverSessionKey;
|
||||
outbound.key = clientSessionKey;
|
||||
} else {
|
||||
inbound.key = clientSessionKey;
|
||||
outbound.key = serverSessionKey;
|
||||
}
|
||||
srtp_policy_t inbound = {};
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
|
||||
inbound.ssrc.type = ssrc_any_inbound;
|
||||
inbound.ssrc.value = 0;
|
||||
inbound.key = mIsClient ? serverSessionKey : clientSessionKey;
|
||||
inbound.next = nullptr;
|
||||
|
||||
srtp_policy_t *policies = &inbound;
|
||||
inbound.next = &outbound;
|
||||
if (srtp_err_status_t err = srtp_add_stream(mSrtpIn, &inbound))
|
||||
throw std::runtime_error("SRTP add inbound stream failed, status=" +
|
||||
to_string(static_cast<int>(err)));
|
||||
|
||||
srtp_policy_t outbound = {};
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
|
||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
|
||||
outbound.ssrc.type = ssrc_any_outbound;
|
||||
outbound.ssrc.value = 0;
|
||||
outbound.key = mIsClient ? clientSessionKey : serverSessionKey;
|
||||
outbound.next = nullptr;
|
||||
|
||||
if (srtp_err_status_t err = srtp_create(&mSrtp, policies))
|
||||
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
||||
if (srtp_err_status_t err = srtp_add_stream(mSrtpOut, &outbound))
|
||||
throw std::runtime_error("SRTP add outbound stream failed, status=" +
|
||||
to_string(static_cast<int>(err)));
|
||||
|
||||
mCreated = true;
|
||||
mInitDone = true;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -38,16 +38,17 @@ public:
|
||||
state_callback stateChangeCallback);
|
||||
~DtlsSrtpTransport();
|
||||
|
||||
bool send(message_ptr message) override;
|
||||
bool sendMedia(message_ptr message);
|
||||
|
||||
private:
|
||||
void incoming(message_ptr message) override;
|
||||
void postCreation() override;
|
||||
void postHandshake() override;
|
||||
|
||||
message_callback mSrtpRecvCallback;
|
||||
|
||||
srtp_t mSrtp;
|
||||
bool mCreated = false;
|
||||
srtp_t mSrtpIn, mSrtpOut;
|
||||
bool mInitDone = false;
|
||||
};
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -85,6 +85,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
|
||||
gnutls_transport_set_pull_function(mSession, ReadCallback);
|
||||
gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
|
||||
|
||||
postCreation();
|
||||
|
||||
mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this);
|
||||
registerIncoming();
|
||||
|
||||
@ -137,6 +139,10 @@ void DtlsTransport::incoming(message_ptr message) {
|
||||
mIncomingQueue.push(message);
|
||||
}
|
||||
|
||||
void DtlsTransport::postCreation() {
|
||||
// Dummy
|
||||
}
|
||||
|
||||
void DtlsTransport::postHandshake() {
|
||||
// Dummy
|
||||
}
|
||||
@ -408,6 +414,10 @@ void DtlsTransport::incoming(message_ptr message) {
|
||||
mIncomingQueue.push(message);
|
||||
}
|
||||
|
||||
void DtlsTransport::postCreation() {
|
||||
// Dummy
|
||||
}
|
||||
|
||||
void DtlsTransport::postHandshake() {
|
||||
// Dummy
|
||||
}
|
||||
|
@ -52,6 +52,7 @@ public:
|
||||
|
||||
protected:
|
||||
virtual void incoming(message_ptr message) override;
|
||||
virtual void postCreation();
|
||||
virtual void postHandshake();
|
||||
void runRecvLoop();
|
||||
|
||||
|
90
src/init.cpp
90
src/init.cpp
@ -21,6 +21,7 @@
|
||||
#include "certificate.hpp"
|
||||
#include "dtlstransport.hpp"
|
||||
#include "sctptransport.hpp"
|
||||
#include "threadpool.hpp"
|
||||
#include "tls.hpp"
|
||||
|
||||
#if RTC_ENABLE_WEBSOCKET
|
||||
@ -39,39 +40,19 @@ using std::shared_ptr;
|
||||
|
||||
namespace rtc {
|
||||
|
||||
std::weak_ptr<Init> Init::Weak;
|
||||
init_token Init::Global;
|
||||
std::mutex Init::Mutex;
|
||||
namespace {
|
||||
|
||||
init_token Init::Token() {
|
||||
std::lock_guard lock(Mutex);
|
||||
void doInit() {
|
||||
PLOG_DEBUG << "Global initialization";
|
||||
|
||||
if (!Global) {
|
||||
if (auto token = Weak.lock())
|
||||
Global = token;
|
||||
else
|
||||
Global = shared_ptr<Init>(new Init());
|
||||
}
|
||||
return Global;
|
||||
}
|
||||
|
||||
void Init::Preload() {
|
||||
Token(); // pre-init
|
||||
make_certificate().wait(); // preload certificate
|
||||
}
|
||||
|
||||
void Init::Cleanup() {
|
||||
Global.reset();
|
||||
CleanupCertificateCache();
|
||||
}
|
||||
|
||||
Init::Init() {
|
||||
#ifdef _WIN32
|
||||
WSADATA wsaData;
|
||||
if (WSAStartup(MAKEWORD(2, 2), &wsaData))
|
||||
throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
|
||||
#endif
|
||||
|
||||
ThreadPool::Instance().spawn(THREADPOOL_SIZE);
|
||||
|
||||
#if USE_GNUTLS
|
||||
// Nothing to do
|
||||
#else
|
||||
@ -88,7 +69,12 @@ Init::Init() {
|
||||
#endif
|
||||
}
|
||||
|
||||
Init::~Init() {
|
||||
void doCleanup() {
|
||||
PLOG_DEBUG << "Global cleanup";
|
||||
|
||||
ThreadPool::Instance().join();
|
||||
CleanupCertificateCache();
|
||||
|
||||
SctpTransport::Cleanup();
|
||||
DtlsTransport::Cleanup();
|
||||
#if RTC_ENABLE_WEBSOCKET
|
||||
@ -103,5 +89,57 @@ Init::~Init() {
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::weak_ptr<void> Init::Weak;
|
||||
std::shared_ptr<void> *Init::Global = nullptr;
|
||||
bool Init::Initialized = false;
|
||||
std::recursive_mutex Init::Mutex;
|
||||
|
||||
init_token Init::Token() {
|
||||
std::unique_lock lock(Mutex);
|
||||
if (auto token = Weak.lock())
|
||||
return token;
|
||||
|
||||
delete Global;
|
||||
Global = new shared_ptr<void>(new Init());
|
||||
Weak = *Global;
|
||||
return *Global;
|
||||
}
|
||||
|
||||
void Init::Preload() {
|
||||
std::unique_lock lock(Mutex);
|
||||
auto token = Token();
|
||||
if (!Global)
|
||||
Global = new shared_ptr<void>(token);
|
||||
|
||||
PLOG_DEBUG << "Preloading certificate";
|
||||
make_certificate().wait();
|
||||
}
|
||||
|
||||
void Init::Cleanup() {
|
||||
std::unique_lock lock(Mutex);
|
||||
delete Global;
|
||||
Global = nullptr;
|
||||
}
|
||||
|
||||
Init::Init() {
|
||||
// Mutex is locked by Token() here
|
||||
if (!std::exchange(Initialized, true))
|
||||
doInit();
|
||||
}
|
||||
|
||||
Init::~Init() {
|
||||
std::thread t([]() {
|
||||
// We need to lock Mutex ourselves
|
||||
std::unique_lock lock(Mutex);
|
||||
if (Global)
|
||||
return;
|
||||
if (std::exchange(Initialized, false))
|
||||
doCleanup();
|
||||
});
|
||||
t.detach();
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
|
@ -18,9 +18,12 @@
|
||||
|
||||
#include "peerconnection.hpp"
|
||||
#include "certificate.hpp"
|
||||
#include "include.hpp"
|
||||
#include "processor.hpp"
|
||||
#include "threadpool.hpp"
|
||||
|
||||
#include "dtlstransport.hpp"
|
||||
#include "icetransport.hpp"
|
||||
#include "include.hpp"
|
||||
#include "sctptransport.hpp"
|
||||
|
||||
#if RTC_ENABLE_MEDIA
|
||||
@ -39,8 +42,8 @@ using std::weak_ptr;
|
||||
PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
|
||||
|
||||
PeerConnection::PeerConnection(const Configuration &config)
|
||||
: mConfig(config), mCertificate(make_certificate()), mState(State::New),
|
||||
mGatheringState(GatheringState::New) {
|
||||
: mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
|
||||
mState(State::New), mGatheringState(GatheringState::New) {
|
||||
PLOG_VERBOSE << "Creating PeerConnection";
|
||||
|
||||
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
||||
@ -145,6 +148,7 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
|
||||
iceTransport->addRemoteCandidate(candidate);
|
||||
} else {
|
||||
// OK, we might need a lookup, do it asynchronously
|
||||
// We don't use the thread pool because we have no control on the timeout
|
||||
weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
||||
std::thread t([weakIceTransport, candidate]() mutable {
|
||||
if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
||||
@ -227,7 +231,7 @@ void PeerConnection::sendMedia(const binary &packet) {
|
||||
outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary));
|
||||
}
|
||||
|
||||
void PeerConnection::send(const byte *packet, size_t size) {
|
||||
void PeerConnection::sendMedia(const byte *packet, size_t size) {
|
||||
outgoingMedia(make_message(packet, packet + size, Message::Binary));
|
||||
}
|
||||
|
||||
@ -244,7 +248,7 @@ void PeerConnection::outgoingMedia(message_ptr message) {
|
||||
if (!transport)
|
||||
throw std::runtime_error("PeerConnection is not open");
|
||||
|
||||
std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->send(message);
|
||||
std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->sendMedia(message);
|
||||
#else
|
||||
PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
|
||||
#endif
|
||||
@ -445,8 +449,7 @@ void PeerConnection::closeTransports() {
|
||||
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
|
||||
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
|
||||
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
|
||||
if (sctp || dtls || ice) {
|
||||
std::thread t([sctp, dtls, ice, token = mInitToken]() mutable {
|
||||
ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
|
||||
if (sctp)
|
||||
sctp->stop();
|
||||
if (dtls)
|
||||
@ -458,8 +461,6 @@ void PeerConnection::closeTransports() {
|
||||
dtls.reset();
|
||||
ice.reset();
|
||||
});
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
void PeerConnection::endLocalCandidates() {
|
||||
@ -502,7 +503,7 @@ void PeerConnection::forwardMessage(message_ptr message) {
|
||||
mDataChannels.insert(std::make_pair(message->stream, channel));
|
||||
} else {
|
||||
// Invalid, close the DataChannel
|
||||
sctpTransport->close(message->stream);
|
||||
sctpTransport->closeStream(message->stream);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -594,20 +595,26 @@ void PeerConnection::remoteCloseDataChannels() {
|
||||
|
||||
void PeerConnection::processLocalDescription(Description description) {
|
||||
std::optional<uint16_t> remoteSctpPort;
|
||||
if (auto remote = remoteDescription())
|
||||
std::optional<string> remoteDataMid;
|
||||
if (auto remote = remoteDescription()) {
|
||||
remoteDataMid = remote->dataMid();
|
||||
remoteSctpPort = remote->sctpPort();
|
||||
}
|
||||
|
||||
auto certificate = mCertificate.get(); // wait for certificate if not ready
|
||||
|
||||
{
|
||||
std::lock_guard lock(mLocalDescriptionMutex);
|
||||
mLocalDescription.emplace(std::move(description));
|
||||
if (remoteDataMid)
|
||||
mLocalDescription->setDataMid(*remoteDataMid);
|
||||
|
||||
mLocalDescription->setFingerprint(certificate->fingerprint());
|
||||
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
|
||||
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
||||
}
|
||||
|
||||
mLocalDescriptionCallback(*mLocalDescription);
|
||||
mProcessor->enqueue([this]() { mLocalDescriptionCallback(*mLocalDescription); });
|
||||
}
|
||||
|
||||
void PeerConnection::processLocalCandidate(Candidate candidate) {
|
||||
@ -617,7 +624,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
|
||||
|
||||
mLocalDescription->addCandidate(candidate);
|
||||
|
||||
mLocalCandidateCallback(candidate);
|
||||
mProcessor->enqueue(
|
||||
[this, candidate = std::move(candidate)]() { mLocalCandidateCallback(candidate); });
|
||||
}
|
||||
|
||||
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
||||
@ -625,7 +633,8 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
||||
if (!dataChannel)
|
||||
return;
|
||||
|
||||
mDataChannelCallback(dataChannel);
|
||||
mProcessor->enqueue(
|
||||
[this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
|
||||
}
|
||||
|
||||
bool PeerConnection::changeState(State state) {
|
||||
@ -639,13 +648,13 @@ bool PeerConnection::changeState(State state) {
|
||||
|
||||
} while (!mState.compare_exchange_weak(current, state));
|
||||
|
||||
mStateChangeCallback(state);
|
||||
mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PeerConnection::changeGatheringState(GatheringState state) {
|
||||
if (mGatheringState.exchange(state) != state)
|
||||
mGatheringStateChangeCallback(state);
|
||||
mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
|
||||
return true;
|
||||
}
|
||||
|
||||
|
44
src/processor.cpp
Normal file
44
src/processor.cpp
Normal file
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#include "processor.hpp"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
Processor::~Processor() { join(); }
|
||||
|
||||
void Processor::join() {
|
||||
std::unique_lock lock(mMutex);
|
||||
mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
|
||||
}
|
||||
|
||||
void Processor::schedule() {
|
||||
std::unique_lock lock(mMutex);
|
||||
if (mTasks.empty()) {
|
||||
// No more tasks
|
||||
mPending = false;
|
||||
mCondition.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
ThreadPool::Instance().enqueue(std::move(mTasks.front()));
|
||||
mTasks.pop();
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
92
src/processor.hpp
Normal file
92
src/processor.hpp
Normal file
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#ifndef RTC_PROCESSOR_H
|
||||
#define RTC_PROCESSOR_H
|
||||
|
||||
#include "include.hpp"
|
||||
#include "init.hpp"
|
||||
#include "threadpool.hpp"
|
||||
|
||||
#include <condition_variable>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
namespace rtc {
|
||||
|
||||
// Processed tasks in order by delegating them to the thread pool
|
||||
class Processor final {
|
||||
public:
|
||||
Processor() = default;
|
||||
~Processor();
|
||||
|
||||
Processor(const Processor &) = delete;
|
||||
Processor &operator=(const Processor &) = delete;
|
||||
Processor(Processor &&) = delete;
|
||||
Processor &operator=(Processor &&) = delete;
|
||||
|
||||
void join();
|
||||
|
||||
template <class F, class... Args>
|
||||
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
|
||||
|
||||
protected:
|
||||
void schedule();
|
||||
|
||||
// Keep an init token
|
||||
const init_token mInitToken = Init::Token();
|
||||
|
||||
std::queue<std::function<void()>> mTasks;
|
||||
bool mPending = false; // true iff a task is pending in the thread pool
|
||||
|
||||
mutable std::mutex mMutex;
|
||||
std::condition_variable mCondition;
|
||||
};
|
||||
|
||||
template <class F, class... Args>
|
||||
auto Processor::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
|
||||
std::unique_lock lock(mMutex);
|
||||
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
||||
auto task = std::make_shared<std::packaged_task<R()>>(
|
||||
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
|
||||
std::future<R> result = task->get_future();
|
||||
|
||||
auto bundle = [this, task = std::move(task)]() {
|
||||
try {
|
||||
(*task)();
|
||||
} catch (const std::exception &e) {
|
||||
PLOG_WARNING << "Unhandled exception in task: " << e.what();
|
||||
}
|
||||
schedule(); // chain the next task
|
||||
};
|
||||
|
||||
if (!mPending) {
|
||||
ThreadPool::Instance().enqueue(std::move(bundle));
|
||||
mPending = true;
|
||||
} else {
|
||||
mTasks.emplace(std::move(bundle));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
#endif
|
66
src/rtc.cpp
66
src/rtc.cpp
@ -55,18 +55,15 @@ std::unordered_map<int, void *> userPointerMap;
|
||||
std::mutex mutex;
|
||||
int lastId = 0;
|
||||
|
||||
void *getUserPointer(int id) {
|
||||
std::optional<void *> getUserPointer(int id) {
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = userPointerMap.find(id);
|
||||
return it != userPointerMap.end() ? it->second : nullptr;
|
||||
return it != userPointerMap.end() ? std::make_optional(it->second) : nullopt;
|
||||
}
|
||||
|
||||
void setUserPointer(int i, void *ptr) {
|
||||
std::lock_guard lock(mutex);
|
||||
if (ptr)
|
||||
userPointerMap[i] = ptr;
|
||||
else
|
||||
userPointerMap.erase(i);
|
||||
}
|
||||
|
||||
shared_ptr<PeerConnection> getPeerConnection(int id) {
|
||||
@ -89,6 +86,7 @@ int emplacePeerConnection(shared_ptr<PeerConnection> ptr) {
|
||||
std::lock_guard lock(mutex);
|
||||
int pc = ++lastId;
|
||||
peerConnectionMap.emplace(std::make_pair(pc, ptr));
|
||||
userPointerMap.emplace(std::make_pair(pc, nullptr));
|
||||
return pc;
|
||||
}
|
||||
|
||||
@ -96,6 +94,7 @@ int emplaceDataChannel(shared_ptr<DataChannel> ptr) {
|
||||
std::lock_guard lock(mutex);
|
||||
int dc = ++lastId;
|
||||
dataChannelMap.emplace(std::make_pair(dc, ptr));
|
||||
userPointerMap.emplace(std::make_pair(dc, nullptr));
|
||||
return dc;
|
||||
}
|
||||
|
||||
@ -126,6 +125,7 @@ int emplaceWebSocket(shared_ptr<WebSocket> ptr) {
|
||||
std::lock_guard lock(mutex);
|
||||
int ws = ++lastId;
|
||||
webSocketMap.emplace(std::make_pair(ws, ptr));
|
||||
userPointerMap.emplace(std::make_pair(ws, nullptr));
|
||||
return ws;
|
||||
}
|
||||
|
||||
@ -244,7 +244,8 @@ int rtcCreateDataChannel(int pc, const char *label) {
|
||||
return WRAP({
|
||||
auto peerConnection = getPeerConnection(pc);
|
||||
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
|
||||
rtcSetUserPointer(dc, getUserPointer(pc));
|
||||
if (auto ptr = getUserPointer(pc))
|
||||
rtcSetUserPointer(dc, *ptr);
|
||||
return dc;
|
||||
});
|
||||
}
|
||||
@ -293,9 +294,10 @@ int rtcSetDataChannelCallback(int pc, rtcDataChannelCallbackFunc cb) {
|
||||
if (cb)
|
||||
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
|
||||
int dc = emplaceDataChannel(dataChannel);
|
||||
void *ptr = getUserPointer(pc);
|
||||
rtcSetUserPointer(dc, ptr);
|
||||
cb(dc, ptr);
|
||||
if (auto ptr = getUserPointer(pc)) {
|
||||
rtcSetUserPointer(dc, *ptr);
|
||||
cb(dc, *ptr);
|
||||
}
|
||||
});
|
||||
else
|
||||
peerConnection->onDataChannel(nullptr);
|
||||
@ -307,7 +309,8 @@ int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
|
||||
auto peerConnection = getPeerConnection(pc);
|
||||
if (cb)
|
||||
peerConnection->onLocalDescription([pc, cb](const Description &desc) {
|
||||
cb(string(desc).c_str(), desc.typeString().c_str(), getUserPointer(pc));
|
||||
if (auto ptr = getUserPointer(pc))
|
||||
cb(string(desc).c_str(), desc.typeString().c_str(), *ptr);
|
||||
});
|
||||
else
|
||||
peerConnection->onLocalDescription(nullptr);
|
||||
@ -319,7 +322,8 @@ int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
|
||||
auto peerConnection = getPeerConnection(pc);
|
||||
if (cb)
|
||||
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
|
||||
cb(cand.candidate().c_str(), cand.mid().c_str(), getUserPointer(pc));
|
||||
if (auto ptr = getUserPointer(pc))
|
||||
cb(cand.candidate().c_str(), cand.mid().c_str(), *ptr);
|
||||
});
|
||||
else
|
||||
peerConnection->onLocalCandidate(nullptr);
|
||||
@ -331,7 +335,8 @@ int rtcSetStateChangeCallback(int pc, rtcStateChangeCallbackFunc cb) {
|
||||
auto peerConnection = getPeerConnection(pc);
|
||||
if (cb)
|
||||
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
|
||||
cb(static_cast<rtcState>(state), getUserPointer(pc));
|
||||
if (auto ptr = getUserPointer(pc))
|
||||
cb(static_cast<rtcState>(state), *ptr);
|
||||
});
|
||||
else
|
||||
peerConnection->onStateChange(nullptr);
|
||||
@ -343,7 +348,8 @@ int rtcSetGatheringStateChangeCallback(int pc, rtcGatheringStateCallbackFunc cb)
|
||||
auto peerConnection = getPeerConnection(pc);
|
||||
if (cb)
|
||||
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
|
||||
cb(static_cast<rtcGatheringState>(state), getUserPointer(pc));
|
||||
if (auto ptr = getUserPointer(pc))
|
||||
cb(static_cast<rtcGatheringState>(state), *ptr);
|
||||
});
|
||||
else
|
||||
peerConnection->onGatheringStateChange(nullptr);
|
||||
@ -435,7 +441,10 @@ int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
|
||||
return WRAP({
|
||||
auto channel = getChannel(id);
|
||||
if (cb)
|
||||
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
|
||||
channel->onOpen([id, cb]() {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(*ptr);
|
||||
});
|
||||
else
|
||||
channel->onOpen(nullptr);
|
||||
});
|
||||
@ -445,7 +454,10 @@ int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb) {
|
||||
return WRAP({
|
||||
auto channel = getChannel(id);
|
||||
if (cb)
|
||||
channel->onClosed([id, cb]() { cb(getUserPointer(id)); });
|
||||
channel->onClosed([id, cb]() {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(*ptr);
|
||||
});
|
||||
else
|
||||
channel->onClosed(nullptr);
|
||||
});
|
||||
@ -455,8 +467,10 @@ int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
|
||||
return WRAP({
|
||||
auto channel = getChannel(id);
|
||||
if (cb)
|
||||
channel->onError(
|
||||
[id, cb](const string &error) { cb(error.c_str(), getUserPointer(id)); });
|
||||
channel->onError([id, cb](const string &error) {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(error.c_str(), *ptr);
|
||||
});
|
||||
else
|
||||
channel->onError(nullptr);
|
||||
});
|
||||
@ -468,9 +482,13 @@ int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
|
||||
if (cb)
|
||||
channel->onMessage(
|
||||
[id, cb](const binary &b) {
|
||||
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), getUserPointer(id));
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), *ptr);
|
||||
},
|
||||
[id, cb](const string &s) { cb(s.c_str(), -1, getUserPointer(id)); });
|
||||
[id, cb](const string &s) {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(s.c_str(), -int(s.size() + 1), *ptr);
|
||||
});
|
||||
else
|
||||
channel->onMessage(nullptr);
|
||||
});
|
||||
@ -514,7 +532,10 @@ int rtcSetBufferedAmountLowCallback(int id, rtcBufferedAmountLowCallbackFunc cb)
|
||||
return WRAP({
|
||||
auto channel = getChannel(id);
|
||||
if (cb)
|
||||
channel->onBufferedAmountLow([id, cb]() { cb(getUserPointer(id)); });
|
||||
channel->onBufferedAmountLow([id, cb]() {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(*ptr);
|
||||
});
|
||||
else
|
||||
channel->onBufferedAmountLow(nullptr);
|
||||
});
|
||||
@ -528,7 +549,10 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb) {
|
||||
return WRAP({
|
||||
auto channel = getChannel(id);
|
||||
if (cb)
|
||||
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
|
||||
channel->onOpen([id, cb]() {
|
||||
if (auto ptr = getUserPointer(id))
|
||||
cb(*ptr);
|
||||
});
|
||||
else
|
||||
channel->onOpen(nullptr);
|
||||
});
|
||||
|
@ -50,6 +50,9 @@ using std::shared_ptr;
|
||||
|
||||
namespace rtc {
|
||||
|
||||
std::unordered_set<SctpTransport *> SctpTransport::Instances;
|
||||
std::shared_mutex SctpTransport::InstancesMutex;
|
||||
|
||||
void SctpTransport::Init() {
|
||||
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
|
||||
usrsctp_sysctl_set_sctp_ecn_enable(0);
|
||||
@ -92,6 +95,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
||||
PLOG_DEBUG << "Initializing SCTP transport";
|
||||
|
||||
usrsctp_register_address(this);
|
||||
{
|
||||
std::unique_lock lock(InstancesMutex);
|
||||
Instances.insert(this);
|
||||
}
|
||||
|
||||
mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
|
||||
&SctpTransport::SendCallback, 0, this);
|
||||
if (!mSock)
|
||||
@ -186,14 +194,21 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
||||
|
||||
SctpTransport::~SctpTransport() {
|
||||
stop();
|
||||
|
||||
if (mSock)
|
||||
usrsctp_close(mSock);
|
||||
close();
|
||||
|
||||
usrsctp_deregister_address(this);
|
||||
{
|
||||
std::unique_lock lock(InstancesMutex);
|
||||
Instances.erase(this);
|
||||
}
|
||||
}
|
||||
|
||||
bool SctpTransport::stop() {
|
||||
// Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
|
||||
// sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
|
||||
mWrittenOnce = true;
|
||||
mWrittenCondition.notify_all();
|
||||
|
||||
if (!Transport::stop())
|
||||
return false;
|
||||
|
||||
@ -204,6 +219,13 @@ bool SctpTransport::stop() {
|
||||
return true;
|
||||
}
|
||||
|
||||
void SctpTransport::close() {
|
||||
if (mSock) {
|
||||
usrsctp_close(mSock);
|
||||
mSock = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::connect() {
|
||||
if (!mSock)
|
||||
return;
|
||||
@ -240,9 +262,7 @@ void SctpTransport::shutdown() {
|
||||
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
|
||||
}
|
||||
|
||||
// close() abort the connection when linger is disabled, call it now
|
||||
usrsctp_close(mSock);
|
||||
mSock = nullptr;
|
||||
close();
|
||||
|
||||
PLOG_INFO << "SCTP disconnected";
|
||||
changeState(State::Disconnected);
|
||||
@ -266,7 +286,7 @@ bool SctpTransport::send(message_ptr message) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void SctpTransport::close(unsigned int stream) {
|
||||
void SctpTransport::closeStream(unsigned int stream) {
|
||||
send(make_message(0, Message::Reset, uint16_t(stream)));
|
||||
}
|
||||
|
||||
@ -405,7 +425,7 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
|
||||
try {
|
||||
mBufferedAmountCallback(streamId, amount);
|
||||
} catch (const std::exception &e) {
|
||||
PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
|
||||
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
|
||||
}
|
||||
mSendMutex.lock();
|
||||
}
|
||||
@ -502,9 +522,9 @@ int SctpTransport::handleSend(size_t free) {
|
||||
|
||||
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
|
||||
try {
|
||||
std::unique_lock lock(mWriteMutex);
|
||||
PLOG_VERBOSE << "Handle write, len=" << len;
|
||||
|
||||
std::unique_lock lock(mWriteMutex);
|
||||
if (!outgoing(make_message(data, data + len)))
|
||||
return -1;
|
||||
|
||||
@ -628,7 +648,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
|
||||
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
|
||||
for (int i = 0; i < count; ++i) {
|
||||
uint16_t streamId = reset_event.strreset_stream_list[i];
|
||||
close(streamId);
|
||||
closeStream(streamId);
|
||||
}
|
||||
}
|
||||
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
||||
@ -686,7 +706,15 @@ int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
|
||||
|
||||
auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
|
||||
void *ptr = sconn->sconn_addr;
|
||||
return static_cast<SctpTransport *>(ptr)->handleSend(size_t(sb_free));
|
||||
auto *transport = static_cast<SctpTransport *>(ptr);
|
||||
|
||||
// Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
|
||||
// https://github.com/sctplab/usrsctp/issues/405
|
||||
std::shared_lock lock(InstancesMutex);
|
||||
if (Instances.find(transport) == Instances.end())
|
||||
return -1;
|
||||
|
||||
return transport->handleSend(size_t(sb_free));
|
||||
}
|
||||
|
||||
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
|
||||
|
@ -28,6 +28,8 @@
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <unordered_set>
|
||||
|
||||
#include "usrsctp.h"
|
||||
|
||||
@ -46,7 +48,7 @@ public:
|
||||
|
||||
bool stop() override;
|
||||
bool send(message_ptr message) override; // false if buffered
|
||||
void close(unsigned int stream);
|
||||
void closeStream(unsigned int stream);
|
||||
void flush();
|
||||
|
||||
// Stats
|
||||
@ -70,6 +72,7 @@ private:
|
||||
|
||||
void connect();
|
||||
void shutdown();
|
||||
void close();
|
||||
void incoming(message_ptr message) override;
|
||||
|
||||
bool trySendQueue();
|
||||
@ -109,6 +112,9 @@ private:
|
||||
struct sctp_rcvinfo recv_info, int flags, void *user_data);
|
||||
static int SendCallback(struct socket *sock, uint32_t sb_free);
|
||||
static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
|
||||
|
||||
static std::unordered_set<SctpTransport *> Instances;
|
||||
static std::shared_mutex InstancesMutex;
|
||||
};
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -38,8 +38,8 @@ SelectInterrupter::SelectInterrupter() {
|
||||
throw std::runtime_error("Failed to create pipe");
|
||||
::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
|
||||
::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
|
||||
mPipeOut = pipefd[0]; // read
|
||||
mPipeIn = pipefd[1]; // write
|
||||
mPipeOut = pipefd[1]; // read
|
||||
mPipeIn = pipefd[0]; // write
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -62,11 +62,8 @@ int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
|
||||
FD_SET(mDummySock, &readfds);
|
||||
return SOCKET_TO_INT(mDummySock) + 1;
|
||||
#else
|
||||
int ret;
|
||||
do {
|
||||
char dummy;
|
||||
ret = ::read(mPipeIn, &dummy, 1);
|
||||
} while (ret > 0);
|
||||
::read(mPipeIn, &dummy, 1);
|
||||
FD_SET(mPipeIn, &readfds);
|
||||
return mPipeIn + 1;
|
||||
#endif
|
||||
@ -107,6 +104,7 @@ bool TcpTransport::stop() {
|
||||
}
|
||||
|
||||
bool TcpTransport::send(message_ptr message) {
|
||||
std::unique_lock lock(mSockMutex);
|
||||
if (state() != State::Connected)
|
||||
return false;
|
||||
|
||||
@ -126,6 +124,7 @@ void TcpTransport::incoming(message_ptr message) {
|
||||
}
|
||||
|
||||
bool TcpTransport::outgoing(message_ptr message) {
|
||||
// mSockMutex must be locked
|
||||
// If nothing is pending, try to send directly
|
||||
// It's safe because if the queue is empty, the thread is not sending
|
||||
if (mSendQueue.empty() && trySendMessage(message))
|
||||
@ -174,6 +173,7 @@ void TcpTransport::connect(const string &hostname, const string &service) {
|
||||
}
|
||||
|
||||
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
||||
std::unique_lock lock(mSockMutex);
|
||||
try {
|
||||
char node[MAX_NUMERICNODE_LEN];
|
||||
char serv[MAX_NUMERICSERV_LEN];
|
||||
@ -248,15 +248,18 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
||||
}
|
||||
|
||||
void TcpTransport::close() {
|
||||
std::unique_lock lock(mSockMutex);
|
||||
if (mSock != INVALID_SOCKET) {
|
||||
PLOG_DEBUG << "Closing TCP socket";
|
||||
::closesocket(mSock);
|
||||
mSock = INVALID_SOCKET;
|
||||
}
|
||||
changeState(State::Disconnected);
|
||||
interruptSelect();
|
||||
}
|
||||
|
||||
bool TcpTransport::trySendQueue() {
|
||||
// mSockMutex must be locked
|
||||
while (auto next = mSendQueue.peek()) {
|
||||
auto message = *next;
|
||||
if (!trySendMessage(message)) {
|
||||
@ -269,6 +272,7 @@ bool TcpTransport::trySendQueue() {
|
||||
}
|
||||
|
||||
bool TcpTransport::trySendMessage(message_ptr &message) {
|
||||
// mSockMutex must be locked
|
||||
auto data = reinterpret_cast<const char *>(message->data());
|
||||
auto size = message->size();
|
||||
while (size) {
|
||||
@ -314,13 +318,22 @@ void TcpTransport::runLoop() {
|
||||
changeState(State::Connected);
|
||||
|
||||
while (true) {
|
||||
std::unique_lock lock(mSockMutex);
|
||||
if (mSock == INVALID_SOCKET)
|
||||
break;
|
||||
|
||||
fd_set readfds, writefds;
|
||||
int n = prepareSelect(readfds, writefds);
|
||||
|
||||
struct timeval tv;
|
||||
tv.tv_sec = 10;
|
||||
tv.tv_usec = 0;
|
||||
lock.unlock();
|
||||
int ret = ::select(n, &readfds, &writefds, NULL, &tv);
|
||||
lock.lock();
|
||||
if (mSock == INVALID_SOCKET)
|
||||
break;
|
||||
|
||||
if (ret < 0) {
|
||||
throw std::runtime_error("Failed to wait on socket");
|
||||
} else if (ret == 0) {
|
||||
|
@ -78,6 +78,7 @@ private:
|
||||
string mHostname, mService;
|
||||
|
||||
socket_t mSock = INVALID_SOCKET;
|
||||
std::mutex mSockMutex;
|
||||
std::thread mThread;
|
||||
SelectInterrupter mInterrupter;
|
||||
Queue<message_ptr> mSendQueue;
|
||||
|
81
src/threadpool.cpp
Normal file
81
src/threadpool.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
/**
|
||||
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#include "threadpool.hpp"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
ThreadPool &ThreadPool::Instance() {
|
||||
static ThreadPool instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool() { join(); }
|
||||
|
||||
int ThreadPool::count() const {
|
||||
std::unique_lock lock(mWorkersMutex);
|
||||
return mWorkers.size();
|
||||
}
|
||||
|
||||
void ThreadPool::spawn(int count) {
|
||||
std::unique_lock lock(mWorkersMutex);
|
||||
mJoining = false;
|
||||
while (count-- > 0)
|
||||
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
|
||||
}
|
||||
|
||||
void ThreadPool::join() {
|
||||
std::unique_lock lock(mWorkersMutex);
|
||||
mJoining = true;
|
||||
mCondition.notify_all();
|
||||
|
||||
for (auto &w : mWorkers)
|
||||
w.join();
|
||||
|
||||
mWorkers.clear();
|
||||
}
|
||||
|
||||
void ThreadPool::run() {
|
||||
while (runOne()) {
|
||||
}
|
||||
}
|
||||
|
||||
bool ThreadPool::runOne() {
|
||||
if (auto task = dequeue()) {
|
||||
try {
|
||||
task();
|
||||
} catch (const std::exception &e) {
|
||||
PLOG_WARNING << "Unhandled exception in task: " << e.what();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::function<void()> ThreadPool::dequeue() {
|
||||
std::unique_lock lock(mMutex);
|
||||
mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
|
||||
if (mTasks.empty())
|
||||
return nullptr;
|
||||
auto task = std::move(mTasks.front());
|
||||
mTasks.pop();
|
||||
return task;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
87
src/threadpool.hpp
Normal file
87
src/threadpool.hpp
Normal file
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#ifndef RTC_THREADPOOL_H
|
||||
#define RTC_THREADPOOL_H
|
||||
|
||||
#include "include.hpp"
|
||||
#include "init.hpp"
|
||||
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace rtc {
|
||||
|
||||
template <class F, class... Args>
|
||||
using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>;
|
||||
|
||||
class ThreadPool final {
|
||||
public:
|
||||
static ThreadPool &Instance();
|
||||
|
||||
ThreadPool(const ThreadPool &) = delete;
|
||||
ThreadPool &operator=(const ThreadPool &) = delete;
|
||||
ThreadPool(ThreadPool &&) = delete;
|
||||
ThreadPool &operator=(ThreadPool &&) = delete;
|
||||
|
||||
int count() const;
|
||||
void spawn(int count = 1);
|
||||
void join();
|
||||
void run();
|
||||
bool runOne();
|
||||
|
||||
template <class F, class... Args>
|
||||
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
|
||||
|
||||
protected:
|
||||
ThreadPool() = default;
|
||||
~ThreadPool();
|
||||
|
||||
std::function<void()> dequeue(); // returns null function if joining
|
||||
|
||||
std::vector<std::thread> mWorkers;
|
||||
std::queue<std::function<void()>> mTasks;
|
||||
std::atomic<bool> mJoining = false;
|
||||
|
||||
mutable std::mutex mMutex, mWorkersMutex;
|
||||
std::condition_variable mCondition;
|
||||
};
|
||||
|
||||
template <class F, class... Args>
|
||||
auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
|
||||
std::unique_lock lock(mMutex);
|
||||
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
||||
auto task = std::make_shared<std::packaged_task<R()>>(
|
||||
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
|
||||
std::future<R> result = task->get_future();
|
||||
|
||||
mTasks.emplace([task = std::move(task), token = Init::Token()]() { return (*task)(); });
|
||||
mCondition.notify_one();
|
||||
return result;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
#endif
|
@ -86,9 +86,8 @@ void init() {
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
if (!done) {
|
||||
OPENSSL_init_ssl(0, NULL);
|
||||
SSL_load_error_strings();
|
||||
ERR_load_crypto_strings();
|
||||
OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
|
||||
OPENSSL_init_crypto(OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +60,8 @@ gnutls_datum_t make_datum(char *data, size_t size);
|
||||
#include <openssl/err.h>
|
||||
#include <openssl/pem.h>
|
||||
#include <openssl/x509.h>
|
||||
#include <openssl/rsa.h>
|
||||
#include <openssl/bn.h>
|
||||
|
||||
#ifndef BIO_EOF
|
||||
#define BIO_EOF -1
|
||||
|
@ -41,12 +41,17 @@ public:
|
||||
|
||||
virtual ~Transport() {
|
||||
stop();
|
||||
if (mLower)
|
||||
mLower->onRecv(nullptr); // doing it on stop could cause a deadlock
|
||||
}
|
||||
|
||||
virtual bool stop() {
|
||||
return !mShutdown.exchange(true);
|
||||
if (mShutdown.exchange(true))
|
||||
return false;
|
||||
|
||||
// We don't want incoming() to be called by the lower layer anymore
|
||||
if (mLower)
|
||||
mLower->onRecv(nullptr);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void registerIncoming() {
|
||||
|
@ -18,8 +18,9 @@
|
||||
|
||||
#if RTC_ENABLE_WEBSOCKET
|
||||
|
||||
#include "include.hpp"
|
||||
#include "websocket.hpp"
|
||||
#include "include.hpp"
|
||||
#include "threadpool.hpp"
|
||||
|
||||
#include "tcptransport.hpp"
|
||||
#include "tlstransport.hpp"
|
||||
@ -301,8 +302,7 @@ void WebSocket::closeTransports() {
|
||||
auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
|
||||
auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
|
||||
auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));
|
||||
if (ws || tls || tcp) {
|
||||
std::thread t([ws, tls, tcp, token = mInitToken]() mutable {
|
||||
ThreadPool::Instance().enqueue([ws, tls, tcp]() mutable {
|
||||
if (ws)
|
||||
ws->stop();
|
||||
if (tls)
|
||||
@ -314,8 +314,6 @@ void WebSocket::closeTransports() {
|
||||
tls.reset();
|
||||
tcp.reset();
|
||||
});
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -200,7 +200,7 @@ int test_capi_main() {
|
||||
|
||||
// You may call rtcCleanup() when finished to free static resources
|
||||
rtcCleanup();
|
||||
sleep(1);
|
||||
sleep(2);
|
||||
|
||||
printf("Success\n");
|
||||
return 0;
|
||||
|
@ -150,7 +150,7 @@ void test_connectivity() {
|
||||
|
||||
// You may call rtc::Cleanup() when finished to free static resources
|
||||
rtc::Cleanup();
|
||||
this_thread::sleep_for(1s);
|
||||
this_thread::sleep_for(2s);
|
||||
|
||||
cout << "Success" << endl;
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono_literals;
|
||||
@ -71,7 +72,10 @@ int main(int argc, char **argv) {
|
||||
cout << "*** Finished WebRTC benchmark" << endl;
|
||||
} catch (const exception &e) {
|
||||
cerr << "WebRTC benchmark failed: " << e.what() << endl;
|
||||
std::this_thread::sleep_for(2s);
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
return 0;
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ void test_websocket() {
|
||||
throw runtime_error("Expected message not received");
|
||||
|
||||
ws->close();
|
||||
this_thread::sleep_for(1s);
|
||||
this_thread::sleep_for(2s);
|
||||
|
||||
// You may call rtc::Cleanup() when finished to free static resources
|
||||
rtc::Cleanup();
|
||||
|
Reference in New Issue
Block a user