Compare commits

...

63 Commits

Author SHA1 Message Date
c93f44f132 Bumped version to 0.6.5 2020-07-10 19:42:54 +02:00
517b69043f Merge pull request #120 from paullouisageneau/workaround-usrsctp-send-after-close
Better workaround for usrsctp send after closing/unregistering
2020-07-10 18:24:31 +02:00
b04db3a744 Better workaround for usrsctp send after closing/unregistering 2020-07-10 18:15:19 +02:00
36090a24e4 Updated usrsctp 2020-07-10 17:30:16 +02:00
6b75b3e227 Re-added usrsctp_deregister_address call mistakenly removed in #116 2020-07-10 17:05:41 +02:00
afed83f5f0 Merge pull request #116 from paullouisageneau/workaround-usrsctp-send-after-close
Workaround for usrsctp send after close
2020-07-10 14:33:16 +02:00
75c42592bf Unregister usrsctp send callback before closing 2020-07-10 14:13:39 +02:00
b35bfbeb0a Merge pull request #119 from paullouisageneau/fix-jamfile
Jamfile: Use different build directories for different builds
2020-07-10 14:10:03 +02:00
e481e896cb Added NO_EXAMPLES cmake flag 2020-07-08 18:38:25 +02:00
202467928a Changed build directory name to build-[crypto]-[variant] 2020-07-08 18:25:32 +02:00
71650ce163 Use different build directories for different builds 2020-07-08 17:58:26 +02:00
706a8b7160 Merge pull request #118 from paullouisageneau/threadpool
Enhance thread management
2020-07-08 17:57:50 +02:00
6f419a32ea Increased test timings for destruction 2020-07-08 17:42:23 +02:00
f5ff042d62 Check for re-init in destructor 2020-07-08 17:37:57 +02:00
822b2e6558 Simplified Init with a recursive mutex 2020-07-08 17:34:32 +02:00
5b251af1d7 Added initialized flag 2020-07-08 17:08:54 +02:00
5b56291b67 Fixed init lock on destruction 2020-07-08 15:58:20 +02:00
777f5a8dfe Revised deinitialization 2020-07-08 15:30:50 +02:00
971e6e8b91 Fixed init tokens handling 2020-07-08 11:39:53 +02:00
a3fb52c173 Fixed TCP transport thread interruption 2020-07-08 03:24:38 +02:00
db00253c18 Made Processor keep an init token to prevent early threadpool join 2020-07-08 02:46:19 +02:00
dd2967b0e1 Fixed Init critical section 2020-07-08 02:36:00 +02:00
cc4e215067 Install OpenSSL in Windows workflow 2020-07-08 01:52:34 +02:00
bbeed01eb0 Added comments 2020-07-08 01:52:26 +02:00
aecc2b8fda Added Processor and finished ThreadPool integration 2020-07-08 01:33:54 +02:00
d60e18d963 Made ThreadPool a singleton 2020-07-07 20:18:09 +02:00
e41019a1f0 Added ThreadPool 2020-07-07 19:59:18 +02:00
5825e44fc8 Fixed compilation warning 2020-07-06 12:25:03 +02:00
dc9a8114bc Merge pull request #114 from aldenml/fix-no-deprecated
fixed compilation when using openssl compiled with no-deprecated
2020-07-05 23:17:01 +02:00
3e827f9798 fixed compilation when using openssl compiled with no-deprecated 2020-07-04 18:57:50 -04:00
0a6b263bc3 Merge pull request #113 from paullouisageneau/fix-capi-userptr
Fix potential callback call with null user pointer in C API
2020-07-02 20:32:24 +02:00
e02c30027b Pass actual size of string in message callback 2020-07-02 20:09:03 +02:00
c4380ebcc4 Fixed potential null user pointer in callbacks 2020-07-02 20:08:54 +02:00
add0649335 Merge pull request #110 from paullouisageneau/keep-mline-order
Keep m-line order in description
2020-07-01 11:35:38 +02:00
cd28340de3 Keep m-line order in description 2020-07-01 11:24:03 +02:00
c675aedb83 Merge pull request #109 from paullouisageneau/fix-mid-handling
Fix mid handling in description
2020-07-01 00:17:41 +02:00
e32d139056 Moved data after non-data media in description 2020-07-01 00:13:12 +02:00
a790161168 Properly set remote data mid 2020-07-01 00:01:01 +02:00
2697ef0d76 Bumped version to 0.6.4 2020-06-29 21:09:02 +02:00
5044aedbec Updated libjuice to v0.4.4 2020-06-29 21:08:39 +02:00
44c90c1cb4 Set gnutls linking to shared in Jamfile 2020-06-29 21:05:34 +02:00
be79c68540 Merge pull request #108 from paullouisageneau/fix-srtp-transport
Fix optional SRTP transport
2020-06-29 21:03:44 +02:00
226a927df1 Implemented RTP and RTCP demultiplexing 2020-06-29 20:39:31 +02:00
4e1b9bb3c2 Fixed SRTP/DTLS demultiplexing 2020-06-29 09:48:42 +02:00
8bc016cc08 Added some logging 2020-06-29 09:48:42 +02:00
5afbe10d01 Changed to two SRTP sessions and introduced srtp_add_stream() 2020-06-29 09:48:42 +02:00
8df07ca68d Fixed serverSalt 2020-06-29 09:48:42 +02:00
884bd2316e Fixed error checking for OpenSSL 2020-06-29 09:48:42 +02:00
dadecce709 Fixed compilation for OpenSSL 2020-06-29 09:48:42 +02:00
103935bdd5 Introduced postCreation method to DTLS-SRTP 2020-06-29 09:48:42 +02:00
62e6954949 Use dedicated method to send media 2020-06-29 09:48:42 +02:00
3ac2d155cc Set DtlsSrtpTransport to bypass before handshake 2020-06-29 09:48:42 +02:00
6d8788c2a1 Updated libjuice 2020-06-29 09:47:53 +02:00
603dd01b87 Merge pull request #107 from Kyrio/fix-termux-include
Add the right header for IPPROTO_* on platforms such as Termux
2020-06-29 09:43:34 +02:00
8091508428 Add the right header for IPPROTO_* on some platforms 2020-06-29 01:08:34 +02:00
b38f63f077 Renamed PeerConnection::send() to sendMedia() 2020-06-26 18:02:07 +02:00
d87539937e Merge pull request #105 from paullouisageneau/fix-after-free-sctp-incoming
Fix use-after-free in SctpTransport::incoming()
2020-06-26 11:10:33 +02:00
eb09cadded Reordered SctpTransport::shutdown() and mLower->onRecv(nullptr) 2020-06-25 22:26:15 +02:00
79e0c62321 Merge pull request #103 from paullouisageneau/srtp-flag
Add USE_SRTP flag
2020-06-25 11:03:41 +02:00
ef38777129 Added USE_SRTP flag and renamed NO_WEBSOCKET flag 2020-06-25 10:59:12 +02:00
313f081061 Merge pull request #102 from paullouisageneau/fix-getaddrinfo-leak
Fix getaddrinfo() leak in Candidate::resolve()
2020-06-25 09:26:18 +02:00
6108b05e0d Fixed missing freeaddrinfo() on early exit 2020-06-25 09:20:05 +02:00
f68601b45f Updated libjuice 2020-06-24 15:30:15 +02:00
37 changed files with 843 additions and 279 deletions

View File

@ -43,6 +43,8 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: ilammy/msvc-dev-cmd@v1 - uses: ilammy/msvc-dev-cmd@v1
- name: install packages
run: choco install openssl
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake

View File

