mirror of
https://github.com/mii443/libdatachannel.git
synced 2025-08-23 15:48:03 +00:00
Compare commits
63 Commits
Author | SHA1 | Date | |
---|---|---|---|
c93f44f132 | |||
517b69043f | |||
b04db3a744 | |||
36090a24e4 | |||
6b75b3e227 | |||
afed83f5f0 | |||
75c42592bf | |||
b35bfbeb0a | |||
e481e896cb | |||
202467928a | |||
71650ce163 | |||
706a8b7160 | |||
6f419a32ea | |||
f5ff042d62 | |||
822b2e6558 | |||
5b251af1d7 | |||
5b56291b67 | |||
777f5a8dfe | |||
971e6e8b91 | |||
a3fb52c173 | |||
db00253c18 | |||
dd2967b0e1 | |||
cc4e215067 | |||
bbeed01eb0 | |||
aecc2b8fda | |||
d60e18d963 | |||
e41019a1f0 | |||
5825e44fc8 | |||
dc9a8114bc | |||
3e827f9798 | |||
0a6b263bc3 | |||
e02c30027b | |||
c4380ebcc4 | |||
add0649335 | |||
cd28340de3 | |||
c675aedb83 | |||
e32d139056 | |||
a790161168 | |||
2697ef0d76 | |||
5044aedbec | |||
44c90c1cb4 | |||
be79c68540 | |||
226a927df1 | |||
4e1b9bb3c2 | |||
8bc016cc08 | |||
5afbe10d01 | |||
8df07ca68d | |||
884bd2316e | |||
dadecce709 | |||
103935bdd5 | |||
62e6954949 | |||
3ac2d155cc | |||
6d8788c2a1 | |||
603dd01b87 | |||
8091508428 | |||
b38f63f077 | |||
d87539937e | |||
eb09cadded | |||
79e0c62321 | |||
ef38777129 | |||
313f081061 | |||
6108b05e0d | |||
f68601b45f |
2
.github/workflows/build-openssl.yml
vendored
2
.github/workflows/build-openssl.yml
vendored
@ -43,6 +43,8 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- uses: ilammy/msvc-dev-cmd@v1
|
- uses: ilammy/msvc-dev-cmd@v1
|
||||||
|
- name: install packages
|
||||||
|
run: choco install openssl
|
||||||
- name: submodules
|
- name: submodules
|
||||||
run: git submodule update --init --recursive
|
run: git submodule update --init --recursive
|
||||||
- name: cmake
|
- name: cmake
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
cmake_minimum_required(VERSION 3.7)
|
cmake_minimum_required(VERSION 3.7)
|
||||||
project(libdatachannel
|
project(libdatachannel
|
||||||
DESCRIPTION "WebRTC DataChannels Library"
|
DESCRIPTION "WebRTC Data Channels Library"
|
||||||
VERSION 0.6.3
|
VERSION 0.6.5
|
||||||
LANGUAGES CXX)
|
LANGUAGES CXX)
|
||||||
|
|
||||||
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
|
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
|
||||||
option(USE_JUICE "Use libjuice instead of libnice" OFF)
|
option(USE_JUICE "Use libjuice instead of libnice" OFF)
|
||||||
option(RTC_ENABLE_WEBSOCKET "Build WebSocket support" ON)
|
option(USE_SRTP "Enable SRTP for media support" OFF)
|
||||||
|
option(NO_WEBSOCKET "Disable WebSocket support" OFF)
|
||||||
|
option(NO_EXAMPLES "Disable examples" OFF)
|
||||||
|
|
||||||
if(USE_GNUTLS)
|
if(USE_GNUTLS)
|
||||||
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
|
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
|
||||||
@ -42,6 +44,8 @@ set(LIBDATACHANNEL_SOURCES
|
|||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
set(LIBDATACHANNEL_WEBSOCKET_SOURCES
|
set(LIBDATACHANNEL_WEBSOCKET_SOURCES
|
||||||
@ -82,7 +86,6 @@ set(TESTS_SOURCES
|
|||||||
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
|
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
|
||||||
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
|
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
|
||||||
find_package(Threads REQUIRED)
|
find_package(Threads REQUIRED)
|
||||||
find_package(SRTP)
|
|
||||||
|
|
||||||
set(CMAKE_POLICY_DEFAULT_CMP0048 NEW)
|
set(CMAKE_POLICY_DEFAULT_CMP0048 NEW)
|
||||||
add_subdirectory(deps/plog)
|
add_subdirectory(deps/plog)
|
||||||
@ -99,7 +102,14 @@ endif()
|
|||||||
add_library(Usrsctp::Usrsctp ALIAS usrsctp)
|
add_library(Usrsctp::Usrsctp ALIAS usrsctp)
|
||||||
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
|
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
|
||||||
|
|
||||||
if (RTC_ENABLE_WEBSOCKET)
|
if (NO_WEBSOCKET)
|
||||||
|
add_library(datachannel SHARED
|
||||||
|
${LIBDATACHANNEL_SOURCES})
|
||||||
|
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
|
||||||
|
${LIBDATACHANNEL_SOURCES})
|
||||||
|
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||||
|
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
||||||
|
else()
|
||||||
add_library(datachannel SHARED
|
add_library(datachannel SHARED
|
||||||
${LIBDATACHANNEL_SOURCES}
|
${LIBDATACHANNEL_SOURCES}
|
||||||
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
||||||
@ -108,13 +118,6 @@ if (RTC_ENABLE_WEBSOCKET)
|
|||||||
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
|
||||||
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
||||||
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1)
|
||||||
else()
|
|
||||||
add_library(datachannel SHARED
|
|
||||||
${LIBDATACHANNEL_SOURCES})
|
|
||||||
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
|
|
||||||
${LIBDATACHANNEL_SOURCES})
|
|
||||||
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
|
||||||
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
|
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
set_target_properties(datachannel PROPERTIES
|
set_target_properties(datachannel PROPERTIES
|
||||||
@ -141,7 +144,8 @@ if(WIN32)
|
|||||||
target_link_libraries(datachannel-static PRIVATE wsock32 ws2_32) # winsock2
|
target_link_libraries(datachannel-static PRIVATE wsock32 ws2_32) # winsock2
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
if(SRTP_FOUND)
|
if(USE_SRTP)
|
||||||
|
find_package(SRTP REQUIRED)
|
||||||
if(NOT TARGET SRTP::SRTP)
|
if(NOT TARGET SRTP::SRTP)
|
||||||
add_library(SRTP::SRTP UNKNOWN IMPORTED)
|
add_library(SRTP::SRTP UNKNOWN IMPORTED)
|
||||||
set_target_properties(SRTP::SRTP PROPERTIES
|
set_target_properties(SRTP::SRTP PROPERTIES
|
||||||
@ -228,9 +232,11 @@ else()
|
|||||||
endif()
|
endif()
|
||||||
|
|
||||||
# Examples
|
# Examples
|
||||||
set(JSON_BuildTests OFF CACHE INTERNAL "")
|
if(NOT NO_EXAMPLES)
|
||||||
add_subdirectory(deps/json)
|
set(JSON_BuildTests OFF CACHE INTERNAL "")
|
||||||
add_subdirectory(examples/client)
|
add_subdirectory(deps/json)
|
||||||
add_subdirectory(examples/copy-paste)
|
add_subdirectory(examples/client)
|
||||||
add_subdirectory(examples/copy-paste-capi)
|
add_subdirectory(examples/copy-paste)
|
||||||
|
add_subdirectory(examples/copy-paste-capi)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
58
Jamfile
58
Jamfile
@ -25,7 +25,7 @@ lib libdatachannel
|
|||||||
<library>/libdatachannel//usrsctp
|
<library>/libdatachannel//usrsctp
|
||||||
<library>/libdatachannel//juice
|
<library>/libdatachannel//juice
|
||||||
<library>/libdatachannel//plog
|
<library>/libdatachannel//plog
|
||||||
<gnutls>on:<library>gnutls
|
<gnutls>on:<library>gnutls/<link>shared
|
||||||
<gnutls>off:<library>ssl
|
<gnutls>off:<library>ssl
|
||||||
<gnutls>off:<library>crypto
|
<gnutls>off:<library>crypto
|
||||||
: # default build
|
: # default build
|
||||||
@ -85,20 +85,37 @@ alias juice
|
|||||||
make libusrsctp.a : : @make_libusrsctp ;
|
make libusrsctp.a : : @make_libusrsctp ;
|
||||||
make usrsctp.lib : : @make_libusrsctp_msvc ;
|
make usrsctp.lib : : @make_libusrsctp_msvc ;
|
||||||
|
|
||||||
|
rule make_libusrsctp ( targets * : sources * : properties * )
|
||||||
|
{
|
||||||
|
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||||
|
VARIANT on $(targets) = $(VARIANT) ;
|
||||||
|
if <gnutls>on in $(properties)
|
||||||
|
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
|
||||||
|
else
|
||||||
|
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
|
||||||
|
}
|
||||||
actions make_libusrsctp
|
actions make_libusrsctp
|
||||||
{
|
{
|
||||||
(cd $(CWD)/deps/usrsctp && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static)
|
(cd $(CWD)/deps/usrsctp && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_BUILD_TYPE=$(VARIANT) -DCMAKE_C_FLAGS="-fPIC" .. && make -j2 usrsctp-static)
|
||||||
cp $(CWD)/deps/usrsctp/build/usrsctplib/libusrsctp.a $(<)
|
cp $(CWD)/deps/usrsctp/$(BUILD_DIR)/usrsctplib/libusrsctp.a $(<)
|
||||||
|
}
|
||||||
|
rule make_libusrsctp_msvc ( targets * : sources * : properties * )
|
||||||
|
{
|
||||||
|
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||||
|
VARIANT on $(targets) = $(VARIANT) ;
|
||||||
|
if <gnutls>on in $(properties)
|
||||||
|
{ BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ; }
|
||||||
|
else
|
||||||
|
{ BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ; }
|
||||||
}
|
}
|
||||||
actions make_libusrsctp_msvc
|
actions make_libusrsctp_msvc
|
||||||
{
|
{
|
||||||
SET OLDD=%CD%
|
SET OLDD=%CD%
|
||||||
cd $(CWD)/deps/usrsctp
|
cd $(CWD)/deps/usrsctp
|
||||||
mkdir build
|
mkdir $(BUILD_SIR)
|
||||||
cd build
|
cd $(BUILD_DIR)
|
||||||
cmake -G "Visual Studio 16 2019" ..
|
cmake -G "Visual Studio 16 2019" ..
|
||||||
cd build
|
msbuild usrsctplib.sln /property:Configuration=$(VARIANT)
|
||||||
msbuild usrsctplib.sln /property:Configuration=Release
|
|
||||||
cd %OLDD%
|
cd %OLDD%
|
||||||
cp $(CWD)/deps/usrsctp/build/usrsctplib/Release/usrsctp.lib $(<)
|
cp $(CWD)/deps/usrsctp/build/usrsctplib/Release/usrsctp.lib $(<)
|
||||||
}
|
}
|
||||||
@ -108,11 +125,15 @@ make juice-static.lib : : @make_libjuice_msvc ;
|
|||||||
|
|
||||||
rule make_libjuice ( targets * : sources * : properties * )
|
rule make_libjuice ( targets * : sources * : properties * )
|
||||||
{
|
{
|
||||||
|
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||||
|
VARIANT on $(targets) = $(VARIANT) ;
|
||||||
if <gnutls>on in $(properties)
|
if <gnutls>on in $(properties)
|
||||||
{
|
{
|
||||||
|
BUILD_DIR on $(targets) = "build-gnutls-$(VARIANT)" ;
|
||||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
||||||
}
|
}
|
||||||
else {
|
else
|
||||||
|
{
|
||||||
local OPENSSL_INCLUDE = [ feature.get-values <openssl-include> : $(properties) ] ;
|
local OPENSSL_INCLUDE = [ feature.get-values <openssl-include> : $(properties) ] ;
|
||||||
|
|
||||||
if <target-os>darwin in $(properties) && $(OPENSSL_INCLUDE) = ""
|
if <target-os>darwin in $(properties) && $(OPENSSL_INCLUDE) = ""
|
||||||
@ -122,26 +143,29 @@ rule make_libjuice ( targets * : sources * : properties * )
|
|||||||
OPENSSL_INCLUDE = /usr/local/opt/openssl/include ;
|
OPENSSL_INCLUDE = /usr/local/opt/openssl/include ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BUILD_DIR on $(targets) = "build-openssl-$(VARIANT)" ;
|
||||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
||||||
if $(OPENSSL_INCLUDE) != ""
|
if $(OPENSSL_INCLUDE) != ""
|
||||||
{
|
{ CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ; }
|
||||||
CMAKEOPTS on $(targets) += " -DOPENSSL_ROOT_DIR=$(OPENSSL_INCLUDE)/.." ;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
actions make_libjuice
|
actions make_libjuice
|
||||||
{
|
{
|
||||||
(cd $(CWD)/deps/libjuice && mkdir build && cd build && cmake -DCMAKE_C_FLAGS="-fPIC" $(CMAKEOPTS) .. && make -j2 juice-static)
|
(cd $(CWD)/deps/libjuice && mkdir $(BUILD_DIR) && cd $(BUILD_DIR) && cmake -DCMAKE_C_FLAGS="-fPIC" -DCMAKE_BUILD_TYPE=$(VARIANT) $(CMAKEOPTS) .. && make -j2 juice-static)
|
||||||
cp $(CWD)/deps/libjuice/build/libjuice-static.a $(<)
|
cp $(CWD)/deps/libjuice/$(BUILD_DIR)/libjuice-static.a $(<)
|
||||||
}
|
}
|
||||||
rule make_libjuice_msvc ( targets * : sources * : properties * )
|
rule make_libjuice_msvc ( targets * : sources * : properties * )
|
||||||
{
|
{
|
||||||
|
local VARIANT = [ feature.get-values <variant> : $(properties) ] ;
|
||||||
|
VARIANT on $(targets) = $(VARIANT) ;
|
||||||
if <gnutls>on in $(properties)
|
if <gnutls>on in $(properties)
|
||||||
{
|
{
|
||||||
|
BUILD_DIR on $(targets) += "build-gnutls-$(VARIANT)" ;
|
||||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=1" ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
BUILD_DIR on $(targets) += "build-openssl-$(VARIANT)" ;
|
||||||
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
CMAKEOPTS on $(targets) = "-DUSE_NETTLE=0" ;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -149,12 +173,12 @@ actions make_libjuice_msvc
|
|||||||
{
|
{
|
||||||
SET OLDD=%CD%
|
SET OLDD=%CD%
|
||||||
cd $(CWD)/deps/libjuice
|
cd $(CWD)/deps/libjuice
|
||||||
mkdir build
|
mkdir $(BUILD_DIR)
|
||||||
cd build
|
cd $(BUILD_DIR)
|
||||||
cmake -G "Visual Studio 16 2019" $(CMAKEOPTS) ..
|
cmake -G "Visual Studio 16 2019" $(CMAKEOPTS) ..
|
||||||
msbuild libjuice.sln /property:Configuration=Release
|
msbuild libjuice.sln /property:Configuration=$(VARIANT)
|
||||||
cd %OLDD%
|
cd %OLDD%
|
||||||
cp $(CWD)/deps/libjuice/build/Release/juice-static.lib $(<)
|
cp $(CWD)/deps/libjuice/$(BUILD_DIR)/Release/juice-static.lib $(<)
|
||||||
}
|
}
|
||||||
|
|
||||||
# the search path to pick up the openssl libraries from. This is the <search>
|
# the search path to pick up the openssl libraries from. This is the <search>
|
||||||
|
10
Makefile
10
Makefile
@ -38,22 +38,22 @@ else
|
|||||||
LIBS+=glib-2.0 gobject-2.0 nice
|
LIBS+=glib-2.0 gobject-2.0 nice
|
||||||
endif
|
endif
|
||||||
|
|
||||||
RTC_ENABLE_MEDIA ?= 0
|
USE_SRTP ?= 0
|
||||||
ifneq ($(RTC_ENABLE_MEDIA), 0)
|
ifneq ($(USE_SRTP), 0)
|
||||||
CPPFLAGS+=-DRTC_ENABLE_MEDIA=1
|
CPPFLAGS+=-DRTC_ENABLE_MEDIA=1
|
||||||
LIBS+=srtp
|
LIBS+=srtp
|
||||||
else
|
else
|
||||||
CPPFLAGS+=-DRTC_ENABLE_MEDIA=0
|
CPPFLAGS+=-DRTC_ENABLE_MEDIA=0
|
||||||
endif
|
endif
|
||||||
|
|
||||||
RTC_ENABLE_WEBSOCKET ?= 1
|
|
||||||
ifneq ($(RTC_ENABLE_WEBSOCKET), 0)
|
NO_WEBSOCKET ?= 0
|
||||||
|
ifeq ($(NO_WEBSOCKET), 0)
|
||||||
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=1
|
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=1
|
||||||
else
|
else
|
||||||
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=0
|
CPPFLAGS+=-DRTC_ENABLE_WEBSOCKET=0
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
|
||||||
INCLUDES+=$(shell pkg-config --cflags $(LIBS))
|
INCLUDES+=$(shell pkg-config --cflags $(LIBS))
|
||||||
LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS))
|
LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS))
|
||||||
|
|
||||||
|
2
deps/libjuice
vendored
2
deps/libjuice
vendored
Submodule deps/libjuice updated: c6566d3c6f...92a2ed7d44
2
deps/usrsctp
vendored
2
deps/usrsctp
vendored
Submodule deps/usrsctp updated: fdc4d2b99b...ffed0925f2
@ -49,6 +49,7 @@ public:
|
|||||||
bool ended() const;
|
bool ended() const;
|
||||||
|
|
||||||
void hintType(Type type);
|
void hintType(Type type);
|
||||||
|
void setDataMid(string mid);
|
||||||
void setFingerprint(string fingerprint);
|
void setFingerprint(string fingerprint);
|
||||||
void setSctpPort(uint16_t port);
|
void setSctpPort(uint16_t port);
|
||||||
void setMaxMessageSize(size_t size);
|
void setMaxMessageSize(size_t size);
|
||||||
@ -86,7 +87,7 @@ private:
|
|||||||
string mid;
|
string mid;
|
||||||
std::vector<string> attributes;
|
std::vector<string> attributes;
|
||||||
};
|
};
|
||||||
std::map<string, Media> mMedia; // by mid
|
std::map<int, Media> mMedia; // by m-line index
|
||||||
|
|
||||||
// Candidates
|
// Candidates
|
||||||
std::vector<Candidate> mCandidates;
|
std::vector<Candidate> mCandidates;
|
||||||
|
@ -64,6 +64,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
|
|||||||
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
|
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
|
||||||
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
|
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
|
||||||
|
|
||||||
|
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
|
||||||
|
|
||||||
// overloaded helper
|
// overloaded helper
|
||||||
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
||||||
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
|
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
|
||||||
|
@ -25,8 +25,7 @@
|
|||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
class Init;
|
using init_token = std::shared_ptr<void>;
|
||||||
using init_token = std::shared_ptr<Init>;
|
|
||||||
|
|
||||||
class Init {
|
class Init {
|
||||||
public:
|
public:
|
||||||
@ -39,9 +38,10 @@ public:
|
|||||||
private:
|
private:
|
||||||
Init();
|
Init();
|
||||||
|
|
||||||
static std::weak_ptr<Init> Weak;
|
static std::weak_ptr<void> Weak;
|
||||||
static init_token Global;
|
static std::shared_ptr<void> *Global;
|
||||||
static std::mutex Mutex;
|
static bool Initialized;
|
||||||
|
static std::recursive_mutex Mutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline void Preload() { Init::Preload(); }
|
inline void Preload() { Init::Preload(); }
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
class Certificate;
|
class Certificate;
|
||||||
|
class Processor;
|
||||||
class IceTransport;
|
class IceTransport;
|
||||||
class DtlsTransport;
|
class DtlsTransport;
|
||||||
class SctpTransport;
|
class SctpTransport;
|
||||||
@ -101,7 +102,7 @@ public:
|
|||||||
// Media
|
// Media
|
||||||
bool hasMedia() const;
|
bool hasMedia() const;
|
||||||
void sendMedia(const binary &packet);
|
void sendMedia(const binary &packet);
|
||||||
void send(const byte *packet, size_t size);
|
void sendMedia(const byte *packet, size_t size);
|
||||||
|
|
||||||
void onMedia(std::function<void(const binary &packet)> callback);
|
void onMedia(std::function<void(const binary &packet)> callback);
|
||||||
|
|
||||||
@ -139,10 +140,10 @@ private:
|
|||||||
|
|
||||||
void outgoingMedia(message_ptr message);
|
void outgoingMedia(message_ptr message);
|
||||||
|
|
||||||
|
const init_token mInitToken = Init::Token();
|
||||||
const Configuration mConfig;
|
const Configuration mConfig;
|
||||||
const future_certificate_ptr mCertificate;
|
const future_certificate_ptr mCertificate;
|
||||||
|
const std::unique_ptr<Processor> mProcessor;
|
||||||
init_token mInitToken = Init::Token();
|
|
||||||
|
|
||||||
std::optional<Description> mLocalDescription, mRemoteDescription;
|
std::optional<Description> mLocalDescription, mRemoteDescription;
|
||||||
mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
|
mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#else
|
#else
|
||||||
#include <netdb.h>
|
#include <netdb.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
@ -107,8 +108,9 @@ bool Candidate::resolve(ResolveMode mode) {
|
|||||||
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
|
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
|
||||||
oss << left;
|
oss << left;
|
||||||
mCandidate = oss.str();
|
mCandidate = oss.str();
|
||||||
|
mIsResolved = true;
|
||||||
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
|
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
|
||||||
return mIsResolved = true;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -116,7 +118,7 @@ bool Candidate::resolve(ResolveMode mode) {
|
|||||||
freeaddrinfo(result);
|
freeaddrinfo(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return mIsResolved;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Candidate::isResolved() const { return mIsResolved; }
|
bool Candidate::isResolved() const { return mIsResolved; }
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "certificate.hpp"
|
#include "certificate.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
@ -201,8 +202,8 @@ certificate_ptr make_certificate_impl(string commonName) {
|
|||||||
auto *commonNameBytes =
|
auto *commonNameBytes =
|
||||||
reinterpret_cast<unsigned char *>(const_cast<char *>(commonName.c_str()));
|
reinterpret_cast<unsigned char *>(const_cast<char *>(commonName.c_str()));
|
||||||
|
|
||||||
if (!X509_gmtime_adj(X509_get_notBefore(x509.get()), 3600 * -1) ||
|
if (!X509_gmtime_adj(X509_getm_notBefore(x509.get()), 3600 * -1) ||
|
||||||
!X509_gmtime_adj(X509_get_notAfter(x509.get()), 3600 * 24 * 365) ||
|
!X509_gmtime_adj(X509_getm_notAfter(x509.get()), 3600 * 24 * 365) ||
|
||||||
!X509_set_version(x509.get(), 1) || !X509_set_pubkey(x509.get(), pkey.get()) ||
|
!X509_set_version(x509.get(), 1) || !X509_set_pubkey(x509.get(), pkey.get()) ||
|
||||||
!BN_pseudo_rand(serial_number.get(), serialSize, 0, 0) ||
|
!BN_pseudo_rand(serial_number.get(), serialSize, 0, 0) ||
|
||||||
!BN_to_ASN1_INTEGER(serial_number.get(), X509_get_serialNumber(x509.get())) ||
|
!BN_to_ASN1_INTEGER(serial_number.get(), X509_get_serialNumber(x509.get())) ||
|
||||||
@ -230,19 +231,6 @@ namespace rtc {
|
|||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
// Helper function roughly equivalent to std::async with policy std::launch::async
|
|
||||||
// since std::async might be unreliable on some platforms (e.g. Mingw32 on Windows)
|
|
||||||
template <class F, class... Args>
|
|
||||||
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>
|
|
||||||
thread_call(F &&f, Args &&... args) {
|
|
||||||
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
|
||||||
std::packaged_task<R()> task(std::bind(f, std::forward<Args>(args)...));
|
|
||||||
std::future<R> future = task.get_future();
|
|
||||||
std::thread t(std::move(task));
|
|
||||||
t.detach();
|
|
||||||
return future;
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::unordered_map<string, future_certificate_ptr> CertificateCache;
|
static std::unordered_map<string, future_certificate_ptr> CertificateCache;
|
||||||
static std::mutex CertificateCacheMutex;
|
static std::mutex CertificateCacheMutex;
|
||||||
|
|
||||||
@ -254,7 +242,7 @@ future_certificate_ptr make_certificate(string commonName) {
|
|||||||
if (auto it = CertificateCache.find(commonName); it != CertificateCache.end())
|
if (auto it = CertificateCache.find(commonName); it != CertificateCache.end())
|
||||||
return it->second;
|
return it->second;
|
||||||
|
|
||||||
auto future = thread_call(make_certificate_impl, commonName);
|
auto future = ThreadPool::Instance().enqueue(make_certificate_impl, commonName);
|
||||||
auto shared = future.share();
|
auto shared = future.share();
|
||||||
CertificateCache.emplace(std::move(commonName), shared);
|
CertificateCache.emplace(std::move(commonName), shared);
|
||||||
return shared;
|
return shared;
|
||||||
|
@ -96,7 +96,7 @@ void DataChannel::close() {
|
|||||||
mIsClosed = true;
|
mIsClosed = true;
|
||||||
if (mIsOpen.exchange(false))
|
if (mIsOpen.exchange(false))
|
||||||
if (auto transport = mSctpTransport.lock())
|
if (auto transport = mSctpTransport.lock())
|
||||||
transport->close(mStream);
|
transport->closeStream(mStream);
|
||||||
|
|
||||||
mSctpTransport.reset();
|
mSctpTransport.reset();
|
||||||
resetCallbacks();
|
resetCallbacks();
|
||||||
|
@ -63,6 +63,7 @@ Description::Description(const string &sdp, Type type, Role role)
|
|||||||
std::istringstream ss(sdp);
|
std::istringstream ss(sdp);
|
||||||
std::optional<Media> currentMedia;
|
std::optional<Media> currentMedia;
|
||||||
|
|
||||||
|
int mlineIndex = 0;
|
||||||
bool finished;
|
bool finished;
|
||||||
do {
|
do {
|
||||||
string line;
|
string line;
|
||||||
@ -76,7 +77,9 @@ Description::Description(const string &sdp, Type type, Role role)
|
|||||||
if (currentMedia->type == "application")
|
if (currentMedia->type == "application")
|
||||||
mData.mid = currentMedia->mid;
|
mData.mid = currentMedia->mid;
|
||||||
else
|
else
|
||||||
mMedia.emplace(currentMedia->mid, std::move(*currentMedia));
|
mMedia.emplace(mlineIndex, std::move(*currentMedia));
|
||||||
|
|
||||||
|
++mlineIndex;
|
||||||
|
|
||||||
} else if (line.find(" ICE/SDP") != string::npos) {
|
} else if (line.find(" ICE/SDP") != string::npos) {
|
||||||
PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring";
|
PLOG_WARNING << "SDP \"m=\" line has no corresponding mid, ignoring";
|
||||||
@ -163,6 +166,8 @@ void Description::hintType(Type type) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Description::setDataMid(string mid) { mData.mid = mid; }
|
||||||
|
|
||||||
void Description::setFingerprint(string fingerprint) {
|
void Description::setFingerprint(string fingerprint) {
|
||||||
mFingerprint.emplace(std::move(fingerprint));
|
mFingerprint.emplace(std::move(fingerprint));
|
||||||
}
|
}
|
||||||
@ -188,10 +193,7 @@ bool Description::hasMedia() const { return !mMedia.empty(); }
|
|||||||
|
|
||||||
void Description::addMedia(const Description &source) {
|
void Description::addMedia(const Description &source) {
|
||||||
for (auto p : source.mMedia)
|
for (auto p : source.mMedia)
|
||||||
if (p.first != mData.mid)
|
mMedia.emplace(p);
|
||||||
mMedia.emplace(std::move(p));
|
|
||||||
else
|
|
||||||
PLOG_WARNING << "Media mid \"" << p.first << "\" is the same as data mid, ignoring";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Description::operator string() const { return generateSdp("\r\n"); }
|
Description::operator string() const { return generateSdp("\r\n"); }
|
||||||
@ -212,48 +214,60 @@ string Description::generateSdp(const string &eol) const {
|
|||||||
// see Negotiating Media Multiplexing Using the Session Description Protocol
|
// see Negotiating Media Multiplexing Using the Session Description Protocol
|
||||||
// https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54
|
// https://tools.ietf.org/html/draft-ietf-mmusic-sdp-bundle-negotiation-54
|
||||||
sdp << "a=group:BUNDLE";
|
sdp << "a=group:BUNDLE";
|
||||||
for (const auto &m : mMedia)
|
for (int i = 0; i < int(mMedia.size() + 1); ++i)
|
||||||
sdp << " " << m.first; // mid
|
if (auto it = mMedia.find(i); it != mMedia.end())
|
||||||
sdp << " " << mData.mid << eol;
|
sdp << ' ' << it->second.mid;
|
||||||
|
else
|
||||||
|
sdp << ' ' << mData.mid;
|
||||||
|
sdp << eol;
|
||||||
|
|
||||||
// Data
|
sdp << "a=msid-semantic: WMS" << eol;
|
||||||
const string dataDescription = "UDP/DTLS/SCTP webrtc-datachannel";
|
|
||||||
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << dataDescription << eol;
|
|
||||||
sdp << "c=IN IP4 0.0.0.0" << eol;
|
|
||||||
if (!mMedia.empty())
|
|
||||||
sdp << "a=bundle-only" << eol;
|
|
||||||
sdp << "a=mid:" << mData.mid << eol;
|
|
||||||
if (mData.sctpPort)
|
|
||||||
sdp << "a=sctp-port:" << *mData.sctpPort << eol;
|
|
||||||
if (mData.maxMessageSize)
|
|
||||||
sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
|
|
||||||
|
|
||||||
// Non-data media
|
// Non-data media
|
||||||
if (!mMedia.empty()) {
|
if (!mMedia.empty()) {
|
||||||
// Lip-sync
|
// Lip-sync
|
||||||
sdp << "a=group:LS";
|
sdp << "a=group:LS";
|
||||||
for (const auto &m : mMedia)
|
for (const auto &p : mMedia)
|
||||||
sdp << " " << m.first; // mid
|
sdp << " " << p.second.mid;
|
||||||
sdp << eol;
|
sdp << eol;
|
||||||
|
}
|
||||||
|
|
||||||
// Descriptions and attributes
|
// Descriptions and attributes
|
||||||
for (const auto &m : mMedia) {
|
for (int i = 0; i < int(mMedia.size() + 1); ++i) {
|
||||||
const auto &media = m.second;
|
if (auto it = mMedia.find(i); it != mMedia.end()) {
|
||||||
|
// Non-data media
|
||||||
|
const auto &media = it->second;
|
||||||
sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
|
sdp << "m=" << media.type << ' ' << 0 << ' ' << media.description << eol;
|
||||||
sdp << "c=IN IP4 0.0.0.0" << eol;
|
sdp << "c=IN IP4 0.0.0.0" << eol;
|
||||||
sdp << "a=bundle-only" << eol;
|
sdp << "a=bundle-only" << eol;
|
||||||
sdp << "a=mid:" << media.mid << eol;
|
sdp << "a=mid:" << media.mid << eol;
|
||||||
for (const auto &attr : media.attributes)
|
for (const auto &attr : media.attributes)
|
||||||
sdp << "a=" << attr << eol;
|
sdp << "a=" << attr << eol;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Data
|
||||||
|
const string description = "UDP/DTLS/SCTP webrtc-datachannel";
|
||||||
|
sdp << "m=application" << ' ' << (!mMedia.empty() ? 0 : 9) << ' ' << description << eol;
|
||||||
|
sdp << "c=IN IP4 0.0.0.0" << eol;
|
||||||
|
if (!mMedia.empty())
|
||||||
|
sdp << "a=bundle-only" << eol;
|
||||||
|
sdp << "a=mid:" << mData.mid << eol;
|
||||||
|
if (mData.sctpPort)
|
||||||
|
sdp << "a=sctp-port:" << *mData.sctpPort << eol;
|
||||||
|
if (mData.maxMessageSize)
|
||||||
|
sdp << "a=max-message-size:" << *mData.maxMessageSize << eol;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Common
|
// Common
|
||||||
sdp << "a=ice-options:trickle" << eol;
|
if (!mEnded)
|
||||||
|
sdp << "a=ice-options:trickle" << eol;
|
||||||
|
|
||||||
sdp << "a=ice-ufrag:" << mIceUfrag << eol;
|
sdp << "a=ice-ufrag:" << mIceUfrag << eol;
|
||||||
sdp << "a=ice-pwd:" << mIcePwd << eol;
|
sdp << "a=ice-pwd:" << mIcePwd << eol;
|
||||||
sdp << "a=setup:" << roleToString(mRole) << eol;
|
sdp << "a=setup:" << roleToString(mRole) << eol;
|
||||||
sdp << "a=tls-id:1" << eol;
|
sdp << "a=tls-id:1" << eol;
|
||||||
|
|
||||||
if (mFingerprint)
|
if (mFingerprint)
|
||||||
sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;
|
sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;
|
||||||
|
|
||||||
|
@ -43,50 +43,86 @@ DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
|
|||||||
std::move(stateChangeCallback)),
|
std::move(stateChangeCallback)),
|
||||||
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
|
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
|
||||||
|
|
||||||
PLOG_DEBUG << "Initializing SRTP transport";
|
PLOG_DEBUG << "Initializing DTLS-SRTP transport";
|
||||||
|
|
||||||
#if USE_GNUTLS
|
if (srtp_err_status_t err = srtp_create(&mSrtpIn, nullptr)) {
|
||||||
PLOG_DEBUG << "Initializing DTLS-SRTP transport (GnuTLS)";
|
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
||||||
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80),
|
}
|
||||||
"Failed to set SRTP profile");
|
if (srtp_err_status_t err = srtp_create(&mSrtpOut, nullptr)) {
|
||||||
#else
|
srtp_dealloc(mSrtpIn);
|
||||||
PLOG_DEBUG << "Initializing DTLS-SRTP transport (OpenSSL)";
|
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
||||||
openssl::check(SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"),
|
}
|
||||||
"Failed to set SRTP profile");
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DtlsSrtpTransport::~DtlsSrtpTransport() {
|
DtlsSrtpTransport::~DtlsSrtpTransport() {
|
||||||
stop();
|
stop();
|
||||||
|
|
||||||
if (mCreated)
|
srtp_dealloc(mSrtpIn);
|
||||||
srtp_dealloc(mSrtp);
|
srtp_dealloc(mSrtpOut);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DtlsSrtpTransport::send(message_ptr message) {
|
bool DtlsSrtpTransport::sendMedia(message_ptr message) {
|
||||||
if (!message)
|
if (!message)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
if (!mInitDone) {
|
||||||
|
PLOG_WARNING << "SRTP media sent before keys are derived";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int size = message->size();
|
int size = message->size();
|
||||||
PLOG_VERBOSE << "Send size=" << size;
|
PLOG_VERBOSE << "Send size=" << size;
|
||||||
|
|
||||||
// srtp_protect() assumes that it can write SRTP_MAX_TRAILER_LEN (for the authentication tag)
|
// The RTP header has a minimum size of 12 bytes
|
||||||
// into the location in memory immediately following the RTP packet.
|
if (size < 12)
|
||||||
|
throw std::runtime_error("RTP/RTCP packet too short");
|
||||||
|
|
||||||
|
// srtp_protect() and srtp_protect_rtcp() assume that they can write SRTP_MAX_TRAILER_LEN (for
|
||||||
|
// the authentication tag) into the location in memory immediately following the RTP packet.
|
||||||
message->resize(size + SRTP_MAX_TRAILER_LEN);
|
message->resize(size + SRTP_MAX_TRAILER_LEN);
|
||||||
if (srtp_err_status_t err = srtp_protect(mSrtp, message->data(), &size)) {
|
|
||||||
if (err == srtp_err_status_replay_fail)
|
uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
|
||||||
throw std::runtime_error("SRTP packet is a replay");
|
PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
|
||||||
else
|
<< unsigned(value2);
|
||||||
throw std::runtime_error("SRTP protect error, status=" +
|
|
||||||
to_string(static_cast<int>(err)));
|
// RFC 5761 Multiplexing RTP and RTCP 4. Distinguishable RTP and RTCP Packets
|
||||||
|
// It is RECOMMENDED to follow the guidelines in the RTP/AVP profile for the choice of RTP
|
||||||
|
// payload type values, with the additional restriction that payload type values in the
|
||||||
|
// range 64-95 MUST NOT be used. Specifically, dynamic RTP payload types SHOULD be chosen in
|
||||||
|
// the range 96-127 where possible. Values below 64 MAY be used if that is insufficient
|
||||||
|
// [...]
|
||||||
|
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
|
||||||
|
if (srtp_err_status_t err = srtp_protect_rtcp(mSrtpOut, message->data(), &size)) {
|
||||||
|
if (err == srtp_err_status_replay_fail)
|
||||||
|
throw std::runtime_error("SRTCP packet is a replay");
|
||||||
|
else
|
||||||
|
throw std::runtime_error("SRTCP protect error, status=" +
|
||||||
|
to_string(static_cast<int>(err)));
|
||||||
|
}
|
||||||
|
PLOG_VERBOSE << "Protected SRTCP packet, size=" << size;
|
||||||
|
} else {
|
||||||
|
if (srtp_err_status_t err = srtp_protect(mSrtpOut, message->data(), &size)) {
|
||||||
|
if (err == srtp_err_status_replay_fail)
|
||||||
|
throw std::runtime_error("SRTP packet is a replay");
|
||||||
|
else
|
||||||
|
throw std::runtime_error("SRTP protect error, status=" +
|
||||||
|
to_string(static_cast<int>(err)));
|
||||||
|
}
|
||||||
|
PLOG_VERBOSE << "Protected SRTP packet, size=" << size;
|
||||||
}
|
}
|
||||||
PLOG_VERBOSE << "Protected SRTP packet, size=" << size;
|
|
||||||
message->resize(size);
|
message->resize(size);
|
||||||
outgoing(message);
|
outgoing(message);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DtlsSrtpTransport::incoming(message_ptr message) {
|
void DtlsSrtpTransport::incoming(message_ptr message) {
|
||||||
|
if (!mInitDone) {
|
||||||
|
// Bypas
|
||||||
|
DtlsTransport::incoming(message);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int size = message->size();
|
int size = message->size();
|
||||||
if (size == 0)
|
if (size == 0)
|
||||||
return;
|
return;
|
||||||
@ -95,49 +131,80 @@ void DtlsSrtpTransport::incoming(message_ptr message) {
|
|||||||
// The process for demultiplexing a packet is as follows. The receiver looks at the first byte
|
// The process for demultiplexing a packet is as follows. The receiver looks at the first byte
|
||||||
// of the packet. [...] If the value is in between 128 and 191 (inclusive), then the packet is
|
// of the packet. [...] If the value is in between 128 and 191 (inclusive), then the packet is
|
||||||
// RTP (or RTCP [...]). If the value is between 20 and 63 (inclusive), the packet is DTLS.
|
// RTP (or RTCP [...]). If the value is between 20 and 63 (inclusive), the packet is DTLS.
|
||||||
uint8_t value = to_integer<uint8_t>(*message->begin());
|
uint8_t value1 = to_integer<uint8_t>(*message->begin());
|
||||||
|
PLOG_VERBOSE << "Demultiplexing DTLS and SRTP/SRTCP with first byte, value="
|
||||||
|
<< unsigned(value1);
|
||||||
|
|
||||||
if (value >= 128 && value <= 192) {
|
if (value1 >= 20 && value1 <= 63) {
|
||||||
PLOG_VERBOSE << "Incoming DTLS packet, size=" << size;
|
PLOG_VERBOSE << "Incoming DTLS packet, size=" << size;
|
||||||
DtlsTransport::incoming(message);
|
DtlsTransport::incoming(message);
|
||||||
} else if (value >= 20 && value <= 64) {
|
|
||||||
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
|
|
||||||
|
|
||||||
if (srtp_err_status_t err = srtp_unprotect(mSrtp, message->data(), &size)) {
|
} else if (value1 >= 128 && value1 <= 191) {
|
||||||
if (err == srtp_err_status_replay_fail)
|
// The RTP header has a minimum size of 12 bytes
|
||||||
PLOG_WARNING << "Incoming SRTP packet is a replay";
|
if (size < 12) {
|
||||||
else
|
PLOG_WARNING << "Incoming SRTP/SRTCP packet too short, size=" << size;
|
||||||
PLOG_WARNING << "SRTP unprotect error, status=" << err;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
|
|
||||||
|
uint8_t value2 = to_integer<uint8_t>(*(message->begin() + 1)) & 0x7F;
|
||||||
|
PLOG_VERBOSE << "Demultiplexing SRTCP and SRTP with RTP payload type, value="
|
||||||
|
<< unsigned(value2);
|
||||||
|
|
||||||
|
// See RFC 5761 reference above
|
||||||
|
if (value2 >= 64 && value2 <= 95) { // Range 64-95 (inclusive) MUST be RTCP
|
||||||
|
PLOG_VERBOSE << "Incoming SRTCP packet, size=" << size;
|
||||||
|
if (srtp_err_status_t err = srtp_unprotect_rtcp(mSrtpIn, message->data(), &size)) {
|
||||||
|
if (err == srtp_err_status_replay_fail)
|
||||||
|
PLOG_WARNING << "Incoming SRTCP packet is a replay";
|
||||||
|
else
|
||||||
|
PLOG_WARNING << "SRTCP unprotect error, status=" << err;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
PLOG_VERBOSE << "Unprotected SRTCP packet, size=" << size;
|
||||||
|
} else {
|
||||||
|
PLOG_VERBOSE << "Incoming SRTP packet, size=" << size;
|
||||||
|
if (srtp_err_status_t err = srtp_unprotect(mSrtpIn, message->data(), &size)) {
|
||||||
|
if (err == srtp_err_status_replay_fail)
|
||||||
|
PLOG_WARNING << "Incoming SRTP packet is a replay";
|
||||||
|
else
|
||||||
|
PLOG_WARNING << "SRTP unprotect error, status=" << err;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
PLOG_VERBOSE << "Unprotected SRTP packet, size=" << size;
|
||||||
|
}
|
||||||
|
|
||||||
message->resize(size);
|
message->resize(size);
|
||||||
mSrtpRecvCallback(message);
|
mSrtpRecvCallback(message);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
PLOG_WARNING << "Unknown packet type, value=" << value << ", size=" << size;
|
PLOG_WARNING << "Unknown packet type, value=" << unsigned(value1) << ", size=" << size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DtlsSrtpTransport::postCreation() {
|
||||||
|
#if USE_GNUTLS
|
||||||
|
PLOG_DEBUG << "Setting SRTP profile (GnuTLS)";
|
||||||
|
gnutls::check(gnutls_srtp_set_profile(mSession, GNUTLS_SRTP_AES128_CM_HMAC_SHA1_80),
|
||||||
|
"Failed to set SRTP profile");
|
||||||
|
#else
|
||||||
|
PLOG_DEBUG << "Setting SRTP profile (OpenSSL)";
|
||||||
|
// returns 0 on success, 1 on error
|
||||||
|
if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"), "Failed to set SRTP profile")
|
||||||
|
throw std::runtime_error("Failed to set SRTP profile: " + openssl::error_string(ERR_get_error()));
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
void DtlsSrtpTransport::postHandshake() {
|
void DtlsSrtpTransport::postHandshake() {
|
||||||
if (mCreated)
|
if (mInitDone)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
srtp_policy_t inbound = {};
|
|
||||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
|
|
||||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
|
|
||||||
inbound.ssrc.type = ssrc_any_inbound;
|
|
||||||
|
|
||||||
srtp_policy_t outbound = {};
|
|
||||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
|
|
||||||
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
|
|
||||||
outbound.ssrc.type = ssrc_any_outbound;
|
|
||||||
|
|
||||||
const size_t materialLen = SRTP_AES_ICM_128_KEY_LEN_WSALT * 2;
|
const size_t materialLen = SRTP_AES_ICM_128_KEY_LEN_WSALT * 2;
|
||||||
unsigned char material[materialLen];
|
unsigned char material[materialLen];
|
||||||
const unsigned char *clientKey, *clientSalt, *serverKey, *serverSalt;
|
const unsigned char *clientKey, *clientSalt, *serverKey, *serverSalt;
|
||||||
|
|
||||||
#if USE_GNUTLS
|
#if USE_GNUTLS
|
||||||
|
PLOG_INFO << "Deriving SRTP keying material (GnuTLS)";
|
||||||
|
|
||||||
gnutls_datum_t clientKeyDatum, clientSaltDatum, serverKeyDatum, serverSaltDatum;
|
gnutls_datum_t clientKeyDatum, clientSaltDatum, serverKeyDatum, serverSaltDatum;
|
||||||
gnutls::check(gnutls_srtp_get_keys(mSession, material, materialLen, &clientKeyDatum,
|
gnutls::check(gnutls_srtp_get_keys(mSession, material, materialLen, &clientKeyDatum,
|
||||||
&clientSaltDatum, &serverKeyDatum, &serverSaltDatum),
|
&clientSaltDatum, &serverKeyDatum, &serverSaltDatum),
|
||||||
@ -160,18 +227,23 @@ void DtlsSrtpTransport::postHandshake() {
|
|||||||
serverKey = reinterpret_cast<const unsigned char *>(serverKeyDatum.data);
|
serverKey = reinterpret_cast<const unsigned char *>(serverKeyDatum.data);
|
||||||
serverSalt = reinterpret_cast<const unsigned char *>(serverSaltDatum.data);
|
serverSalt = reinterpret_cast<const unsigned char *>(serverSaltDatum.data);
|
||||||
#else
|
#else
|
||||||
// This provides the client write master key, the server write master key, the client write
|
PLOG_INFO << "Deriving SRTP keying material (OpenSSL)";
|
||||||
// master salt and the server write master salt in that order.
|
|
||||||
|
// The extractor provides the client write master key, the server write master key, the client
|
||||||
|
// write master salt and the server write master salt in that order.
|
||||||
const string label = "EXTRACTOR-dtls_srtp";
|
const string label = "EXTRACTOR-dtls_srtp";
|
||||||
openssl::check(SSL_export_keying_material(mSsl, material, materialLen, label.c_str(),
|
|
||||||
label.size(), nullptr, 0, 0),
|
// returns 1 on success, 0 or -1 on failure (OpenSSL API is a complete mess...)
|
||||||
"Failed to derive SRTP keys");
|
if (SSL_export_keying_material(mSsl, material, materialLen, label.c_str(), label.size(),
|
||||||
|
nullptr, 0, 0) <= 0)
|
||||||
|
throw std::runtime_error("Failed to derive SRTP keys: " +
|
||||||
|
openssl::error_string(ERR_get_error()));
|
||||||
|
|
||||||
clientKey = material;
|
clientKey = material;
|
||||||
clientSalt = clientKey + SRTP_AES_128_KEY_LEN;
|
clientSalt = clientKey + SRTP_AES_128_KEY_LEN;
|
||||||
|
|
||||||
serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT;
|
serverKey = material + SRTP_AES_ICM_128_KEY_LEN_WSALT;
|
||||||
serverSalt = serverSalt + SRTP_AES_128_KEY_LEN;
|
serverSalt = serverKey + SRTP_AES_128_KEY_LEN;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT];
|
unsigned char clientSessionKey[SRTP_AES_ICM_128_KEY_LEN_WSALT];
|
||||||
@ -182,22 +254,31 @@ void DtlsSrtpTransport::postHandshake() {
|
|||||||
std::memcpy(serverSessionKey, serverKey, SRTP_AES_128_KEY_LEN);
|
std::memcpy(serverSessionKey, serverKey, SRTP_AES_128_KEY_LEN);
|
||||||
std::memcpy(serverSessionKey + SRTP_AES_128_KEY_LEN, serverSalt, SRTP_SALT_LEN);
|
std::memcpy(serverSessionKey + SRTP_AES_128_KEY_LEN, serverSalt, SRTP_SALT_LEN);
|
||||||
|
|
||||||
if (mIsClient) {
|
srtp_policy_t inbound = {};
|
||||||
inbound.key = serverSessionKey;
|
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtp);
|
||||||
outbound.key = clientSessionKey;
|
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&inbound.rtcp);
|
||||||
} else {
|
inbound.ssrc.type = ssrc_any_inbound;
|
||||||
inbound.key = clientSessionKey;
|
inbound.ssrc.value = 0;
|
||||||
outbound.key = serverSessionKey;
|
inbound.key = mIsClient ? serverSessionKey : clientSessionKey;
|
||||||
}
|
inbound.next = nullptr;
|
||||||
|
|
||||||
srtp_policy_t *policies = &inbound;
|
if (srtp_err_status_t err = srtp_add_stream(mSrtpIn, &inbound))
|
||||||
inbound.next = &outbound;
|
throw std::runtime_error("SRTP add inbound stream failed, status=" +
|
||||||
|
to_string(static_cast<int>(err)));
|
||||||
|
|
||||||
|
srtp_policy_t outbound = {};
|
||||||
|
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtp);
|
||||||
|
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&outbound.rtcp);
|
||||||
|
outbound.ssrc.type = ssrc_any_outbound;
|
||||||
|
outbound.ssrc.value = 0;
|
||||||
|
outbound.key = mIsClient ? clientSessionKey : serverSessionKey;
|
||||||
outbound.next = nullptr;
|
outbound.next = nullptr;
|
||||||
|
|
||||||
if (srtp_err_status_t err = srtp_create(&mSrtp, policies))
|
if (srtp_err_status_t err = srtp_add_stream(mSrtpOut, &outbound))
|
||||||
throw std::runtime_error("SRTP create failed, status=" + to_string(static_cast<int>(err)));
|
throw std::runtime_error("SRTP add outbound stream failed, status=" +
|
||||||
|
to_string(static_cast<int>(err)));
|
||||||
|
|
||||||
mCreated = true;
|
mInitDone = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
@ -38,16 +38,17 @@ public:
|
|||||||
state_callback stateChangeCallback);
|
state_callback stateChangeCallback);
|
||||||
~DtlsSrtpTransport();
|
~DtlsSrtpTransport();
|
||||||
|
|
||||||
bool send(message_ptr message) override;
|
bool sendMedia(message_ptr message);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void incoming(message_ptr message) override;
|
void incoming(message_ptr message) override;
|
||||||
|
void postCreation() override;
|
||||||
void postHandshake() override;
|
void postHandshake() override;
|
||||||
|
|
||||||
message_callback mSrtpRecvCallback;
|
message_callback mSrtpRecvCallback;
|
||||||
|
|
||||||
srtp_t mSrtp;
|
srtp_t mSrtpIn, mSrtpOut;
|
||||||
bool mCreated = false;
|
bool mInitDone = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
@ -85,6 +85,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
|
|||||||
gnutls_transport_set_pull_function(mSession, ReadCallback);
|
gnutls_transport_set_pull_function(mSession, ReadCallback);
|
||||||
gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
|
gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
|
||||||
|
|
||||||
|
postCreation();
|
||||||
|
|
||||||
mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this);
|
mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this);
|
||||||
registerIncoming();
|
registerIncoming();
|
||||||
|
|
||||||
@ -137,6 +139,10 @@ void DtlsTransport::incoming(message_ptr message) {
|
|||||||
mIncomingQueue.push(message);
|
mIncomingQueue.push(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DtlsTransport::postCreation() {
|
||||||
|
// Dummy
|
||||||
|
}
|
||||||
|
|
||||||
void DtlsTransport::postHandshake() {
|
void DtlsTransport::postHandshake() {
|
||||||
// Dummy
|
// Dummy
|
||||||
}
|
}
|
||||||
@ -408,6 +414,10 @@ void DtlsTransport::incoming(message_ptr message) {
|
|||||||
mIncomingQueue.push(message);
|
mIncomingQueue.push(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DtlsTransport::postCreation() {
|
||||||
|
// Dummy
|
||||||
|
}
|
||||||
|
|
||||||
void DtlsTransport::postHandshake() {
|
void DtlsTransport::postHandshake() {
|
||||||
// Dummy
|
// Dummy
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,7 @@ public:
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void incoming(message_ptr message) override;
|
virtual void incoming(message_ptr message) override;
|
||||||
|
virtual void postCreation();
|
||||||
virtual void postHandshake();
|
virtual void postHandshake();
|
||||||
void runRecvLoop();
|
void runRecvLoop();
|
||||||
|
|
||||||
|
92
src/init.cpp
92
src/init.cpp
@ -21,6 +21,7 @@
|
|||||||
#include "certificate.hpp"
|
#include "certificate.hpp"
|
||||||
#include "dtlstransport.hpp"
|
#include "dtlstransport.hpp"
|
||||||
#include "sctptransport.hpp"
|
#include "sctptransport.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
#include "tls.hpp"
|
#include "tls.hpp"
|
||||||
|
|
||||||
#if RTC_ENABLE_WEBSOCKET
|
#if RTC_ENABLE_WEBSOCKET
|
||||||
@ -39,41 +40,21 @@ using std::shared_ptr;
|
|||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
std::weak_ptr<Init> Init::Weak;
|
namespace {
|
||||||
init_token Init::Global;
|
|
||||||
std::mutex Init::Mutex;
|
|
||||||
|
|
||||||
init_token Init::Token() {
|
void doInit() {
|
||||||
std::lock_guard lock(Mutex);
|
PLOG_DEBUG << "Global initialization";
|
||||||
|
|
||||||
if (!Global) {
|
|
||||||
if (auto token = Weak.lock())
|
|
||||||
Global = token;
|
|
||||||
else
|
|
||||||
Global = shared_ptr<Init>(new Init());
|
|
||||||
}
|
|
||||||
return Global;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Init::Preload() {
|
|
||||||
Token(); // pre-init
|
|
||||||
make_certificate().wait(); // preload certificate
|
|
||||||
}
|
|
||||||
|
|
||||||
void Init::Cleanup() {
|
|
||||||
Global.reset();
|
|
||||||
CleanupCertificateCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
Init::Init() {
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
WSADATA wsaData;
|
WSADATA wsaData;
|
||||||
if (WSAStartup(MAKEWORD(2, 2), &wsaData))
|
if (WSAStartup(MAKEWORD(2, 2), &wsaData))
|
||||||
throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
|
throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
ThreadPool::Instance().spawn(THREADPOOL_SIZE);
|
||||||
|
|
||||||
#if USE_GNUTLS
|
#if USE_GNUTLS
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
#else
|
#else
|
||||||
openssl::init();
|
openssl::init();
|
||||||
#endif
|
#endif
|
||||||
@ -88,7 +69,12 @@ Init::Init() {
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
Init::~Init() {
|
void doCleanup() {
|
||||||
|
PLOG_DEBUG << "Global cleanup";
|
||||||
|
|
||||||
|
ThreadPool::Instance().join();
|
||||||
|
CleanupCertificateCache();
|
||||||
|
|
||||||
SctpTransport::Cleanup();
|
SctpTransport::Cleanup();
|
||||||
DtlsTransport::Cleanup();
|
DtlsTransport::Cleanup();
|
||||||
#if RTC_ENABLE_WEBSOCKET
|
#if RTC_ENABLE_WEBSOCKET
|
||||||
@ -103,5 +89,57 @@ Init::~Init() {
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
std::weak_ptr<void> Init::Weak;
|
||||||
|
std::shared_ptr<void> *Init::Global = nullptr;
|
||||||
|
bool Init::Initialized = false;
|
||||||
|
std::recursive_mutex Init::Mutex;
|
||||||
|
|
||||||
|
init_token Init::Token() {
|
||||||
|
std::unique_lock lock(Mutex);
|
||||||
|
if (auto token = Weak.lock())
|
||||||
|
return token;
|
||||||
|
|
||||||
|
delete Global;
|
||||||
|
Global = new shared_ptr<void>(new Init());
|
||||||
|
Weak = *Global;
|
||||||
|
return *Global;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Init::Preload() {
|
||||||
|
std::unique_lock lock(Mutex);
|
||||||
|
auto token = Token();
|
||||||
|
if (!Global)
|
||||||
|
Global = new shared_ptr<void>(token);
|
||||||
|
|
||||||
|
PLOG_DEBUG << "Preloading certificate";
|
||||||
|
make_certificate().wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Init::Cleanup() {
|
||||||
|
std::unique_lock lock(Mutex);
|
||||||
|
delete Global;
|
||||||
|
Global = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
Init::Init() {
|
||||||
|
// Mutex is locked by Token() here
|
||||||
|
if (!std::exchange(Initialized, true))
|
||||||
|
doInit();
|
||||||
|
}
|
||||||
|
|
||||||
|
Init::~Init() {
|
||||||
|
std::thread t([]() {
|
||||||
|
// We need to lock Mutex ourselves
|
||||||
|
std::unique_lock lock(Mutex);
|
||||||
|
if (Global)
|
||||||
|
return;
|
||||||
|
if (std::exchange(Initialized, false))
|
||||||
|
doCleanup();
|
||||||
|
});
|
||||||
|
t.detach();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
|
||||||
|
@ -18,9 +18,12 @@
|
|||||||
|
|
||||||
#include "peerconnection.hpp"
|
#include "peerconnection.hpp"
|
||||||
#include "certificate.hpp"
|
#include "certificate.hpp"
|
||||||
|
#include "include.hpp"
|
||||||
|
#include "processor.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
#include "dtlstransport.hpp"
|
#include "dtlstransport.hpp"
|
||||||
#include "icetransport.hpp"
|
#include "icetransport.hpp"
|
||||||
#include "include.hpp"
|
|
||||||
#include "sctptransport.hpp"
|
#include "sctptransport.hpp"
|
||||||
|
|
||||||
#if RTC_ENABLE_MEDIA
|
#if RTC_ENABLE_MEDIA
|
||||||
@ -39,8 +42,8 @@ using std::weak_ptr;
|
|||||||
PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
|
PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
|
||||||
|
|
||||||
PeerConnection::PeerConnection(const Configuration &config)
|
PeerConnection::PeerConnection(const Configuration &config)
|
||||||
: mConfig(config), mCertificate(make_certificate()), mState(State::New),
|
: mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
|
||||||
mGatheringState(GatheringState::New) {
|
mState(State::New), mGatheringState(GatheringState::New) {
|
||||||
PLOG_VERBOSE << "Creating PeerConnection";
|
PLOG_VERBOSE << "Creating PeerConnection";
|
||||||
|
|
||||||
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
||||||
@ -145,6 +148,7 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
|
|||||||
iceTransport->addRemoteCandidate(candidate);
|
iceTransport->addRemoteCandidate(candidate);
|
||||||
} else {
|
} else {
|
||||||
// OK, we might need a lookup, do it asynchronously
|
// OK, we might need a lookup, do it asynchronously
|
||||||
|
// We don't use the thread pool because we have no control on the timeout
|
||||||
weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
||||||
std::thread t([weakIceTransport, candidate]() mutable {
|
std::thread t([weakIceTransport, candidate]() mutable {
|
||||||
if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
||||||
@ -227,7 +231,7 @@ void PeerConnection::sendMedia(const binary &packet) {
|
|||||||
outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary));
|
outgoingMedia(make_message(packet.begin(), packet.end(), Message::Binary));
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeerConnection::send(const byte *packet, size_t size) {
|
void PeerConnection::sendMedia(const byte *packet, size_t size) {
|
||||||
outgoingMedia(make_message(packet, packet + size, Message::Binary));
|
outgoingMedia(make_message(packet, packet + size, Message::Binary));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,7 +248,7 @@ void PeerConnection::outgoingMedia(message_ptr message) {
|
|||||||
if (!transport)
|
if (!transport)
|
||||||
throw std::runtime_error("PeerConnection is not open");
|
throw std::runtime_error("PeerConnection is not open");
|
||||||
|
|
||||||
std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->send(message);
|
std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)->sendMedia(message);
|
||||||
#else
|
#else
|
||||||
PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
|
PLOG_WARNING << "Ignoring sent media (not compiled with SRTP support)";
|
||||||
#endif
|
#endif
|
||||||
@ -445,21 +449,18 @@ void PeerConnection::closeTransports() {
|
|||||||
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
|
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
|
||||||
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
|
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
|
||||||
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
|
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
|
||||||
if (sctp || dtls || ice) {
|
ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
|
||||||
std::thread t([sctp, dtls, ice, token = mInitToken]() mutable {
|
if (sctp)
|
||||||
if (sctp)
|
sctp->stop();
|
||||||
sctp->stop();
|
if (dtls)
|
||||||
if (dtls)
|
dtls->stop();
|
||||||
dtls->stop();
|
if (ice)
|
||||||
if (ice)
|
ice->stop();
|
||||||
ice->stop();
|
|
||||||
|
|
||||||
sctp.reset();
|
sctp.reset();
|
||||||
dtls.reset();
|
dtls.reset();
|
||||||
ice.reset();
|
ice.reset();
|
||||||
});
|
});
|
||||||
t.detach();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeerConnection::endLocalCandidates() {
|
void PeerConnection::endLocalCandidates() {
|
||||||
@ -502,7 +503,7 @@ void PeerConnection::forwardMessage(message_ptr message) {
|
|||||||
mDataChannels.insert(std::make_pair(message->stream, channel));
|
mDataChannels.insert(std::make_pair(message->stream, channel));
|
||||||
} else {
|
} else {
|
||||||
// Invalid, close the DataChannel
|
// Invalid, close the DataChannel
|
||||||
sctpTransport->close(message->stream);
|
sctpTransport->closeStream(message->stream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -594,20 +595,26 @@ void PeerConnection::remoteCloseDataChannels() {
|
|||||||
|
|
||||||
void PeerConnection::processLocalDescription(Description description) {
|
void PeerConnection::processLocalDescription(Description description) {
|
||||||
std::optional<uint16_t> remoteSctpPort;
|
std::optional<uint16_t> remoteSctpPort;
|
||||||
if (auto remote = remoteDescription())
|
std::optional<string> remoteDataMid;
|
||||||
remoteSctpPort = remote->sctpPort();
|
if (auto remote = remoteDescription()) {
|
||||||
|
remoteDataMid = remote->dataMid();
|
||||||
|
remoteSctpPort = remote->sctpPort();
|
||||||
|
}
|
||||||
|
|
||||||
auto certificate = mCertificate.get(); // wait for certificate if not ready
|
auto certificate = mCertificate.get(); // wait for certificate if not ready
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mLocalDescriptionMutex);
|
std::lock_guard lock(mLocalDescriptionMutex);
|
||||||
mLocalDescription.emplace(std::move(description));
|
mLocalDescription.emplace(std::move(description));
|
||||||
|
if (remoteDataMid)
|
||||||
|
mLocalDescription->setDataMid(*remoteDataMid);
|
||||||
|
|
||||||
mLocalDescription->setFingerprint(certificate->fingerprint());
|
mLocalDescription->setFingerprint(certificate->fingerprint());
|
||||||
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
|
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
|
||||||
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
mLocalDescriptionCallback(*mLocalDescription);
|
mProcessor->enqueue([this]() { mLocalDescriptionCallback(*mLocalDescription); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeerConnection::processLocalCandidate(Candidate candidate) {
|
void PeerConnection::processLocalCandidate(Candidate candidate) {
|
||||||
@ -617,7 +624,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
|
|||||||
|
|
||||||
mLocalDescription->addCandidate(candidate);
|
mLocalDescription->addCandidate(candidate);
|
||||||
|
|
||||||
mLocalCandidateCallback(candidate);
|
mProcessor->enqueue(
|
||||||
|
[this, candidate = std::move(candidate)]() { mLocalCandidateCallback(candidate); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
||||||
@ -625,7 +633,8 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
|||||||
if (!dataChannel)
|
if (!dataChannel)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
mDataChannelCallback(dataChannel);
|
mProcessor->enqueue(
|
||||||
|
[this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
|
||||||
}
|
}
|
||||||
|
|
||||||
bool PeerConnection::changeState(State state) {
|
bool PeerConnection::changeState(State state) {
|
||||||
@ -639,13 +648,13 @@ bool PeerConnection::changeState(State state) {
|
|||||||
|
|
||||||
} while (!mState.compare_exchange_weak(current, state));
|
} while (!mState.compare_exchange_weak(current, state));
|
||||||
|
|
||||||
mStateChangeCallback(state);
|
mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool PeerConnection::changeGatheringState(GatheringState state) {
|
bool PeerConnection::changeGatheringState(GatheringState state) {
|
||||||
if (mGatheringState.exchange(state) != state)
|
if (mGatheringState.exchange(state) != state)
|
||||||
mGatheringStateChangeCallback(state);
|
mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
44
src/processor.cpp
Normal file
44
src/processor.cpp
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||||
|
*
|
||||||
|
* This library is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU Lesser General Public
|
||||||
|
* License as published by the Free Software Foundation; either
|
||||||
|
* version 2.1 of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This library is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public
|
||||||
|
* License along with this library; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "processor.hpp"
|
||||||
|
|
||||||
|
namespace rtc {
|
||||||
|
|
||||||
|
Processor::~Processor() { join(); }
|
||||||
|
|
||||||
|
void Processor::join() {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void Processor::schedule() {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
if (mTasks.empty()) {
|
||||||
|
// No more tasks
|
||||||
|
mPending = false;
|
||||||
|
mCondition.notify_all();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadPool::Instance().enqueue(std::move(mTasks.front()));
|
||||||
|
mTasks.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rtc
|
||||||
|
|
92
src/processor.hpp
Normal file
92
src/processor.hpp
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||||
|
*
|
||||||
|
* This library is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU Lesser General Public
|
||||||
|
* License as published by the Free Software Foundation; either
|
||||||
|
* version 2.1 of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This library is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public
|
||||||
|
* License along with this library; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef RTC_PROCESSOR_H
|
||||||
|
#define RTC_PROCESSOR_H
|
||||||
|
|
||||||
|
#include "include.hpp"
|
||||||
|
#include "init.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <future>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
namespace rtc {
|
||||||
|
|
||||||
|
// Processed tasks in order by delegating them to the thread pool
|
||||||
|
class Processor final {
|
||||||
|
public:
|
||||||
|
Processor() = default;
|
||||||
|
~Processor();
|
||||||
|
|
||||||
|
Processor(const Processor &) = delete;
|
||||||
|
Processor &operator=(const Processor &) = delete;
|
||||||
|
Processor(Processor &&) = delete;
|
||||||
|
Processor &operator=(Processor &&) = delete;
|
||||||
|
|
||||||
|
void join();
|
||||||
|
|
||||||
|
template <class F, class... Args>
|
||||||
|
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void schedule();
|
||||||
|
|
||||||
|
// Keep an init token
|
||||||
|
const init_token mInitToken = Init::Token();
|
||||||
|
|
||||||
|
std::queue<std::function<void()>> mTasks;
|
||||||
|
bool mPending = false; // true iff a task is pending in the thread pool
|
||||||
|
|
||||||
|
mutable std::mutex mMutex;
|
||||||
|
std::condition_variable mCondition;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class F, class... Args>
|
||||||
|
auto Processor::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
||||||
|
auto task = std::make_shared<std::packaged_task<R()>>(
|
||||||
|
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
|
||||||
|
std::future<R> result = task->get_future();
|
||||||
|
|
||||||
|
auto bundle = [this, task = std::move(task)]() {
|
||||||
|
try {
|
||||||
|
(*task)();
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
PLOG_WARNING << "Unhandled exception in task: " << e.what();
|
||||||
|
}
|
||||||
|
schedule(); // chain the next task
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!mPending) {
|
||||||
|
ThreadPool::Instance().enqueue(std::move(bundle));
|
||||||
|
mPending = true;
|
||||||
|
} else {
|
||||||
|
mTasks.emplace(std::move(bundle));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rtc
|
||||||
|
|
||||||
|
#endif
|
68
src/rtc.cpp
68
src/rtc.cpp
@ -55,18 +55,15 @@ std::unordered_map<int, void *> userPointerMap;
|
|||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
int lastId = 0;
|
int lastId = 0;
|
||||||
|
|
||||||
void *getUserPointer(int id) {
|
std::optional<void *> getUserPointer(int id) {
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto it = userPointerMap.find(id);
|
auto it = userPointerMap.find(id);
|
||||||
return it != userPointerMap.end() ? it->second : nullptr;
|
return it != userPointerMap.end() ? std::make_optional(it->second) : nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setUserPointer(int i, void *ptr) {
|
void setUserPointer(int i, void *ptr) {
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
if (ptr)
|
userPointerMap[i] = ptr;
|
||||||
userPointerMap[i] = ptr;
|
|
||||||
else
|
|
||||||
userPointerMap.erase(i);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
shared_ptr<PeerConnection> getPeerConnection(int id) {
|
shared_ptr<PeerConnection> getPeerConnection(int id) {
|
||||||
@ -89,6 +86,7 @@ int emplacePeerConnection(shared_ptr<PeerConnection> ptr) {
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
int pc = ++lastId;
|
int pc = ++lastId;
|
||||||
peerConnectionMap.emplace(std::make_pair(pc, ptr));
|
peerConnectionMap.emplace(std::make_pair(pc, ptr));
|
||||||
|
userPointerMap.emplace(std::make_pair(pc, nullptr));
|
||||||
return pc;
|
return pc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +94,7 @@ int emplaceDataChannel(shared_ptr<DataChannel> ptr) {
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
int dc = ++lastId;
|
int dc = ++lastId;
|
||||||
dataChannelMap.emplace(std::make_pair(dc, ptr));
|
dataChannelMap.emplace(std::make_pair(dc, ptr));
|
||||||
|
userPointerMap.emplace(std::make_pair(dc, nullptr));
|
||||||
return dc;
|
return dc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,6 +125,7 @@ int emplaceWebSocket(shared_ptr<WebSocket> ptr) {
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
int ws = ++lastId;
|
int ws = ++lastId;
|
||||||
webSocketMap.emplace(std::make_pair(ws, ptr));
|
webSocketMap.emplace(std::make_pair(ws, ptr));
|
||||||
|
userPointerMap.emplace(std::make_pair(ws, nullptr));
|
||||||
return ws;
|
return ws;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,7 +244,8 @@ int rtcCreateDataChannel(int pc, const char *label) {
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto peerConnection = getPeerConnection(pc);
|
auto peerConnection = getPeerConnection(pc);
|
||||||
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
|
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label)));
|
||||||
rtcSetUserPointer(dc, getUserPointer(pc));
|
if (auto ptr = getUserPointer(pc))
|
||||||
|
rtcSetUserPointer(dc, *ptr);
|
||||||
return dc;
|
return dc;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -293,9 +294,10 @@ int rtcSetDataChannelCallback(int pc, rtcDataChannelCallbackFunc cb) {
|
|||||||
if (cb)
|
if (cb)
|
||||||
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
|
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
|
||||||
int dc = emplaceDataChannel(dataChannel);
|
int dc = emplaceDataChannel(dataChannel);
|
||||||
void *ptr = getUserPointer(pc);
|
if (auto ptr = getUserPointer(pc)) {
|
||||||
rtcSetUserPointer(dc, ptr);
|
rtcSetUserPointer(dc, *ptr);
|
||||||
cb(dc, ptr);
|
cb(dc, *ptr);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
else
|
else
|
||||||
peerConnection->onDataChannel(nullptr);
|
peerConnection->onDataChannel(nullptr);
|
||||||
@ -307,7 +309,8 @@ int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
|
|||||||
auto peerConnection = getPeerConnection(pc);
|
auto peerConnection = getPeerConnection(pc);
|
||||||
if (cb)
|
if (cb)
|
||||||
peerConnection->onLocalDescription([pc, cb](const Description &desc) {
|
peerConnection->onLocalDescription([pc, cb](const Description &desc) {
|
||||||
cb(string(desc).c_str(), desc.typeString().c_str(), getUserPointer(pc));
|
if (auto ptr = getUserPointer(pc))
|
||||||
|
cb(string(desc).c_str(), desc.typeString().c_str(), *ptr);
|
||||||
});
|
});
|
||||||
else
|
else
|
||||||
peerConnection->onLocalDescription(nullptr);
|
peerConnection->onLocalDescription(nullptr);
|
||||||
@ -319,7 +322,8 @@ int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
|
|||||||
auto peerConnection = getPeerConnection(pc);
|
auto peerConnection = getPeerConnection(pc);
|
||||||
if (cb)
|
if (cb)
|
||||||
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
|
peerConnection->onLocalCandidate([pc, cb](const Candidate &cand) {
|
||||||
cb(cand.candidate().c_str(), cand.mid().c_str(), getUserPointer(pc));
|
if (auto ptr = getUserPointer(pc))
|
||||||
|
cb(cand.candidate().c_str(), cand.mid().c_str(), *ptr);
|
||||||
});
|
});
|
||||||
else
|
else
|
||||||
peerConnection->onLocalCandidate(nullptr);
|
peerConnection->onLocalCandidate(nullptr);
|
||||||
@ -331,7 +335,8 @@ int rtcSetStateChangeCallback(int pc, rtcStateChangeCallbackFunc cb) {
|
|||||||
auto peerConnection = getPeerConnection(pc);
|
auto peerConnection = getPeerConnection(pc);
|
||||||
if (cb)
|
if (cb)
|
||||||
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
|
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
|
||||||
cb(static_cast<rtcState>(state), getUserPointer(pc));
|
if (auto ptr = getUserPointer(pc))
|
||||||
|
cb(static_cast<rtcState>(state), *ptr);
|
||||||
});
|
});
|
||||||
else
|
else
|
||||||
peerConnection->onStateChange(nullptr);
|
peerConnection->onStateChange(nullptr);
|
||||||
@ -343,7 +348,8 @@ int rtcSetGatheringStateChangeCallback(int pc, rtcGatheringStateCallbackFunc cb)
|
|||||||
auto peerConnection = getPeerConnection(pc);
|
auto peerConnection = getPeerConnection(pc);
|
||||||
if (cb)
|
if (cb)
|
||||||
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
|
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
|
||||||
cb(static_cast<rtcGatheringState>(state), getUserPointer(pc));
|
if (auto ptr = getUserPointer(pc))
|
||||||
|
cb(static_cast<rtcGatheringState>(state), *ptr);
|
||||||
});
|
});
|
||||||
else
|
else
|
||||||
peerConnection->onGatheringStateChange(nullptr);
|
peerConnection->onGatheringStateChange(nullptr);
|
||||||
@ -435,7 +441,10 @@ int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto channel = getChannel(id);
|
auto channel = getChannel(id);
|
||||||
if (cb)
|
if (cb)
|
||||||
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
|
channel->onOpen([id, cb]() {
|
||||||
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(*ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onOpen(nullptr);
|
channel->onOpen(nullptr);
|
||||||
});
|
});
|
||||||
@ -445,7 +454,10 @@ int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb) {
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto channel = getChannel(id);
|
auto channel = getChannel(id);
|
||||||
if (cb)
|
if (cb)
|
||||||
channel->onClosed([id, cb]() { cb(getUserPointer(id)); });
|
channel->onClosed([id, cb]() {
|
||||||
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(*ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onClosed(nullptr);
|
channel->onClosed(nullptr);
|
||||||
});
|
});
|
||||||
@ -455,8 +467,10 @@ int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto channel = getChannel(id);
|
auto channel = getChannel(id);
|
||||||
if (cb)
|
if (cb)
|
||||||
channel->onError(
|
channel->onError([id, cb](const string &error) {
|
||||||
[id, cb](const string &error) { cb(error.c_str(), getUserPointer(id)); });
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(error.c_str(), *ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onError(nullptr);
|
channel->onError(nullptr);
|
||||||
});
|
});
|
||||||
@ -468,9 +482,13 @@ int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
|
|||||||
if (cb)
|
if (cb)
|
||||||
channel->onMessage(
|
channel->onMessage(
|
||||||
[id, cb](const binary &b) {
|
[id, cb](const binary &b) {
|
||||||
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), getUserPointer(id));
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(reinterpret_cast<const char *>(b.data()), int(b.size()), *ptr);
|
||||||
},
|
},
|
||||||
[id, cb](const string &s) { cb(s.c_str(), -1, getUserPointer(id)); });
|
[id, cb](const string &s) {
|
||||||
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(s.c_str(), -int(s.size() + 1), *ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onMessage(nullptr);
|
channel->onMessage(nullptr);
|
||||||
});
|
});
|
||||||
@ -514,7 +532,10 @@ int rtcSetBufferedAmountLowCallback(int id, rtcBufferedAmountLowCallbackFunc cb)
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto channel = getChannel(id);
|
auto channel = getChannel(id);
|
||||||
if (cb)
|
if (cb)
|
||||||
channel->onBufferedAmountLow([id, cb]() { cb(getUserPointer(id)); });
|
channel->onBufferedAmountLow([id, cb]() {
|
||||||
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(*ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onBufferedAmountLow(nullptr);
|
channel->onBufferedAmountLow(nullptr);
|
||||||
});
|
});
|
||||||
@ -528,7 +549,10 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb) {
|
|||||||
return WRAP({
|
return WRAP({
|
||||||
auto channel = getChannel(id);
|
auto channel = getChannel(id);
|
||||||
if (cb)
|
if (cb)
|
||||||
channel->onOpen([id, cb]() { cb(getUserPointer(id)); });
|
channel->onOpen([id, cb]() {
|
||||||
|
if (auto ptr = getUserPointer(id))
|
||||||
|
cb(*ptr);
|
||||||
|
});
|
||||||
else
|
else
|
||||||
channel->onOpen(nullptr);
|
channel->onOpen(nullptr);
|
||||||
});
|
});
|
||||||
|
@ -50,6 +50,9 @@ using std::shared_ptr;
|
|||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
|
std::unordered_set<SctpTransport *> SctpTransport::Instances;
|
||||||
|
std::shared_mutex SctpTransport::InstancesMutex;
|
||||||
|
|
||||||
void SctpTransport::Init() {
|
void SctpTransport::Init() {
|
||||||
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
|
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
|
||||||
usrsctp_sysctl_set_sctp_ecn_enable(0);
|
usrsctp_sysctl_set_sctp_ecn_enable(0);
|
||||||
@ -92,6 +95,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
|||||||
PLOG_DEBUG << "Initializing SCTP transport";
|
PLOG_DEBUG << "Initializing SCTP transport";
|
||||||
|
|
||||||
usrsctp_register_address(this);
|
usrsctp_register_address(this);
|
||||||
|
{
|
||||||
|
std::unique_lock lock(InstancesMutex);
|
||||||
|
Instances.insert(this);
|
||||||
|
}
|
||||||
|
|
||||||
mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
|
mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback,
|
||||||
&SctpTransport::SendCallback, 0, this);
|
&SctpTransport::SendCallback, 0, this);
|
||||||
if (!mSock)
|
if (!mSock)
|
||||||
@ -186,14 +194,21 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
|||||||
|
|
||||||
SctpTransport::~SctpTransport() {
|
SctpTransport::~SctpTransport() {
|
||||||
stop();
|
stop();
|
||||||
|
close();
|
||||||
if (mSock)
|
|
||||||
usrsctp_close(mSock);
|
|
||||||
|
|
||||||
usrsctp_deregister_address(this);
|
usrsctp_deregister_address(this);
|
||||||
|
{
|
||||||
|
std::unique_lock lock(InstancesMutex);
|
||||||
|
Instances.erase(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SctpTransport::stop() {
|
bool SctpTransport::stop() {
|
||||||
|
// Transport::stop() will unregister incoming() from the lower layer, therefore we need to make
|
||||||
|
// sure the thread from lower layers is not blocked in incoming() by the WrittenOnce condition.
|
||||||
|
mWrittenOnce = true;
|
||||||
|
mWrittenCondition.notify_all();
|
||||||
|
|
||||||
if (!Transport::stop())
|
if (!Transport::stop())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@ -204,6 +219,13 @@ bool SctpTransport::stop() {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SctpTransport::close() {
|
||||||
|
if (mSock) {
|
||||||
|
usrsctp_close(mSock);
|
||||||
|
mSock = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void SctpTransport::connect() {
|
void SctpTransport::connect() {
|
||||||
if (!mSock)
|
if (!mSock)
|
||||||
return;
|
return;
|
||||||
@ -240,9 +262,7 @@ void SctpTransport::shutdown() {
|
|||||||
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
|
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// close() abort the connection when linger is disabled, call it now
|
close();
|
||||||
usrsctp_close(mSock);
|
|
||||||
mSock = nullptr;
|
|
||||||
|
|
||||||
PLOG_INFO << "SCTP disconnected";
|
PLOG_INFO << "SCTP disconnected";
|
||||||
changeState(State::Disconnected);
|
changeState(State::Disconnected);
|
||||||
@ -266,7 +286,7 @@ bool SctpTransport::send(message_ptr message) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SctpTransport::close(unsigned int stream) {
|
void SctpTransport::closeStream(unsigned int stream) {
|
||||||
send(make_message(0, Message::Reset, uint16_t(stream)));
|
send(make_message(0, Message::Reset, uint16_t(stream)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -405,7 +425,7 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
|
|||||||
try {
|
try {
|
||||||
mBufferedAmountCallback(streamId, amount);
|
mBufferedAmountCallback(streamId, amount);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
|
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
|
||||||
}
|
}
|
||||||
mSendMutex.lock();
|
mSendMutex.lock();
|
||||||
}
|
}
|
||||||
@ -502,9 +522,9 @@ int SctpTransport::handleSend(size_t free) {
|
|||||||
|
|
||||||
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
|
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
|
||||||
try {
|
try {
|
||||||
|
std::unique_lock lock(mWriteMutex);
|
||||||
PLOG_VERBOSE << "Handle write, len=" << len;
|
PLOG_VERBOSE << "Handle write, len=" << len;
|
||||||
|
|
||||||
std::unique_lock lock(mWriteMutex);
|
|
||||||
if (!outgoing(make_message(data, data + len)))
|
if (!outgoing(make_message(data, data + len)))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
@ -628,7 +648,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
|
|||||||
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
|
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
uint16_t streamId = reset_event.strreset_stream_list[i];
|
uint16_t streamId = reset_event.strreset_stream_list[i];
|
||||||
close(streamId);
|
closeStream(streamId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
||||||
@ -686,7 +706,15 @@ int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
|
|||||||
|
|
||||||
auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
|
auto sconn = reinterpret_cast<struct sockaddr_conn *>(&paddrinfo.spinfo_address);
|
||||||
void *ptr = sconn->sconn_addr;
|
void *ptr = sconn->sconn_addr;
|
||||||
return static_cast<SctpTransport *>(ptr)->handleSend(size_t(sb_free));
|
auto *transport = static_cast<SctpTransport *>(ptr);
|
||||||
|
|
||||||
|
// Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
|
||||||
|
// https://github.com/sctplab/usrsctp/issues/405
|
||||||
|
std::shared_lock lock(InstancesMutex);
|
||||||
|
if (Instances.find(transport) == Instances.end())
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
return transport->handleSend(size_t(sb_free));
|
||||||
}
|
}
|
||||||
|
|
||||||
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
|
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <shared_mutex>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
#include "usrsctp.h"
|
#include "usrsctp.h"
|
||||||
|
|
||||||
@ -46,7 +48,7 @@ public:
|
|||||||
|
|
||||||
bool stop() override;
|
bool stop() override;
|
||||||
bool send(message_ptr message) override; // false if buffered
|
bool send(message_ptr message) override; // false if buffered
|
||||||
void close(unsigned int stream);
|
void closeStream(unsigned int stream);
|
||||||
void flush();
|
void flush();
|
||||||
|
|
||||||
// Stats
|
// Stats
|
||||||
@ -70,6 +72,7 @@ private:
|
|||||||
|
|
||||||
void connect();
|
void connect();
|
||||||
void shutdown();
|
void shutdown();
|
||||||
|
void close();
|
||||||
void incoming(message_ptr message) override;
|
void incoming(message_ptr message) override;
|
||||||
|
|
||||||
bool trySendQueue();
|
bool trySendQueue();
|
||||||
@ -109,6 +112,9 @@ private:
|
|||||||
struct sctp_rcvinfo recv_info, int flags, void *user_data);
|
struct sctp_rcvinfo recv_info, int flags, void *user_data);
|
||||||
static int SendCallback(struct socket *sock, uint32_t sb_free);
|
static int SendCallback(struct socket *sock, uint32_t sb_free);
|
||||||
static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
|
static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
|
||||||
|
|
||||||
|
static std::unordered_set<SctpTransport *> Instances;
|
||||||
|
static std::shared_mutex InstancesMutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
@ -38,8 +38,8 @@ SelectInterrupter::SelectInterrupter() {
|
|||||||
throw std::runtime_error("Failed to create pipe");
|
throw std::runtime_error("Failed to create pipe");
|
||||||
::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
|
::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
|
||||||
::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
|
::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
|
||||||
mPipeOut = pipefd[0]; // read
|
mPipeOut = pipefd[1]; // read
|
||||||
mPipeIn = pipefd[1]; // write
|
mPipeIn = pipefd[0]; // write
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,11 +62,8 @@ int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
|
|||||||
FD_SET(mDummySock, &readfds);
|
FD_SET(mDummySock, &readfds);
|
||||||
return SOCKET_TO_INT(mDummySock) + 1;
|
return SOCKET_TO_INT(mDummySock) + 1;
|
||||||
#else
|
#else
|
||||||
int ret;
|
char dummy;
|
||||||
do {
|
::read(mPipeIn, &dummy, 1);
|
||||||
char dummy;
|
|
||||||
ret = ::read(mPipeIn, &dummy, 1);
|
|
||||||
} while (ret > 0);
|
|
||||||
FD_SET(mPipeIn, &readfds);
|
FD_SET(mPipeIn, &readfds);
|
||||||
return mPipeIn + 1;
|
return mPipeIn + 1;
|
||||||
#endif
|
#endif
|
||||||
@ -107,6 +104,7 @@ bool TcpTransport::stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool TcpTransport::send(message_ptr message) {
|
bool TcpTransport::send(message_ptr message) {
|
||||||
|
std::unique_lock lock(mSockMutex);
|
||||||
if (state() != State::Connected)
|
if (state() != State::Connected)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@ -126,6 +124,7 @@ void TcpTransport::incoming(message_ptr message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool TcpTransport::outgoing(message_ptr message) {
|
bool TcpTransport::outgoing(message_ptr message) {
|
||||||
|
// mSockMutex must be locked
|
||||||
// If nothing is pending, try to send directly
|
// If nothing is pending, try to send directly
|
||||||
// It's safe because if the queue is empty, the thread is not sending
|
// It's safe because if the queue is empty, the thread is not sending
|
||||||
if (mSendQueue.empty() && trySendMessage(message))
|
if (mSendQueue.empty() && trySendMessage(message))
|
||||||
@ -174,6 +173,7 @@ void TcpTransport::connect(const string &hostname, const string &service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
||||||
|
std::unique_lock lock(mSockMutex);
|
||||||
try {
|
try {
|
||||||
char node[MAX_NUMERICNODE_LEN];
|
char node[MAX_NUMERICNODE_LEN];
|
||||||
char serv[MAX_NUMERICSERV_LEN];
|
char serv[MAX_NUMERICSERV_LEN];
|
||||||
@ -248,15 +248,18 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void TcpTransport::close() {
|
void TcpTransport::close() {
|
||||||
|
std::unique_lock lock(mSockMutex);
|
||||||
if (mSock != INVALID_SOCKET) {
|
if (mSock != INVALID_SOCKET) {
|
||||||
PLOG_DEBUG << "Closing TCP socket";
|
PLOG_DEBUG << "Closing TCP socket";
|
||||||
::closesocket(mSock);
|
::closesocket(mSock);
|
||||||
mSock = INVALID_SOCKET;
|
mSock = INVALID_SOCKET;
|
||||||
}
|
}
|
||||||
changeState(State::Disconnected);
|
changeState(State::Disconnected);
|
||||||
|
interruptSelect();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TcpTransport::trySendQueue() {
|
bool TcpTransport::trySendQueue() {
|
||||||
|
// mSockMutex must be locked
|
||||||
while (auto next = mSendQueue.peek()) {
|
while (auto next = mSendQueue.peek()) {
|
||||||
auto message = *next;
|
auto message = *next;
|
||||||
if (!trySendMessage(message)) {
|
if (!trySendMessage(message)) {
|
||||||
@ -269,6 +272,7 @@ bool TcpTransport::trySendQueue() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool TcpTransport::trySendMessage(message_ptr &message) {
|
bool TcpTransport::trySendMessage(message_ptr &message) {
|
||||||
|
// mSockMutex must be locked
|
||||||
auto data = reinterpret_cast<const char *>(message->data());
|
auto data = reinterpret_cast<const char *>(message->data());
|
||||||
auto size = message->size();
|
auto size = message->size();
|
||||||
while (size) {
|
while (size) {
|
||||||
@ -314,13 +318,22 @@ void TcpTransport::runLoop() {
|
|||||||
changeState(State::Connected);
|
changeState(State::Connected);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
std::unique_lock lock(mSockMutex);
|
||||||
|
if (mSock == INVALID_SOCKET)
|
||||||
|
break;
|
||||||
|
|
||||||
fd_set readfds, writefds;
|
fd_set readfds, writefds;
|
||||||
int n = prepareSelect(readfds, writefds);
|
int n = prepareSelect(readfds, writefds);
|
||||||
|
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
tv.tv_sec = 10;
|
tv.tv_sec = 10;
|
||||||
tv.tv_usec = 0;
|
tv.tv_usec = 0;
|
||||||
|
lock.unlock();
|
||||||
int ret = ::select(n, &readfds, &writefds, NULL, &tv);
|
int ret = ::select(n, &readfds, &writefds, NULL, &tv);
|
||||||
|
lock.lock();
|
||||||
|
if (mSock == INVALID_SOCKET)
|
||||||
|
break;
|
||||||
|
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
throw std::runtime_error("Failed to wait on socket");
|
throw std::runtime_error("Failed to wait on socket");
|
||||||
} else if (ret == 0) {
|
} else if (ret == 0) {
|
||||||
|
@ -78,6 +78,7 @@ private:
|
|||||||
string mHostname, mService;
|
string mHostname, mService;
|
||||||
|
|
||||||
socket_t mSock = INVALID_SOCKET;
|
socket_t mSock = INVALID_SOCKET;
|
||||||
|
std::mutex mSockMutex;
|
||||||
std::thread mThread;
|
std::thread mThread;
|
||||||
SelectInterrupter mInterrupter;
|
SelectInterrupter mInterrupter;
|
||||||
Queue<message_ptr> mSendQueue;
|
Queue<message_ptr> mSendQueue;
|
||||||
|
81
src/threadpool.cpp
Normal file
81
src/threadpool.cpp
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||||
|
*
|
||||||
|
* This library is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU Lesser General Public
|
||||||
|
* License as published by the Free Software Foundation; either
|
||||||
|
* version 2.1 of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This library is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public
|
||||||
|
* License along with this library; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
|
namespace rtc {
|
||||||
|
|
||||||
|
ThreadPool &ThreadPool::Instance() {
|
||||||
|
static ThreadPool instance;
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadPool::~ThreadPool() { join(); }
|
||||||
|
|
||||||
|
int ThreadPool::count() const {
|
||||||
|
std::unique_lock lock(mWorkersMutex);
|
||||||
|
return mWorkers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::spawn(int count) {
|
||||||
|
std::unique_lock lock(mWorkersMutex);
|
||||||
|
mJoining = false;
|
||||||
|
while (count-- > 0)
|
||||||
|
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::join() {
|
||||||
|
std::unique_lock lock(mWorkersMutex);
|
||||||
|
mJoining = true;
|
||||||
|
mCondition.notify_all();
|
||||||
|
|
||||||
|
for (auto &w : mWorkers)
|
||||||
|
w.join();
|
||||||
|
|
||||||
|
mWorkers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::run() {
|
||||||
|
while (runOne()) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ThreadPool::runOne() {
|
||||||
|
if (auto task = dequeue()) {
|
||||||
|
try {
|
||||||
|
task();
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
PLOG_WARNING << "Unhandled exception in task: " << e.what();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::function<void()> ThreadPool::dequeue() {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
|
||||||
|
if (mTasks.empty())
|
||||||
|
return nullptr;
|
||||||
|
auto task = std::move(mTasks.front());
|
||||||
|
mTasks.pop();
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rtc
|
||||||
|
|
87
src/threadpool.hpp
Normal file
87
src/threadpool.hpp
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2020 Paul-Louis Ageneau
|
||||||
|
*
|
||||||
|
* This library is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU Lesser General Public
|
||||||
|
* License as published by the Free Software Foundation; either
|
||||||
|
* version 2.1 of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This library is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public
|
||||||
|
* License along with this library; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef RTC_THREADPOOL_H
|
||||||
|
#define RTC_THREADPOOL_H
|
||||||
|
|
||||||
|
#include "include.hpp"
|
||||||
|
#include "init.hpp"
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
|
#include <future>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace rtc {
|
||||||
|
|
||||||
|
template <class F, class... Args>
|
||||||
|
using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>;
|
||||||
|
|
||||||
|
class ThreadPool final {
|
||||||
|
public:
|
||||||
|
static ThreadPool &Instance();
|
||||||
|
|
||||||
|
ThreadPool(const ThreadPool &) = delete;
|
||||||
|
ThreadPool &operator=(const ThreadPool &) = delete;
|
||||||
|
ThreadPool(ThreadPool &&) = delete;
|
||||||
|
ThreadPool &operator=(ThreadPool &&) = delete;
|
||||||
|
|
||||||
|
int count() const;
|
||||||
|
void spawn(int count = 1);
|
||||||
|
void join();
|
||||||
|
void run();
|
||||||
|
bool runOne();
|
||||||
|
|
||||||
|
template <class F, class... Args>
|
||||||
|
auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
ThreadPool() = default;
|
||||||
|
~ThreadPool();
|
||||||
|
|
||||||
|
std::function<void()> dequeue(); // returns null function if joining
|
||||||
|
|
||||||
|
std::vector<std::thread> mWorkers;
|
||||||
|
std::queue<std::function<void()>> mTasks;
|
||||||
|
std::atomic<bool> mJoining = false;
|
||||||
|
|
||||||
|
mutable std::mutex mMutex, mWorkersMutex;
|
||||||
|
std::condition_variable mCondition;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class F, class... Args>
|
||||||
|
auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
|
||||||
|
auto task = std::make_shared<std::packaged_task<R()>>(
|
||||||
|
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
|
||||||
|
std::future<R> result = task->get_future();
|
||||||
|
|
||||||
|
mTasks.emplace([task = std::move(task), token = Init::Token()]() { return (*task)(); });
|
||||||
|
mCondition.notify_one();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rtc
|
||||||
|
|
||||||
|
#endif
|
@ -86,9 +86,8 @@ void init() {
|
|||||||
|
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
if (!done) {
|
if (!done) {
|
||||||
OPENSSL_init_ssl(0, NULL);
|
OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
|
||||||
SSL_load_error_strings();
|
OPENSSL_init_crypto(OPENSSL_INIT_LOAD_CRYPTO_STRINGS, nullptr);
|
||||||
ERR_load_crypto_strings();
|
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,6 +60,8 @@ gnutls_datum_t make_datum(char *data, size_t size);
|
|||||||
#include <openssl/err.h>
|
#include <openssl/err.h>
|
||||||
#include <openssl/pem.h>
|
#include <openssl/pem.h>
|
||||||
#include <openssl/x509.h>
|
#include <openssl/x509.h>
|
||||||
|
#include <openssl/rsa.h>
|
||||||
|
#include <openssl/bn.h>
|
||||||
|
|
||||||
#ifndef BIO_EOF
|
#ifndef BIO_EOF
|
||||||
#define BIO_EOF -1
|
#define BIO_EOF -1
|
||||||
|
@ -41,12 +41,17 @@ public:
|
|||||||
|
|
||||||
virtual ~Transport() {
|
virtual ~Transport() {
|
||||||
stop();
|
stop();
|
||||||
if (mLower)
|
|
||||||
mLower->onRecv(nullptr); // doing it on stop could cause a deadlock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual bool stop() {
|
virtual bool stop() {
|
||||||
return !mShutdown.exchange(true);
|
if (mShutdown.exchange(true))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// We don't want incoming() to be called by the lower layer anymore
|
||||||
|
if (mLower)
|
||||||
|
mLower->onRecv(nullptr);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void registerIncoming() {
|
void registerIncoming() {
|
||||||
|
@ -18,8 +18,9 @@
|
|||||||
|
|
||||||
#if RTC_ENABLE_WEBSOCKET
|
#if RTC_ENABLE_WEBSOCKET
|
||||||
|
|
||||||
#include "include.hpp"
|
|
||||||
#include "websocket.hpp"
|
#include "websocket.hpp"
|
||||||
|
#include "include.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
#include "tcptransport.hpp"
|
#include "tcptransport.hpp"
|
||||||
#include "tlstransport.hpp"
|
#include "tlstransport.hpp"
|
||||||
@ -301,21 +302,18 @@ void WebSocket::closeTransports() {
|
|||||||
auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
|
auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
|
||||||
auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
|
auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
|
||||||
auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));
|
auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));
|
||||||
if (ws || tls || tcp) {
|
ThreadPool::Instance().enqueue([ws, tls, tcp]() mutable {
|
||||||
std::thread t([ws, tls, tcp, token = mInitToken]() mutable {
|
if (ws)
|
||||||
if (ws)
|
ws->stop();
|
||||||
ws->stop();
|
if (tls)
|
||||||
if (tls)
|
tls->stop();
|
||||||
tls->stop();
|
if (tcp)
|
||||||
if (tcp)
|
tcp->stop();
|
||||||
tcp->stop();
|
|
||||||
|
|
||||||
ws.reset();
|
ws.reset();
|
||||||
tls.reset();
|
tls.reset();
|
||||||
tcp.reset();
|
tcp.reset();
|
||||||
});
|
});
|
||||||
t.detach();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
@ -200,7 +200,7 @@ int test_capi_main() {
|
|||||||
|
|
||||||
// You may call rtcCleanup() when finished to free static resources
|
// You may call rtcCleanup() when finished to free static resources
|
||||||
rtcCleanup();
|
rtcCleanup();
|
||||||
sleep(1);
|
sleep(2);
|
||||||
|
|
||||||
printf("Success\n");
|
printf("Success\n");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -150,7 +150,7 @@ void test_connectivity() {
|
|||||||
|
|
||||||
// You may call rtc::Cleanup() when finished to free static resources
|
// You may call rtc::Cleanup() when finished to free static resources
|
||||||
rtc::Cleanup();
|
rtc::Cleanup();
|
||||||
this_thread::sleep_for(1s);
|
this_thread::sleep_for(2s);
|
||||||
|
|
||||||
cout << "Success" << endl;
|
cout << "Success" << endl;
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace chrono_literals;
|
using namespace chrono_literals;
|
||||||
@ -71,7 +72,10 @@ int main(int argc, char **argv) {
|
|||||||
cout << "*** Finished WebRTC benchmark" << endl;
|
cout << "*** Finished WebRTC benchmark" << endl;
|
||||||
} catch (const exception &e) {
|
} catch (const exception &e) {
|
||||||
cerr << "WebRTC benchmark failed: " << e.what() << endl;
|
cerr << "WebRTC benchmark failed: " << e.what() << endl;
|
||||||
|
std::this_thread::sleep_for(2s);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(2s);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ void test_websocket() {
|
|||||||
throw runtime_error("Expected message not received");
|
throw runtime_error("Expected message not received");
|
||||||
|
|
||||||
ws->close();
|
ws->close();
|
||||||
this_thread::sleep_for(1s);
|
this_thread::sleep_for(2s);
|
||||||
|
|
||||||
// You may call rtc::Cleanup() when finished to free static resources
|
// You may call rtc::Cleanup() when finished to free static resources
|
||||||
rtc::Cleanup();
|
rtc::Cleanup();
|
||||||
|
Reference in New Issue
Block a user