@ -1,12 +1,14 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
DESCRIPTION "WebRTC DataChannels Library" DESCRIPTION "WebRTC Data Channels Library"
VERSION 0.6.3 VERSION 0.6.5
LANGUAGES CXX) LANGUAGES CXX)
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF) option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
option(USE_JUICE "Use libjuice instead of libnice" OFF) option(USE_JUICE "Use libjuice instead of libnice" OFF)
option(RTC_ENABLE_WEBSOCKET "Build WebSocket support" ON) option(USE_SRTP "Enable SRTP for media support" OFF)
option(NO_WEBSOCKET "Disable WebSocket support" OFF)
option(NO_EXAMPLES "Disable examples" OFF)
if(USE_GNUTLS) if(USE_GNUTLS)
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON) option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
@ -42,6 +44,8 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
) )
set(LIBDATACHANNEL_WEBSOCKET_SOURCES set(LIBDATACHANNEL_WEBSOCKET_SOURCES
@ -82,7 +86,6 @@ set(TESTS_SOURCES
set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE) set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED) find_package(Threads REQUIRED)
find_package(SRTP)
set(CMAKE_POLICY_DEFAULT_CMP0048 NEW) set(CMAKE_POLICY_DEFAULT_CMP0048 NEW)
add_subdirectory(deps/plog) add_subdirectory(deps/plog)
@ -99,7 +102,14 @@ endif()
add_library(Usrsctp::Usrsctp ALIAS usrsctp) add_library(Usrsctp::Usrsctp ALIAS usrsctp)
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static) add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
if (RTC_ENABLE_WEBSOCKET) if (NO_WEBSOCKET)
add_library(datachannel SHARED
${LIBDATACHANNEL_SOURCES})
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
${LIBDATACHANNEL_SOURCES})
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
else()
add_library(datachannel SHARED add_library(datachannel SHARED
${LIBDATACHANNEL_SOURCES} ${LIBDATACHANNEL_SOURCES}
${LIBDATACHANNEL_WEBSOCKET_SOURCES}) ${LIBDATACHANNEL_WEBSOCKET_SOURCES})
@ -108,13 +118,6 @@ if (RTC_ENABLE_WEBSOCKET)
${LIBDATACHANNEL_WEBSOCKET_SOURCES}) ${LIBDATACHANNEL_WEBSOCKET_SOURCES})
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1) target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1)
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1) target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1)
else()
add_library(datachannel SHARED
${LIBDATACHANNEL_SOURCES})
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
${LIBDATACHANNEL_SOURCES})
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
endif() endif()
set_target_properties(datachannel PROPERTIES set_target_properties(datachannel PROPERTIES
@ -141,7 +144,8 @@ if(WIN32)
target_link_libraries(datachannel-static PRIVATE wsock32 ws2_32) # winsock2 target_link_libraries(datachannel-static PRIVATE wsock32 ws2_32) # winsock2
endif() endif()
if(SRTP_FOUND) if(USE_SRTP)
find_package(SRTP REQUIRED)
if(NOT TARGET SRTP::SRTP) if(NOT TARGET SRTP::SRTP)
add_library(SRTP::SRTP UNKNOWN IMPORTED) add_library(SRTP::SRTP UNKNOWN IMPORTED)
set_target_properties(SRTP::SRTP PROPERTIES set_target_properties(SRTP::SRTP PROPERTIES
@ -228,9 +232,11 @@ else()
endif() endif()
# Examples # Examples
set(JSON_BuildTests OFF CACHE INTERNAL "") if(NOT NO_EXAMPLES)
add_subdirectory(deps/json) set(JSON_BuildTests OFF CACHE INTERNAL "")
add_subdirectory(examples/client) add_subdirectory(deps/json)
add_subdirectory(examples/copy-paste) add_subdirectory(examples/client)
add_subdirectory(examples/copy-paste-capi) add_subdirectory(examples/copy-paste)
add_subdirectory(examples/copy-paste-capi)
endif()

58
Jamfile
View File

@ -25,7 +25,7 @@ lib libdatachannel
<library>/libdatachannel//usrsctp <library>/libdatachannel//usrsctp
<library>/libdatachannel//juice <library>/libdatachannel//juice
<library>/libdatachannel//plog <library>/libdatachannel//plog
<gnutls>on:<library>gnutls <gnutls>on:<library>gnutls/<link>shared
<gnutls>off:<library>ssl <gnutls>off:<library>ssl
<gnutls>off:<library>crypto <gnutls>off:<library>crypto
: # default build : # default build
@ -85,20 +85,37 @@ alias juice
make libusrsctp.a : : @make_libusrsctp ; make libusrsctp.a : : @make_libusrsctp ;
make usrsctp.lib : : @make_libusrsctp_msvc ; make usrsctp.lib : : @make_libusrsctp_msvc ;
rule make_libusrsctp ( targets * : sources * : properties * )
{
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
VARIANT on $(targets) = $(VARIANT) ;
if <gnutls>on in $(properties)
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
else
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
}
actions make_libusrsctp actions make_libusrsctp
{ {
(cd $(CWD)/deps/usrsctp && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static) (cd $(CWD)/deps/usrsctp && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_BUILD_TYPE=$(VARIANT) -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static)
cp $(CWD)/deps/usrsctp/build/usrsctplib/libusrsctp.a $(<) cp $(CWD)/deps/usrsctp/$(BUILD_DIR)/usrsctplib/libusrsctp.a $(<)
}
rule make_libusrsctp_msvc ( targets * : sources * : properties * )
{
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
VARIANT on $(targets) = $(VARIANT) ;
if <gnutls>on in $(properties)
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
else
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
} }
actions make_libusrsctp_msvc actions make_libusrsctp_msvc
{ {
SET OLDD=%CD% SET OLDD=%CD%
cd $(CWD)/deps/usrsctp cd $(CWD)/deps/usrsctp
mkdir build mkdir $(BUILD_SIR)
cd build cd $(BUILD_DIR)
cmake -G "Visual Studio 16 2019" .. cmake -G "Visual Studio 16 2019" ..
cd build msbuild usrsctplib.sln /property:Configuration=$(VARIANT)
msbuild usrsctplib.sln /property:Configuration=Release
cd %OLDD% cd %OLDD%
cp $(CWD)/deps/usrsctp/build/usrsctplib/Release/usrsctp.lib $(<) cp $(CWD)/deps/usrsctp/build/usrsctplib/Release/usrsctp.lib $(<)
} }
@ -108,11 +125,15 @@ make juice-static.lib : : @make_libjuice_msvc ;
rule make_libjuice ( targets * : sources * : properties * ) rule make_libjuice ( targets * : sources * : properties * )
{ {
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
VARIANT on $(targets) = $(VARIANT) ;
if <gnutls>on in $(properties) if <gnutls>on in $(properties)
{ {
BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ;
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ; CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
} }
else { else
{
local OPENSSL_INCLUDE = [ feature.get-values <openssl-include> : $(properties) ] ; local OPENSSL_INCLUDE = [ feature.get-values <openssl-include> : $(properties) ] ;
if <target-os>darwin in $(properties) && $(OPENSSL_INCLUDE) = "" if <target-os>darwin in $(properties) && $(OPENSSL_INCLUDE) = ""
@ -122,26 +143,29 @@ rule make_libjuice ( targets * : sources * : properties * )
OPENSSL_INCLUDE = /usr/local/opt/openssl/include ; OPENSSL_INCLUDE = /usr/local/opt/openssl/include ;
} }
BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ;
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ; CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
if $(OPENSSL_INCLUDE) != "" if $(OPENSSL_INCLUDE) != ""
{ { CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ; }
CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ;
}
} }
} }
actions make_libjuice actions make_libjuice
{ {
(cd $(CWD)/deps/libjuice && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" $(CMAKEOPTS) .. && make -j2 juice-static) (cd $(CWD)/deps/libjuice && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_C_FLAGS="-fPIC" -DCMAKE_BUILD_TYPE=$(VARIANT) $(CMAKEOPTS) .. && make -j2 juice-static)
cp $(CWD)/deps/libjuice/build/libjuice-static.a $(<) cp $(CWD)/deps/libjuice/$(BUILD_DIR)/libjuice-static.a $(<)
} }
rule make_libjuice_msvc ( targets * : sources * : properties * ) rule make_libjuice_msvc ( targets * : sources * : properties * )
{ {
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
VARIANT on $(targets) = $(VARIANT) ;
if <gnutls>on in $(properties) if <gnutls>on in $(properties)
{ {
BUILD_DIR on $(targets) += "build-gnutls-$(VARIANT)" ;
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ; CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
} }
else else
{ {
BUILD_DIR on $(targets) += "build-openssl-$(VARIANT)" ;
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ; CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
} }
} }
@ -149,12 +173,12 @@ actions make_libjuice_msvc
{ {
SET OLDD=%CD% SET OLDD=%CD%
cd $(CWD)/deps/libjuice cd $(CWD)/deps/libjuice
mkdir build mkdir $(BUILD_DIR)
cd build cd $(BUILD_DIR)
cmake -G "Visual Studio 16 2019" $(CMAKEOPTS) .. cmake -G "Visual Studio 16 2019" $(CMAKEOPTS) ..
msbuild libjuice.sln /property:Configuration=Release msbuild libjuice.sln /property:Configuration=$(VARIANT)
cd %OLDD% cd %OLDD%
cp $(CWD)/deps/libjuice/build/Release/juice-static.lib $(<) cp $(CWD)/deps/libjuice/$(BUILD_DIR)/Release/juice-static.lib $(<)
} }
# the search path to pick up the openssl libraries from. This is the <search> # the search path to pick up the openssl libraries from. This is the <search>

View File

@ -38,22 +38,22 @@ else
LIBS+=glib-2.0 gobject-2.0 nice LIBS+=glib-2.0 gobject-2.0 nice
endif endif
RTC_ENABLE_MEDIA ?= 0 USE_SRTP ?= 0
ifneq ($(RTC_ENABLE_MEDIA), 0) ifneq ($(USE_SRTP), 0)
CPPFLAGS+=-DRTC_ENABLE_MEDIA=1 CPPFLAGS+=-DRTC_ENABLE_MEDIA=1
LIBS+=srtp LIBS+=srtp
else else
CPPFLAGS+=-DRTC_ENABLE_MEDIA=0 CPPFLAGS+=-DRTC_ENABLE_MEDIA=0
endif endif
RTC_ENABLE_WEBSOCKET ?= 1
ifneq ($(RTC_ENABLE_WEBSOCKET), 0) NO_WEBSOCKET ?= 0
ifeq ($(NO_WEBSOCKET), 0)
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=1 CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=1
else else
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=0 CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=0
endif endif
INCLUDES+=$(shell pkg-config --cflags $(LIBS)) INCLUDES+=$(shell pkg-config --cflags $(LIBS))
LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS)) LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS))

2
deps/libjuice vendored

2
deps/usrsctp vendored

View File

@ -49,6 +49,7 @@ public:
bool ended() const; bool ended() const;
void hintType(Type type); void hintType(Type type);
void setDataMid(string mid);
void setFingerprint(string fingerprint); void setFingerprint(string fingerprint);
void setSctpPort(uint16_t port); void setSctpPort(uint16_t port);
void setMaxMessageSize(size_t size); void setMaxMessageSize(size_t size);
@ -86,7 +87,7 @@ private:
string mid; string mid;
std::vector<string> attributes; std::vector<string> attributes;
}; };
std::map<string, Media> mMedia; // by mid std::map<int, Media> mMedia; // by m-line index
// Candidates // Candidates
std::vector<Candidate> mCandidates; std::vector<Candidate> mCandidates;

View File

@ -64,6 +64,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
// overloaded helper // overloaded helper
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; }; template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>; template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;

View File

@ -25,8 +25,7 @@
namespace rtc { namespace rtc {
class Init; using init_token = std::shared_ptr<void>;
using init_token = std::shared_ptr<Init>;
class Init { class Init {
public: public:
@ -39,9 +38,10 @@ public:
private: private:
Init(); Init();
static std::weak_ptr<Init> Weak; static std::weak_ptr<void> Weak;
static init_token Global; static std::shared_ptr<void> *Global;
static std::mutex Mutex; static bool Initialized;
static std::recursive_mutex Mutex;
}; };
inline void Preload() { Init::Preload(); } inline void Preload() { Init::Preload(); }

View File

@ -41,6 +41,7 @@
namespace rtc { namespace rtc {
class Certificate; class Certificate;
class Processor;
class IceTransport; class IceTransport;
class DtlsTransport; class DtlsTransport;
class SctpTransport; class SctpTransport;
@ -101,7 +102,7 @@ public:
// Media // Media
bool hasMedia() const; bool hasMedia() const;
void sendMedia(const binary &packet); void sendMedia(const binary &packet);
void send(const byte *packet, size_t size); void sendMedia(const byte *packet, size_t size);
void onMedia(std::function<void(const binary &packet)> callback); void onMedia(std::function<void(const binary &packet)> callback);
@ -139,10 +140,10 @@ private:
void outgoingMedia(message_ptr message); void outgoingMedia(message_ptr message);
const init_token mInitToken = Init::Token();
const Configuration mConfig; const Configuration mConfig;
const future_certificate_ptr mCertificate; const future_certificate_ptr mCertificate;
const std::unique_ptr<Processor> mProcessor;
init_token mInitToken = Init::Token();
std::optional<Description> mLocalDescription, mRemoteDescription; std::optional<Description> mLocalDescription, mRemoteDescription;
mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex; mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;

View File

@ -28,6 +28,7 @@
#else #else
#include <netdb.h> #include <netdb.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h>
#endif #endif
#include <sys/types.h> #include <sys/types.h>
@ -107,8 +108,9 @@ bool Candidate::resolve(ResolveMode mode) {
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type; oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
oss << left; oss << left;
mCandidate = oss.str(); mCandidate = oss.str();
mIsResolved = true;
PLOG_VERBOSE << "Resolved candidate: " << mCandidate; PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
return mIsResolved = true; break;
} }
} }
} }
@ -116,7 +118,7 @@ bool Candidate::resolve(ResolveMode mode) {
freeaddrinfo(result); freeaddrinfo(result);
} }
return false; return mIsResolved;
} }
bool Candidate::isResolved() const { return mIsResolved; } bool Candidate::isResolved() const { return mIsResolved; }

View File

@ -17,6 +17,7 @@
*/ */
#include "certificate.hpp" #include "certificate.hpp"
#include "threadpool.hpp"
#include <cassert> #include <cassert>
#include <chrono> #include <chrono>
@ -201,8 +202,8 @@ certificate_ptr make_certificate_impl(string commonName) {
auto *commonNameBytes = auto *commonNameBytes =
reinterpret_cast<unsigned char *>(const_cast<char *>(commonName.c_str())); reinterpret_cast<unsigned char *>(const_cast<char *>(commonName.c_str()));
if (!X509_gmtime_adj(X509_get_notBefore(x509.get()), 3600 * -1) || if (!X509_gmtime_adj(X509_getm_notBefore(x509.get()), 3600 * -1) ||
!X509_gmtime_adj(X509_get_notAfter(x509.get()), 3600 * 24 * 365) || !X509_gmtime_adj(X509_getm_notAfter(x509.get()), 3600 * 24 * 365) ||
!X509_set_version(x509.get(), 1) || !X509_set_pubkey(x509.get(), pkey.get()) || !X509_set_version(x509.get(), 1) || !X509_set_pubkey(x509.get(), pkey.get()) ||
!BN_pseudo_rand(serial_number.get(), serialSize, 0, 0) || !BN_pseudo_rand(serial_number.get(), serialSize, 0, 0) ||
!BN_to_ASN1_INTEGER(serial_number.get(), X509_get_serialNumber(x509.get())) || !BN_to_ASN1_INTEGER(serial_number.get(), X509_get_serialNumber(x509.get())) ||
@ -230,19 +231,6 @@ namespace rtc {
namespace { namespace {
// Helper function roughly equivalent to std::async with policy std::launch::async
// since std::async might be unreliable on some platforms (e.g. Mingw32 on Windows)
template <class F, class... Args>
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>
thread_call(F &&f, Args &&... args) {
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
std::packaged_task<R()> task(std::bind(f, std::forward<Args>(args)...));
std::future<R> future = task.get_future();
std::thread t(std::move(task));
t.detach();
return future;
}
static std::unordered_map<string, future_certificate_ptr> CertificateCache; static std::unordered_map<string, future_certificate_ptr> CertificateCache;
static std::mutex CertificateCacheMutex; static std::mutex CertificateCacheMutex;
@ -254,7 +242,7 @@ future_certificate_ptr make_certificate(string commonName) {
if (auto it = CertificateCache.find(commonName); it != CertificateCache.end()) if (auto it = CertificateCache.find(commonName); it != CertificateCache.end())
return it->second; return it->second;
auto future = thread_call(make_certificate_impl, commonName); auto future = ThreadPool::Instance().enqueue(make_certificate_impl, commonName);
auto shared = future.share(); auto shared = future.share();
CertificateCache.emplace(std::move(commonName), shared); CertificateCache.emplace(std::move(commonName), shared);
return shared; return shared;

View File

@ -96,7 +96,7 @@ void DataChannel::close() {
mIsClosed = true; mIsClosed = true;
if (mIsOpen.exchange(false)) if (mIsOpen.exchange(false))
if (auto transport = mSctpTransport.lock()) if (auto transport = mSctpTransport.lock())
transport->close(mStream); transport->closeStream(mStream);
mSctpTransport.reset(); mSctpTransport.reset();
resetCallbacks(); resetCallbacks();

View File

@ -63,6 +63,7 @@ Description::Description(const string &sdp, Type type, Role role)
std::istringstream ss(sdp); std::istringstream ss(sdp);
std::optional<Media> currentMedia; std::optional<Media> currentMedia;
int mlineIndex = 0;
bool finished; bool finished;
do { do {
string line; string line;
@ -76,7 +77,9 @@ Description::Description(const string &sdp, Type type, Role role)
if (currentMedia->type == "application") if (currentMedia->type == "application")
mData.mid = currentMedia->mid; mData.mid = currentMedia->mid;
else else
mMedia.emplace(currentMedia->mid, std::move(*currentMedia)); mMedia.emplace(mlineIndex, std::move(*currentMedia));
++mlineIndex;
} else if (line.find(" ICE/SDP") != string::npos) { } else if (line.find(" ICE/SDP") != string::npos) {
PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring"; PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring";
@ -163,6 +166,8 @@ void Description::hintType(Type type) {
} }
} }
void Description::setDataMid(string mid) { mData.mid = mid; }
void Description::setFingerprint(string fingerprint) { void Description::setFingerprint(string fingerprint) {
mFingerprint.emplace(std::move(fingerprint)); mFingerprint.emplace(std::move(fingerprint));
} }
@ -188,10 +193,7 @@ bool Description::hasMedia() const { return !mMedia.empty(); }
void Description::addMedia(const Description &source) { void Description::addMedia(const Description &source) {
for (auto p : source.mMedia) for (auto p : source.mMedia)
if (p.first != mData.mid) mMedia.emplace(p);
mMedia.emplace(std::move(p));
else
PLOG_WARNING << "Media mid \"" << p.first << "\" is the same as data mid, ignoring";
} }
Description::operator string() const { return generateSdp("\r\n"); } Description::operator string() const { return generateSdp("\r\n"); }
@ -212,48 +214,60 @@ string Description::generateSdp(const string &eol) const {
// see Negotiating Media Multiplexing Using the Session Description Protocol // see Negotiating Media Multiplexing Using the Session Description Protocol
// https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54 // https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54
sdp << "a=group:BUNDLE"; sdp << "a=group:BUNDLE";
for (const auto &m : mMedia) for (int i = 0; i < int(mMedia.size() + 1); ++i)
sdp << " " << m.first; // mid if (auto it = mMedia.find(i); it != mMedia.end())
sdp << " " << mData.mid << eol; sdp << ' ' << it->second.mid;
else
sdp << ' ' << mData.mid;
sdp << eol;
// Data sdp << "a=msid-semantic: WMS" << eol;
const string dataDescription = "UDP/DTLS/SCTP webrtc-datachannel";
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << dataDescription << eol;
sdp << "c=IN IP4 0.0.0.0" << eol;
if (!mMedia.empty())
sdp << "a=bundle-only" << eol;
sdp << "a=mid:" << mData.mid << eol;
if (mData.sctpPort)
sdp << "a=sctp-port:" << *mData.sctpPort << eol;
if (mData.maxMessageSize)
sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
// Non-data media // Non-data media
if (!mMedia.empty()) { if (!mMedia.empty()) {
// Lip-sync // Lip-sync
sdp << "a=group:LS"; sdp << "a=group:LS";
for (const auto &m : mMedia) for (const auto &p : mMedia)
sdp << " " << m.first; // mid sdp << " " << p.second.mid;
sdp << eol; sdp << eol;
}
// Descriptions and attributes // Descriptions and attributes
for (const auto &m : mMedia) { for (int i = 0; i < int(mMedia.size() + 1); ++i) {
const auto &media = m.second; if (auto it = mMedia.find(i); it != mMedia.end()) {
// Non-data media
const auto &media = it->second;
sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol; sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
sdp << "c=IN IP4 0.0.0.0" << eol; sdp << "c=IN IP4 0.0.0.0" << eol;
sdp << "a=bundle-only" << eol; sdp << "a=bundle-only" << eol;
sdp << "a=mid:" << media.mid << eol; sdp << "a=mid:" << media.mid << eol;
for (const auto &attr : media.attributes) for (const auto &attr : media.attributes)
sdp << "a=" << attr << eol; sdp << "a=" << attr << eol;
} else {
// Data
const string description = "UDP/DTLS/SCTP webrtc-datachannel";
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << description << eol;
sdp << "c=IN IP4 0.0.0.0" << eol;
if (!mMedia.empty())
sdp << "a=bundle-only" << eol;
sdp << "a=mid:" << mData.mid << eol;
if (mData.sctpPort)
sdp << "a=sctp-port:" << *mData.sctpPort << eol;
if (mData.maxMessageSize)
sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
} }
} }
// Common // Common
sdp << "a=ice-options:trickle" << eol; if (!mEnded)
sdp << "a=ice-options:trickle" << eol;
sdp << "a=ice-ufrag:" << mIceUfrag << eol; sdp << "a=ice-ufrag:" << mIceUfrag << eol;
sdp << "a=ice-pwd:" << mIcePwd << eol; sdp << "a=ice-pwd:" << mIcePwd << eol;
sdp << "a=setup:" << roleToString(mRole) << eol; sdp << "a=setup:" << roleToString(mRole) << eol;
sdp << "a=tls-id:1" << eol; sdp << "a=tls-id:1" << eol;
if (mFingerprint) if (mFingerprint)
sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol; sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;

View File

@ -43,50 +43,86 @@ DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
std::move(stateChangeCallback)), std::move(stateChangeCallback)),
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
PLOG_DEBUG << "Initializing SRTP transport"; PLOG_DEBUG << "Initializing DTLS-SRTP transport";
#if USE_GNUTLS if (srtp_err_status_t err = srtp_create(&mSrtpIn, nullptr)) {
PLOG_DEBUG << "Initializing DTLS-SRTP transport (GnuTLS)"; throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80), }
"Failed to set SRTP profile"); if (srtp_err_status_t err = srtp_create(&mSrtpOut, nullptr)) {
#else srtp_dealloc(mSrtpIn);
PLOG_DEBUG << "Initializing DTLS-SRTP transport (OpenSSL)"; throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
openssl::check(SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"), }
"Failed to set SRTP profile");
#endif
} }
DtlsSrtpTransport::~DtlsSrtpTransport() { DtlsSrtpTransport::~DtlsSrtpTransport() {
stop(); stop();
if (mCreated) srtp_dealloc(mSrtpIn);
srtp_dealloc(mSrtp); srtp_dealloc(mSrtpOut);
} }
bool DtlsSrtpTransport::send(message_ptr message) { bool DtlsSrtpTransport::sendMedia(message_ptr message) {
if (!message) if (!message)
return false; return false;
if (!mInitDone) {
PLOG_WARNING << "SRTP media sent before keys are derived";
return false;
}
int size = message->size(); int size = message->size();
PLOG_VERBOSE << "Send size=" << size; PLOG_VERBOSE << "Send size=" << size;
// srtp_protect() assumes that it can write SRTP_MAX_TRAILER_LEN (for the authentication tag) // The RTP header has a minimum size of 12 bytes
// into the location in memory immediately following the RTP packet. if (size < 12)
throw std::runtime_error("RTP/RTCP packet too short");
// srtp_protect() and srtp_protect_rtcp() assume that they can write SRTP_MAX_TRAILER_LEN (for
// the authentication tag) into the location in memory immediately following the RTP packet.
message->resize(size + SRTP_MAX_TRAILER_LEN); message->resize(size + SRTP_MAX_TRAILER_LEN);
if (srtp_err_status_t err = srtp_protect(mSrtp, message->data(), &size)) {
if (err == srtp_err_status_replay_fail) uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
throw std::runtime_error("SRTP packet is a replay"); PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
else << unsigned(value2);
throw std::runtime_error("SRTP protect error, status=" +
to_string(static_cast<int>(err))); // RFC 5761 Multiplexing RTP and RTCP 4. Distinguishable RTP and RTCP Packets
// It is RECOMMENDED to follow the guidelines in the RTP/AVP profile for the choice of RTP
// payload type values, with the additional restriction that payload type values in the
// range 64-95 MUST NOT be used. Specifically, dynamic RTP payload types SHOULD be chosen in
// the range 96-127 where possible. Values below 64 MAY be used if that is insufficient
// [...]
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
if (srtp_err_status_t err = srtp_protect_rtcp(mSrtpOut, message->data(), &size)) {
if (err == srtp_err_status_replay_fail)
throw std::runtime_error("SRTCP packet is a replay");
else
throw std::runtime_error("SRTCP protect error, status=" +
to_string(static_cast<int>(err)));
}
PLOG_VERBOSE << "Protected SRTCP packet, size=" << size;
} else {
if (srtp_err_status_t err = srtp_protect(mSrtpOut, message->data(), &size)) {
if (err == srtp_err_status_replay_fail)
throw std::runtime_error("SRTP packet is a replay");
else
throw std::runtime_error("SRTP protect error, status=" +
to_string(static_cast<int>(err)));
}
PLOG_VERBOSE << "Protected SRTP packet, size=" << size;
} }
PLOG_VERBOSE << "Protected SRTP packet, size=" << size;
message->resize(size); message->resize(size);
outgoing(message); outgoing(message);
return true; return true;
} }
void DtlsSrtpTransport::incoming(message_ptr message) { void DtlsSrtpTransport::incoming(message_ptr message) {
if (!mInitDone) {
// Bypas
DtlsTransport::incoming(message);
return;
}
int size = message->size(); int size = message->size();
if (size == 0) if (size == 0)
return; return;
@ -95,49 +131,80 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
// The process for demultiplexing a packet is as follows. The receiver looks at the first byte // The process for demultiplexing a packet is as follows. The receiver looks at the first byte
// of the packet. [...] If the value is in between 128 and 191 (inclusive), then the packet is // of the packet. [...] If the value is in between 128 and 191 (inclusive), then the packet is
// RTP (or RTCP [...]). If the value is between 20 and 63 (inclusive), the packet is DTLS. // RTP (or RTCP [...]). If the value is between 20 and 63 (inclusive), the packet is DTLS.
uint8_t value = to_integer<uint8_t>(*message->begin()); uint8_t value1 = to_integer<uint8_t>(*message->begin());
PLOG_VERBOSE << "Demultiplexing DTLS and SRTP/SRTCP with first byte, value="
<< unsigned(value1);
if (value >= 128 && value <= 192) { if (value1 >= 20 && value1 <= 63) {
PLOG_VERBOSE << "Incoming DTLS packet, size=" << size; PLOG_VERBOSE << "Incoming DTLS packet, size=" << size;
DtlsTransport::incoming(message); DtlsTransport::incoming(message);
} else if (value >= 20 && value <= 64) {
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
if (srtp_err_status_t err = srtp_unprotect(mSrtp, message->data(), &size)) { } else if (value1 >= 128 && value1 <= 191) {
if (err == srtp_err_status_replay_fail) // The RTP header has a minimum size of 12 bytes
PLOG_WARNING << "Incoming SRTP packet is a replay"; if (size < 12) {
else PLOG_WARNING << "Incoming SRTP/SRTCP packet too short, size=" << size;
PLOG_WARNING << "SRTP unprotect error, status=" << err;
return; return;
} }
PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
<< unsigned(value2);
// See RFC 5761 reference above
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
PLOG_VERBOSE << "Incoming SRTCP packet, size=" << size;
if (srtp_err_status_t err = srtp_unprotect_rtcp(mSrtpIn, message->data(), &size)) {
if (err == srtp_err_status_replay_fail)
PLOG_WARNING << "Incoming SRTCP packet is a replay";
else
PLOG_WARNING << "SRTCP unprotect error, status=" << err;
return;
}
PLOG_VERBOSE << "Unprotected SRTCP packet, size=" << size;
} else {
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
if (srtp_err_status_t err = srtp_unprotect(mSrtpIn, message->data(), &size)) {
if (err == srtp_err_status_replay_fail)
PLOG_WARNING << "Incoming SRTP packet is a replay";
else
PLOG_WARNING << "SRTP unprotect error, status=" << err;
return;
}
PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
}
message->resize(size); message->resize(size);
mSrtpRecvCallback(message); mSrtpRecvCallback(message);
} else { } else {
PLOG_WARNING << "Unknown packet type, value=" << value << ", size=" << size; PLOG_WARNING << "Unknown packet type, value=" << unsigned(value1) << ", size=" << size;
} }
} }
void DtlsSrtpTransport::postCreation() {
#if USE_GNUTLS
PLOG_DEBUG << "Setting SRTP profile (GnuTLS)";
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80),
"Failed to set SRTP profile");
#else
PLOG_DEBUG << "Setting SRTP profile (OpenSSL)";
// returns 0 on success, 1 on error
if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"), "Failed to set SRTP profile")
throw std::runtime_error("Failed to set SRTP profile: " + openssl::error_string(ERR_get_error()));
#endif
}
void DtlsSrtpTransport::postHandshake() { void DtlsSrtpTransport::postHandshake() {
if (mCreated) if (mInitDone)
return; return;
srtp_policy_t inbound = {};
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
inbound.ssrc.type = ssrc_any_inbound;
srtp_policy_t outbound = {};
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
outbound.ssrc.type = ssrc_any_outbound;
const size_t materialLen = SRTP_AES_ICM_128_KEY_LEN_WSALT * 2; const size_t materialLen = SRTP_AES_ICM_128_KEY_LEN_WSALT * 2;
unsigned char material[materialLen]; unsigned char material[materialLen];
const unsigned char *clientKey, *clientSalt, *serverKey, *serverSalt; const unsigned char *clientKey, *clientSalt, *serverKey, *serverSalt;
#if USE_GNUTLS #if USE_GNUTLS
PLOG_INFO << "Deriving SRTP keying material (GnuTLS)";
gnutls_datum_t clientKeyDatum, clientSaltDatum, serverKeyDatum, serverSaltDatum; gnutls_datum_t clientKeyDatum, clientSaltDatum, serverKeyDatum, serverSaltDatum;
gnutls::check(gnutls_srtp_get_keys(mSession, material, materialLen, &clientKeyDatum, gnutls::check(gnutls_srtp_get_keys(mSession, material, materialLen, &clientKeyDatum,
&clientSaltDatum, &serverKeyDatum, &serverSaltDatum), &clientSaltDatum, &serverKeyDatum, &serverSaltDatum),
@ -160,18 +227,23 @@ void DtlsSrtpTransport::postHandshake() {
serverKey = reinterpret_cast<const unsigned char *>(serverKeyDatum.data); serverKey = reinterpret_cast<const unsigned char *>(serverKeyDatum.data);
serverSalt = reinterpret_cast<const unsigned char *>(serverSaltDatum.data); serverSalt = reinterpret_cast<const unsigned char *>(serverSaltDatum.data);
#else #else
// This provides the client write master key, the server write master key, the client write PLOG_INFO << "Deriving SRTP keying material (OpenSSL)";
// master salt and the server write master salt in that order.
// The extractor provides the client write master key, the server write master key, the client
// write master salt and the server write master salt in that order.
const string label = "EXTRACTOR-dtls_srtp"; const string label = "EXTRACTOR-dtls_srtp";
openssl::check(SSL_export_keying_material(mSsl, material, materialLen, label.c_str(),
label.size(), nullptr, 0, 0), // returns 1 on success, 0 or -1 on failure (OpenSSL API is a complete mess...)
"Failed to derive SRTP keys"); if (SSL_export_keying_material(mSsl, material, materialLen, label.c_str(), label.size(),
nullptr, 0, 0) <= 0)
throw std::runtime_error("Failed to derive SRTP keys: " +
openssl::error_string(ERR_get_error()));
clientKey = material; clientKey = material;
clientSalt = clientKey + SRTP_AES_128_KEY_LEN; clientSalt = clientKey + SRTP_AES_128_KEY_LEN;
serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT; serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT;
serverSalt = serverSalt + SRTP_AES_128_KEY_LEN; serverSalt = serverKey + SRTP_AES_128_KEY_LEN;
#endif #endif
unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT]; unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT];
@ -182,22 +254,31 @@ void DtlsSrtpTransport::postHandshake() {
std::memcpy(serverSessionKey, serverKey, SRTP_AES_128_KEY_LEN); std::memcpy(serverSessionKey, serverKey, SRTP_AES_128_KEY_LEN);
std::memcpy(serverSessionKey + SRTP_AES_128_KEY_LEN, serverSalt, SRTP_SALT_LEN); std::memcpy(serverSessionKey + SRTP_AES_128_KEY_LEN, serverSalt, SRTP_SALT_LEN);
if (mIsClient) { srtp_policy_t inbound = {};
inbound.key = serverSessionKey; srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
outbound.key = clientSessionKey; srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
} else { inbound.ssrc.type = ssrc_any_inbound;
inbound.key = clientSessionKey; inbound.ssrc.value = 0;
outbound.key = serverSessionKey; inbound.key = mIsClient ? serverSessionKey : clientSessionKey;
} inbound.next = nullptr;
srtp_policy_t *policies = &inbound; if (srtp_err_status_t err = srtp_add_stream(mSrtpIn, &inbound))
inbound.next = &outbound; throw std::runtime_error("SRTP add inbound stream failed, status=" +
to_string(static_cast<int>(err)));
srtp_policy_t outbound = {};
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
outbound.ssrc.type = ssrc_any_outbound;
outbound.ssrc.value = 0;
outbound.key = mIsClient ? clientSessionKey : serverSessionKey;
outbound.next = nullptr; outbound.next = nullptr;
if (srtp_err_status_t err = srtp_create(&mSrtp, policies)) if (srtp_err_status_t err = srtp_add_stream(mSrtpOut, &outbound))
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err))); throw std::runtime_error("SRTP add outbound stream failed, status=" +
to_string(static_cast<int>(err)));
mCreated = true; mInitDone = true;
} }
} // namespace rtc } // namespace rtc

View File

@ -38,16 +38,17 @@ public:
state_callback stateChangeCallback); state_callback stateChangeCallback);
~DtlsSrtpTransport(); ~DtlsSrtpTransport();
bool send(message_ptr message) override; bool sendMedia(message_ptr message);
private: private:
void incoming(message_ptr message) override; void incoming(message_ptr message) override;
void postCreation() override;
void postHandshake() override; void postHandshake() override;
message_callback mSrtpRecvCallback; message_callback mSrtpRecvCallback;
srtp_t mSrtp; srtp_t mSrtpIn, mSrtpOut;
bool mCreated = false; bool mInitDone = false;
}; };
} // namespace rtc } // namespace rtc

View File

@ -85,6 +85,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
gnutls_transport_set_pull_function(mSession, ReadCallback); gnutls_transport_set_pull_function(mSession, ReadCallback);
gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback); gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
postCreation();
mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this); mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this);
registerIncoming(); registerIncoming();
@ -137,6 +139,10 @@ void DtlsTransport::incoming(message_ptr message) {
mIncomingQueue.push(message); mIncomingQueue.push(message);
} }
void DtlsTransport::postCreation() {
// Dummy
}
void DtlsTransport::postHandshake() { void DtlsTransport::postHandshake() {
// Dummy // Dummy
} }
@ -408,6 +414,10 @@ void DtlsTransport::incoming(message_ptr message) {
mIncomingQueue.push(message); mIncomingQueue.push(message);
} }
void DtlsTransport::postCreation() {
// Dummy
}
void DtlsTransport::postHandshake() { void DtlsTransport::postHandshake() {
// Dummy // Dummy
} }

View File

@ -52,6 +52,7 @@ public:
protected: protected:
virtual void incoming(message_ptr message) override; virtual void incoming(message_ptr message) override;
virtual void postCreation();
virtual void postHandshake(); virtual void postHandshake();
void runRecvLoop(); void runRecvLoop();

View File

@ -21,6 +21,7 @@
#include "certificate.hpp" #include "certificate.hpp"
#include "dtlstransport.hpp" #include "dtlstransport.hpp"
#include "sctptransport.hpp" #include "sctptransport.hpp"
#include "threadpool.hpp"
#include "tls.hpp" #include "tls.hpp"
#if RTC_ENABLE_WEBSOCKET #if RTC_ENABLE_WEBSOCKET
@ -39,41 +40,21 @@ using std::shared_ptr;
namespace rtc { namespace rtc {
std::weak_ptr<Init> Init::Weak; namespace {
init_token Init::Global;
std::mutex Init::Mutex;
init_token Init::Token() { void doInit() {
std::lock_guard lock(Mutex); PLOG_DEBUG << "Global initialization";
if (!Global) {
if (auto token = Weak.lock())
Global = token;
else
Global = shared_ptr<Init>(new Init());
}
return Global;
}
void Init::Preload() {
Token(); // pre-init
make_certificate().wait(); // preload certificate
}
void Init::Cleanup() {
Global.reset();
CleanupCertificateCache();
}
Init::Init() {
#ifdef _WIN32 #ifdef _WIN32
WSADATA wsaData; WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData)) if (WSAStartup(MAKEWORD(2, 2), &wsaData))
throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError())); throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
#endif #endif
ThreadPool::Instance().spawn(THREADPOOL_SIZE);
#if USE_GNUTLS #if USE_GNUTLS
// Nothing to do // Nothing to do
#else #else
openssl::init(); openssl::init();
#endif #endif
@ -88,7 +69,12 @@ Init::Init() {
#endif #endif
} }
Init::~Init() { void doCleanup() {
PLOG_DEBUG << "Global cleanup";
ThreadPool::Instance().join();
CleanupCertificateCache();
SctpTransport::Cleanup(); SctpTransport::Cleanup();
DtlsTransport::Cleanup(); DtlsTransport::Cleanup();
#if RTC_ENABLE_WEBSOCKET #if RTC_ENABLE_WEBSOCKET
@ -103,5 +89,57 @@ Init::~Init() {
#endif #endif
} }
} // namespace
std::weak_ptr<void> Init::Weak;
std::shared_ptr<void> *Init::Global = nullptr;
bool Init::Initialized = false;
std::recursive_mutex Init::Mutex;
init_token Init::Token() {
std::unique_lock lock(Mutex);
if (auto token = Weak.lock())
return token;
delete Global;
Global = new shared_ptr<void>(new Init());
Weak = *Global;
return *Global;
}
void Init::Preload() {
std::unique_lock lock(Mutex);
auto token = Token();
if (!Global)
Global = new shared_ptr<void>(token);
PLOG_DEBUG << "Preloading certificate";
make_certificate().wait();
}
void Init::Cleanup() {
std::unique_lock lock(Mutex);
delete Global;
Global = nullptr;
}
Init::Init() {
// Mutex is locked by Token() here
if (!std::exchange(Initialized, true))
doInit();
}
Init::~Init() {
std::thread t([]() {
// We need to lock Mutex ourselves
std::unique_lock lock(Mutex);
if (Global)
return;
if (std::exchange(Initialized, false))
doCleanup();
});
t.detach();
}
} // namespace rtc } // namespace rtc

View File

@ -18,9 +18,12 @@
#include "peerconnection.hpp" #include "peerconnection.hpp"
#include "certificate.hpp" #include "certificate.hpp"
#include "include.hpp"
#include "processor.hpp"
#include "threadpool.hpp"
#include "dtlstransport.hpp" #include "dtlstransport.hpp"
#include "icetransport.hpp" #include "icetransport.hpp"
#include "include.hpp"
#include "sctptransport.hpp" #include "sctptransport.hpp"
#if RTC_ENABLE_MEDIA #if RTC_ENABLE_MEDIA
@ -39,8 +42,8 @@ using std::weak_ptr;
PeerConnection::PeerConnection() : PeerConnection(Configuration()) {} PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
PeerConnection::PeerConnection(const Configuration &config) PeerConnection::PeerConnection(const Configuration &config)
: mConfig(config), mCertificate(make_certificate()), mState(State::New), : mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
mGatheringState(GatheringState::New) { mState(State::New), mGatheringState(GatheringState::New) {
PLOG_VERBOSE << "Creating PeerConnection"; PLOG_VERBOSE << "Creating PeerConnection";
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd) if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
@ -145,6 +148,7 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
iceTransport->addRemoteCandidate(candidate); iceTransport->addRemoteCandidate(candidate);
} else { } else {
// OK, we might need a lookup, do it asynchronously // OK, we might need a lookup, do it asynchronously
// We don't use the thread pool because we have no control on the timeout
weak_ptr<IceTransport> weakIceTransport{iceTransport}; weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate]() mutable { std::thread t([weakIceTransport, candidate]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup)) if (candidate.resolve(Candidate::ResolveMode::Lookup))
@ -227,7 +231,7 @@ void PeerConnection::sendMedia(const binary &packet) {
outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary)); outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary));
} }
void PeerConnection::send(const byte *packet, size_t size) { void PeerConnection::sendMedia(const byte *packet, size_t size) {
outgoingMedia(make_message(packet, packet + size, Message::Binary)); outgoingMedia(make_message(packet, packet + size, Message::Binary));
} }
@ -244,7 +248,7 @@ void PeerConnection::outgoingMedia(message_ptr message) {
if (!transport) if (!transport)
throw std::runtime_error("PeerConnection is not open"); throw std::runtime_error("PeerConnection is not open");
std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->send(message); std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->sendMedia(message);
#else #else
PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)"; PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
#endif #endif
@ -445,21 +449,18 @@ void PeerConnection::closeTransports() {
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr)); auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr)); auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr)); auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
if (sctp || dtls || ice) { ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
std::thread t([sctp, dtls, ice, token = mInitToken]() mutable { if (sctp)
if (sctp) sctp->stop();
sctp->stop(); if (dtls)
if (dtls) dtls->stop();
dtls->stop(); if (ice)
if (ice) ice->stop();
ice->stop();
sctp.reset(); sctp.reset();
dtls.reset(); dtls.reset();
ice.reset(); ice.reset();
}); });
t.detach();
}
} }
void PeerConnection::endLocalCandidates() { void PeerConnection::endLocalCandidates() {
@ -502,7 +503,7 @@ void PeerConnection::forwardMessage(message_ptr message) {
mDataChannels.insert(std::make_pair(message->stream, channel)); mDataChannels.insert(std::make_pair(message->stream, channel));
} else { } else {
// Invalid, close the DataChannel // Invalid, close the DataChannel
sctpTransport->close(message->stream); sctpTransport->closeStream(message->stream);
return; return;
} }
} }
@ -594,20 +595,26 @@ void PeerConnection::remoteCloseDataChannels() {
void PeerConnection::processLocalDescription(Description description) { void PeerConnection::processLocalDescription(Description description) {
std::optional<uint16_t> remoteSctpPort; std::optional<uint16_t> remoteSctpPort;
if (auto remote = remoteDescription()) std::optional<string> remoteDataMid;
remoteSctpPort = remote->sctpPort(); if (auto remote = remoteDescription()) {
remoteDataMid = remote->dataMid();
remoteSctpPort = remote->sctpPort();
}
auto certificate = mCertificate.get(); // wait for certificate if not ready auto certificate = mCertificate.get(); // wait for certificate if not ready
{ {
std::lock_guard lock(mLocalDescriptionMutex); std::lock_guard lock(mLocalDescriptionMutex);
mLocalDescription.emplace(std::move(description)); mLocalDescription.emplace(std::move(description));
if (remoteDataMid)
mLocalDescription->setDataMid(*remoteDataMid);
mLocalDescription->setFingerprint(certificate->fingerprint()); mLocalDescription->setFingerprint(certificate->fingerprint());
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT)); mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE); mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
} }
mLocalDescriptionCallback(*mLocalDescription); mProcessor->enqueue([this]() { mLocalDescriptionCallback(*mLocalDescription); });
} }
void PeerConnection::processLocalCandidate(Candidate candidate) { void PeerConnection::processLocalCandidate(Candidate candidate) {
@ -617,7 +624,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
mLocalDescription->addCandidate(candidate); mLocalDescription->addCandidate(candidate);
mLocalCandidateCallback(candidate); mProcessor->enqueue(
[this, candidate = std::move(candidate)]() { mLocalCandidateCallback(candidate); });
} }
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) { void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
@ -625,7 +633,8 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
if (!dataChannel) if (!dataChannel)
return; return;
mDataChannelCallback(dataChannel); mProcessor->enqueue(
[this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
} }
bool PeerConnection::changeState(State state) { bool PeerConnection::changeState(State state) {
@ -639,13 +648,13 @@ bool PeerConnection::changeState(State state) {
} while (!mState.compare_exchange_weak(current, state)); } while (!mState.compare_exchange_weak(current, state));
mStateChangeCallback(state); mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
return true; return true;
} }
bool PeerConnection::changeGatheringState(GatheringState state) { bool PeerConnection::changeGatheringState(GatheringState state) {
if (mGatheringState.exchange(state) != state) if (mGatheringState.exchange(state) != state)
mGatheringStateChangeCallback(state); mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
return true; return true;
} }

44
src/processor.cpp Normal file
View File

@ -0,0 +1,44 @@
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "processor.hpp"
namespace rtc {
Processor::~Processor() { join(); }
void Processor::join() {
std::unique_lock lock(mMutex);
mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
}
void Processor::schedule() {
std::unique_lock lock(mMutex);
if (mTasks.empty()) {
// No more tasks
mPending = false;
mCondition.notify_all();
return;
}
ThreadPool::Instance().enqueue(std::move(mTasks.front()));
mTasks.pop();
}
} // namespace rtc

92
src/processor.hpp Normal file
View File

@ -0,0 +1,92 @@
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef RTC_PROCESSOR_H
#define RTC_PROCESSOR_H
#include "include.hpp"
#include "init.hpp"
#include "threadpool.hpp"
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
namespace rtc {
// Processed tasks in order by delegating them to the thread pool
class Processor final {
public:
Processor() = default;
~Processor();
Processor(const Processor &) = delete;
Processor &operator=(const Processor &) = delete;
Processor(Processor &&) = delete;
Processor &operator=(Processor &&) = delete;
void join();
template <class F, class... Args>
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
protected:
void schedule();
// Keep an init token
const init_token mInitToken = Init::Token();
std::queue<std::function<void()>> mTasks;
bool mPending = false; // true iff a task is pending in the thread pool
mutable std::mutex mMutex;
std::condition_variable mCondition;
};
template <class F, class... Args>
auto Processor::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
std::unique_lock lock(mMutex);
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
auto task = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<R> result = task->get_future();
auto bundle = [this, task = std::move(task)]() {
try {
(*task)();
} catch (const std::exception &e) {
PLOG_WARNING << "Unhandled exception in task: " << e.what();
}
schedule(); // chain the next task
};
if (!mPending) {
ThreadPool::Instance().enqueue(std::move(bundle));
mPending = true;
} else {
mTasks.emplace(std::move(bundle));
}
return result;
}
} // namespace rtc
#endif

View File

@ -55,18 +55,15 @@ std::unordered_map<int, void *> userPointerMap;
std::mutex mutex; std::mutex mutex;
int lastId = 0; int lastId = 0;
void *getUserPointer(int id) { std::optional<void *> getUserPointer(int id) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = userPointerMap.find(id); auto it = userPointerMap.find(id);
return it != userPointerMap.end() ? it->second : nullptr; return it != userPointerMap.end() ? std::make_optional(it->second) : nullopt;
} }
void setUserPointer(int i, void *ptr) { void setUserPointer(int i, void *ptr) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (ptr) userPointerMap[i] = ptr;
userPointerMap[i] = ptr;
else
userPointerMap.erase(i);
} }
shared_ptr<PeerConnection> getPeerConnection(int id) { shared_ptr<PeerConnection> getPeerConnection(int id) {
@ -89,6 +86,7 @@ int emplacePeerConnection(shared_ptr<PeerConnection> ptr) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
int pc = ++lastId; int pc = ++lastId;
peerConnectionMap.emplace(std::make_pair(pc, ptr)); peerConnectionMap.emplace(std::make_pair(pc, ptr));
userPointerMap.emplace(std::make_pair(pc, nullptr));
return pc; return pc;
} }
@ -96,6 +94,7 @@ int emplaceDataChannel(shared_ptr<DataChannel> ptr) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
int dc = ++lastId; int dc = ++lastId;
dataChannelMap.emplace(std::make_pair(dc, ptr)); dataChannelMap.emplace(std::make_pair(dc, ptr));
userPointerMap.emplace(std::make_pair(dc, nullptr));
return dc; return dc;
} }
@ -126,6 +125,7 @@ int emplaceWebSocket(shared_ptr<WebSocket> ptr) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
int ws = ++lastId; int ws = ++lastId;
webSocketMap.emplace(std::make_pair(ws, ptr)); webSocketMap.emplace(std::make_pair(ws, ptr));
userPointerMap.emplace(std::make_pair(ws, nullptr));
return ws; return ws;
} }
@ -244,7 +244,8 @@ int rtcCreateDataChannel(int pc, const char *label) {
return WRAP({ return WRAP({
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label))); int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
rtcSetUserPointer(dc, getUserPointer(pc)); if (auto ptr = getUserPointer(pc))
rtcSetUserPointer(dc, *ptr);
return dc; return dc;
}); });
} }
@ -293,9 +294,10 @@ int rtcSetDataChannelCallback(int pc, rtcDataChannelCallbackFunc cb) {
if (cb) if (cb)
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) { peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
int dc = emplaceDataChannel(dataChannel); int dc = emplaceDataChannel(dataChannel);
void *ptr = getUserPointer(pc); if (auto ptr = getUserPointer(pc)) {
rtcSetUserPointer(dc, ptr); rtcSetUserPointer(dc, *ptr);
cb(dc, ptr); cb(dc, *ptr);
}
}); });
else else
peerConnection->onDataChannel(nullptr); peerConnection->onDataChannel(nullptr);
@ -307,7 +309,8 @@ int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
if (cb) if (cb)
peerConnection->onLocalDescription([pc, cb](const Description &desc) { peerConnection->onLocalDescription([pc, cb](const Description &desc) {
cb(string(desc).c_str(), desc.typeString().c_str(), getUserPointer(pc)); if (auto ptr = getUserPointer(pc))
cb(string(desc).c_str(), desc.typeString().c_str(), *ptr);
}); });
else else
peerConnection->onLocalDescription(nullptr); peerConnection->onLocalDescription(nullptr);
@ -319,7 +322,8 @@ int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
if (cb) if (cb)
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) { peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
cb(cand.candidate().c_str(), cand.mid().c_str(), getUserPointer(pc)); if (auto ptr = getUserPointer(pc))
cb(cand.candidate().c_str(), cand.mid().c_str(), *ptr);
}); });
else else
peerConnection->onLocalCandidate(nullptr); peerConnection->onLocalCandidate(nullptr);
@ -331,7 +335,8 @@ int rtcSetStateChangeCallback(int pc, rtcStateChangeCallbackFunc cb) {
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
if (cb) if (cb)
peerConnection->onStateChange([pc, cb](PeerConnection::State state) { peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
cb(static_cast<rtcState>(state), getUserPointer(pc)); if (auto ptr = getUserPointer(pc))
cb(static_cast<rtcState>(state), *ptr);
}); });
else else
peerConnection->onStateChange(nullptr); peerConnection->onStateChange(nullptr);
@ -343,7 +348,8 @@ int rtcSetGatheringStateChangeCallback(int pc, rtcGatheringStateCallbackFunc cb)
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
if (cb) if (cb)
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) { peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
cb(static_cast<rtcGatheringState>(state), getUserPointer(pc)); if (auto ptr = getUserPointer(pc))
cb(static_cast<rtcGatheringState>(state), *ptr);
}); });
else else
peerConnection->onGatheringStateChange(nullptr); peerConnection->onGatheringStateChange(nullptr);
@ -435,7 +441,10 @@ int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);
if (cb) if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); }); channel->onOpen([id, cb]() {
if (auto ptr = getUserPointer(id))
cb(*ptr);
});
else else
channel->onOpen(nullptr); channel->onOpen(nullptr);
}); });
@ -445,7 +454,10 @@ int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb) {
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);
if (cb) if (cb)
channel->onClosed([id, cb]() { cb(getUserPointer(id)); }); channel->onClosed([id, cb]() {
if (auto ptr = getUserPointer(id))
cb(*ptr);
});
else else
channel->onClosed(nullptr); channel->onClosed(nullptr);
}); });
@ -455,8 +467,10 @@ int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);
if (cb) if (cb)
channel->onError( channel->onError([id, cb](const string &error) {
[id, cb](const string &error) { cb(error.c_str(), getUserPointer(id)); }); if (auto ptr = getUserPointer(id))
cb(error.c_str(), *ptr);
});
else else
channel->onError(nullptr); channel->onError(nullptr);
}); });
@ -468,9 +482,13 @@ int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
if (cb) if (cb)
channel->onMessage( channel->onMessage(
[id, cb](const binary &b) { [id, cb](const binary &b) {
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), getUserPointer(id)); if (auto ptr = getUserPointer(id))
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), *ptr);
}, },
[id, cb](const string &s) { cb(s.c_str(), -1, getUserPointer(id)); }); [id, cb](const string &s) {
if (auto ptr = getUserPointer(id))
cb(s.c_str(), -int(s.size() + 1), *ptr);
});
else else
channel->onMessage(nullptr); channel->onMessage(nullptr);
}); });
@ -514,7 +532,10 @@ int rtcSetBufferedAmountLowCallback(int id, rtcBufferedAmountLowCallbackFunc cb)
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);
if (cb) if (cb)
channel->onBufferedAmountLow([id, cb]() { cb(getUserPointer(id)); }); channel->onBufferedAmountLow([id, cb]() {
if (auto ptr = getUserPointer(id))
cb(*ptr);
});
else else
channel->onBufferedAmountLow(nullptr); channel->onBufferedAmountLow(nullptr);
}); });
@ -528,7 +549,10 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb) {
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);
if (cb) if (cb)
channel->onOpen([id, cb]() { cb(getUserPointer(id)); }); channel->onOpen([id, cb]() {
if (auto ptr = getUserPointer(id))
cb(*ptr);
});
else else
channel->onOpen(nullptr); channel->onOpen(nullptr);
}); });

View File

@ -50,6 +50,9 @@ using std::shared_ptr;
namespace rtc { namespace rtc {
std::unordered_set<SctpTransport *> SctpTransport::Instances;
std::shared_mutex SctpTransport::InstancesMutex;
void SctpTransport::Init() { void SctpTransport::Init() {
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr); usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
usrsctp_sysctl_set_sctp_ecn_enable(0); usrsctp_sysctl_set_sctp_ecn_enable(0);
@ -92,6 +95,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
PLOG_DEBUG << "Initializing SCTP transport"; PLOG_DEBUG << "Initializing SCTP transport";
usrsctp_register_address(this); usrsctp_register_address(this);
{
std::unique_lock lock(InstancesMutex);
Instances.insert(this);
}
mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback, mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
&SctpTransport::SendCallback, 0, this); &SctpTransport::SendCallback, 0, this);
if (!mSock) if (!mSock)
@ -186,14 +194,21 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
SctpTransport::~SctpTransport() { SctpTransport::~SctpTransport() {
stop(); stop();
close();
if (mSock)
usrsctp_close(mSock);
usrsctp_deregister_address(this); usrsctp_deregister_address(this);
{
std::unique_lock lock(InstancesMutex);
Instances.erase(this);
}
} }
bool SctpTransport::stop() { bool SctpTransport::stop() {
// Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
// sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
mWrittenOnce = true;
mWrittenCondition.notify_all();
if (!Transport::stop()) if (!Transport::stop())
return false; return false;
@ -204,6 +219,13 @@ bool SctpTransport::stop() {
return true; return true;
} }
void SctpTransport::close() {
if (mSock) {
usrsctp_close(mSock);
mSock = nullptr;
}
}
void SctpTransport::connect() { void SctpTransport::connect() {
if (!mSock) if (!mSock)
return; return;
@ -240,9 +262,7 @@ void SctpTransport::shutdown() {
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno; PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
} }
// close() abort the connection when linger is disabled, call it now close();
usrsctp_close(mSock);
mSock = nullptr;
PLOG_INFO << "SCTP disconnected"; PLOG_INFO << "SCTP disconnected";
changeState(State::Disconnected); changeState(State::Disconnected);
@ -266,7 +286,7 @@ bool SctpTransport::send(message_ptr message) {
return false; return false;
} }
void SctpTransport::close(unsigned int stream) { void SctpTransport::closeStream(unsigned int stream) {
send(make_message(0, Message::Reset, uint16_t(stream))); send(make_message(0, Message::Reset, uint16_t(stream)));
} }
@ -405,7 +425,7 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
try { try {
mBufferedAmountCallback(streamId, amount); mBufferedAmountCallback(streamId, amount);
} catch (const std::exception &e) { } catch (const std::exception &e) {
PLOG_WARNING << "SCTP buffered amount callback: " << e.what(); PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
} }
mSendMutex.lock(); mSendMutex.lock();
} }
@ -502,9 +522,9 @@ int SctpTransport::handleSend(size_t free) {
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) { int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
try { try {
std::unique_lock lock(mWriteMutex);
PLOG_VERBOSE << "Handle write, len=" << len; PLOG_VERBOSE << "Handle write, len=" << len;
std::unique_lock lock(mWriteMutex);
if (!outgoing(make_message(data, data + len))) if (!outgoing(make_message(data, data + len)))
return -1; return -1;
@ -628,7 +648,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) { if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
uint16_t streamId = reset_event.strreset_stream_list[i]; uint16_t streamId = reset_event.strreset_stream_list[i];
close(streamId); closeStream(streamId);
} }
} }
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) { if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
@ -686,7 +706,15 @@ int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address); auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
void *ptr = sconn->sconn_addr; void *ptr = sconn->sconn_addr;
return static_cast<SctpTransport *>(ptr)->handleSend(size_t(sb_free)); auto *transport = static_cast<SctpTransport *>(ptr);
// Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
// https://github.com/sctplab/usrsctp/issues/405
std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end())
return -1;
return transport->handleSend(size_t(sb_free));
} }
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) { int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {

View File

@ -28,6 +28,8 @@
#include <functional> #include <functional>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <shared_mutex>
#include <unordered_set>
#include "usrsctp.h" #include "usrsctp.h"
@ -46,7 +48,7 @@ public:
bool stop() override; bool stop() override;
bool send(message_ptr message) override; // false if buffered bool send(message_ptr message) override; // false if buffered
void close(unsigned int stream); void closeStream(unsigned int stream);
void flush(); void flush();
// Stats // Stats
@ -70,6 +72,7 @@ private:
void connect(); void connect();
void shutdown(); void shutdown();
void close();
void incoming(message_ptr message) override; void incoming(message_ptr message) override;
bool trySendQueue(); bool trySendQueue();
@ -109,6 +112,9 @@ private:
struct sctp_rcvinfo recv_info, int flags, void *user_data); struct sctp_rcvinfo recv_info, int flags, void *user_data);
static int SendCallback(struct socket *sock, uint32_t sb_free); static int SendCallback(struct socket *sock, uint32_t sb_free);
static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df); static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
static std::unordered_set<SctpTransport *> Instances;
static std::shared_mutex InstancesMutex;
}; };
} // namespace rtc } // namespace rtc

View File

@ -38,8 +38,8 @@ SelectInterrupter::SelectInterrupter() {
throw std::runtime_error("Failed to create pipe"); throw std::runtime_error("Failed to create pipe");
::fcntl(pipefd[0], F_SETFL, O_NONBLOCK); ::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
::fcntl(pipefd[1], F_SETFL, O_NONBLOCK); ::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
mPipeOut = pipefd[0]; // read mPipeOut = pipefd[1]; // read
mPipeIn = pipefd[1]; // write mPipeIn = pipefd[0]; // write
#endif #endif
} }
@ -62,11 +62,8 @@ int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
FD_SET(mDummySock, &readfds); FD_SET(mDummySock, &readfds);
return SOCKET_TO_INT(mDummySock) + 1; return SOCKET_TO_INT(mDummySock) + 1;
#else #else
int ret; char dummy;
do { ::read(mPipeIn, &dummy, 1);
char dummy;
ret = ::read(mPipeIn, &dummy, 1);
} while (ret > 0);
FD_SET(mPipeIn, &readfds); FD_SET(mPipeIn, &readfds);
return mPipeIn + 1; return mPipeIn + 1;
#endif #endif
@ -107,6 +104,7 @@ bool TcpTransport::stop() {
} }
bool TcpTransport::send(message_ptr message) { bool TcpTransport::send(message_ptr message) {
std::unique_lock lock(mSockMutex);
if (state() != State::Connected) if (state() != State::Connected)
return false; return false;
@ -126,6 +124,7 @@ void TcpTransport::incoming(message_ptr message) {
} }
bool TcpTransport::outgoing(message_ptr message) { bool TcpTransport::outgoing(message_ptr message) {
// mSockMutex must be locked
// If nothing is pending, try to send directly // If nothing is pending, try to send directly
// It's safe because if the queue is empty, the thread is not sending // It's safe because if the queue is empty, the thread is not sending
if (mSendQueue.empty() && trySendMessage(message)) if (mSendQueue.empty() && trySendMessage(message))
@ -174,6 +173,7 @@ void TcpTransport::connect(const string &hostname, const string &service) {
} }
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) { void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
std::unique_lock lock(mSockMutex);
try { try {
char node[MAX_NUMERICNODE_LEN]; char node[MAX_NUMERICNODE_LEN];
char serv[MAX_NUMERICSERV_LEN]; char serv[MAX_NUMERICSERV_LEN];
@ -248,15 +248,18 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
} }
void TcpTransport::close() { void TcpTransport::close() {
std::unique_lock lock(mSockMutex);
if (mSock != INVALID_SOCKET) { if (mSock != INVALID_SOCKET) {
PLOG_DEBUG << "Closing TCP socket"; PLOG_DEBUG << "Closing TCP socket";
::closesocket(mSock); ::closesocket(mSock);
mSock = INVALID_SOCKET; mSock = INVALID_SOCKET;
} }
changeState(State::Disconnected); changeState(State::Disconnected);
interruptSelect();
} }
bool TcpTransport::trySendQueue() { bool TcpTransport::trySendQueue() {
// mSockMutex must be locked
while (auto next = mSendQueue.peek()) { while (auto next = mSendQueue.peek()) {
auto message = *next; auto message = *next;
if (!trySendMessage(message)) { if (!trySendMessage(message)) {
@ -269,6 +272,7 @@ bool TcpTransport::trySendQueue() {
} }
bool TcpTransport::trySendMessage(message_ptr &message) { bool TcpTransport::trySendMessage(message_ptr &message) {
// mSockMutex must be locked
auto data = reinterpret_cast<const char *>(message->data()); auto data = reinterpret_cast<const char *>(message->data());
auto size = message->size(); auto size = message->size();
while (size) { while (size) {
@ -314,13 +318,22 @@ void TcpTransport::runLoop() {
changeState(State::Connected); changeState(State::Connected);
while (true) { while (true) {
std::unique_lock lock(mSockMutex);
if (mSock == INVALID_SOCKET)
break;
fd_set readfds, writefds; fd_set readfds, writefds;
int n = prepareSelect(readfds, writefds); int n = prepareSelect(readfds, writefds);
struct timeval tv; struct timeval tv;
tv.tv_sec = 10; tv.tv_sec = 10;
tv.tv_usec = 0; tv.tv_usec = 0;
lock.unlock();
int ret = ::select(n, &readfds, &writefds, NULL, &tv); int ret = ::select(n, &readfds, &writefds, NULL, &tv);
lock.lock();
if (mSock == INVALID_SOCKET)
break;
if (ret < 0) { if (ret < 0) {
throw std::runtime_error("Failed to wait on socket"); throw std::runtime_error("Failed to wait on socket");
} else if (ret == 0) { } else if (ret == 0) {

View File

@ -78,6 +78,7 @@ private:
string mHostname, mService; string mHostname, mService;
socket_t mSock = INVALID_SOCKET; socket_t mSock = INVALID_SOCKET;
std::mutex mSockMutex;
std::thread mThread; std::thread mThread;
SelectInterrupter mInterrupter; SelectInterrupter mInterrupter;
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;

81
src/threadpool.cpp Normal file
View File

@ -0,0 +1,81 @@
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "threadpool.hpp"
namespace rtc {
ThreadPool &ThreadPool::Instance() {
static ThreadPool instance;
return instance;
}
ThreadPool::~ThreadPool() { join(); }
int ThreadPool::count() const {
std::unique_lock lock(mWorkersMutex);
return mWorkers.size();
}
void ThreadPool::spawn(int count) {
std::unique_lock lock(mWorkersMutex);
mJoining = false;
while (count-- > 0)
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
}
void ThreadPool::join() {
std::unique_lock lock(mWorkersMutex);
mJoining = true;
mCondition.notify_all();
for (auto &w : mWorkers)
w.join();
mWorkers.clear();
}
void ThreadPool::run() {
while (runOne()) {
}
}
bool ThreadPool::runOne() {
if (auto task = dequeue()) {
try {
task();
} catch (const std::exception &e) {
PLOG_WARNING << "Unhandled exception in task: " << e.what();
}
return true;
}
return false;
}
std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex);
mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
if (mTasks.empty())
return nullptr;
auto task = std::move(mTasks.front());
mTasks.pop();
return task;
}
} // namespace rtc

87
src/threadpool.hpp Normal file
View File

@ -0,0 +1,87 @@
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef RTC_THREADPOOL_H
#define RTC_THREADPOOL_H
#include "include.hpp"
#include "init.hpp"
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
namespace rtc {
template <class F, class... Args>
using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>;
class ThreadPool final {
public:
static ThreadPool &Instance();
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
int count() const;
void spawn(int count = 1);
void join();
void run();
bool runOne();
template <class F, class... Args>
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
protected:
ThreadPool() = default;
~ThreadPool();
std::function<void()> dequeue(); // returns null function if joining
std::vector<std::thread> mWorkers;
std::queue<std::function<void()>> mTasks;
std::atomic<bool> mJoining = false;
mutable std::mutex mMutex, mWorkersMutex;
std::condition_variable mCondition;
};
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
std::unique_lock lock(mMutex);
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
auto task = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<R> result = task->get_future();
mTasks.emplace([task = std::move(task), token = Init::Token()]() { return (*task)(); });
mCondition.notify_one();
return result;
}
} // namespace rtc
#endif

View File

@ -86,9 +86,8 @@ void init() {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (!done) { if (!done) {
OPENSSL_init_ssl(0, NULL); OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
SSL_load_error_strings(); OPENSSL_init_crypto(OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
ERR_load_crypto_strings();
done = true; done = true;
} }
} }

View File

@ -60,6 +60,8 @@ gnutls_datum_t make_datum(char *data, size_t size);
#include <openssl/err.h> #include <openssl/err.h>
#include <openssl/pem.h> #include <openssl/pem.h>
#include <openssl/x509.h> #include <openssl/x509.h>
#include <openssl/rsa.h>
#include <openssl/bn.h>
#ifndef BIO_EOF #ifndef BIO_EOF
#define BIO_EOF -1 #define BIO_EOF -1

View File

@ -41,12 +41,17 @@ public:
virtual ~Transport() { virtual ~Transport() {
stop(); stop();
if (mLower)
mLower->onRecv(nullptr); // doing it on stop could cause a deadlock
} }
virtual bool stop() { virtual bool stop() {
return !mShutdown.exchange(true); if (mShutdown.exchange(true))
return false;
// We don't want incoming() to be called by the lower layer anymore
if (mLower)
mLower->onRecv(nullptr);
return true;
} }
void registerIncoming() { void registerIncoming() {

View File

@ -18,8 +18,9 @@
#if RTC_ENABLE_WEBSOCKET #if RTC_ENABLE_WEBSOCKET
#include "include.hpp"
#include "websocket.hpp" #include "websocket.hpp"
#include "include.hpp"
#include "threadpool.hpp"
#include "tcptransport.hpp" #include "tcptransport.hpp"
#include "tlstransport.hpp" #include "tlstransport.hpp"
@ -301,21 +302,18 @@ void WebSocket::closeTransports() {
auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr)); auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr)); auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr)); auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));
if (ws || tls || tcp) { ThreadPool::Instance().enqueue([ws, tls, tcp]() mutable {
std::thread t([ws, tls, tcp, token = mInitToken]() mutable { if (ws)
if (ws) ws->stop();
ws->stop(); if (tls)
if (tls) tls->stop();
tls->stop(); if (tcp)
if (tcp) tcp->stop();
tcp->stop();
ws.reset(); ws.reset();
tls.reset(); tls.reset();
tcp.reset(); tcp.reset();
}); });
t.detach();
}
} }
} // namespace rtc } // namespace rtc

View File

@ -200,7 +200,7 @@ int test_capi_main() {
// You may call rtcCleanup() when finished to free static resources // You may call rtcCleanup() when finished to free static resources
rtcCleanup(); rtcCleanup();
sleep(1); sleep(2);
printf("Success\n"); printf("Success\n");
return 0; return 0;

View File

@ -150,7 +150,7 @@ void test_connectivity() {
// You may call rtc::Cleanup() when finished to free static resources // You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup(); rtc::Cleanup();
this_thread::sleep_for(1s); this_thread::sleep_for(2s);
cout << "Success" << endl; cout << "Success" << endl;
} }

View File

@ -18,6 +18,7 @@
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <thread>
using namespace std; using namespace std;
using namespace chrono_literals; using namespace chrono_literals;
@ -71,7 +72,10 @@ int main(int argc, char **argv) {
cout << "*** Finished WebRTC benchmark" << endl; cout << "*** Finished WebRTC benchmark" << endl;
} catch (const exception &e) { } catch (const exception &e) {
cerr << "WebRTC benchmark failed: " << e.what() << endl; cerr << "WebRTC benchmark failed: " << e.what() << endl;
std::this_thread::sleep_for(2s);
return -1; return -1;
} }
std::this_thread::sleep_for(2s);
return 0; return 0;
} }

View File

@ -72,7 +72,7 @@ void test_websocket() {
throw runtime_error("Expected message not received"); throw runtime_error("Expected message not received");
ws->close(); ws->close();
this_thread::sleep_for(1s); this_thread::sleep_for(2s);
// You may call rtc::Cleanup() when finished to free static resources // You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup(); rtc::Cleanup();