Compare commits

...

86 Commits

Author SHA1 Message Date
24e9e06c5a Bumped version to 0.4.4 2020-02-23 17:30:27 +01:00
443a19d8e7 Updated libjuice to v0.2.4 with better host candidates gathering 2020-02-23 17:24:47 +01:00
83de743924 Bumped version to 0.4.3 2020-02-21 12:39:27 +01:00
1dc1de4b86 Added platforms to Readme 2020-02-21 12:39:27 +01:00
8ca7722d48 Updated libjuice to v0.2.3 with Windows compilation support 2020-02-21 12:39:25 +01:00
3079072e63 For Win32, define _WIN32_WINNT to 0x0601 (Windows 7) if undefined 2020-02-21 00:04:57 +01:00
982d1c10e1 Merge pull request #35 from murat-dogan/master
Compile support on Windows with mingw-w64
2020-02-20 22:53:15 +00:00
50b22bbf3c delete win32 directive 2020-02-20 21:06:54 +03:00
93e153398f Compile support on Windows with mingw-w64 2020-02-20 14:55:21 +03:00
be6470d8bc Version 0.4.2 2020-02-06 11:42:03 +01:00
8a92c97058 Updated libjuice to v0.2.2 2020-02-06 11:36:01 +01:00
93da605230 Changed usrsctp submodule origin to sctplab/usrsctp 2020-01-29 11:24:11 +01:00
ff0f409d80 Added instructions to build with Makefile 2020-01-22 11:12:46 +01:00
a483e8135b Correct URL for libnice 2020-01-22 10:55:17 +01:00
36e4fdce1e Fixed and updated usrsctp then removed the usrsctp cmake hack 2020-01-22 10:54:21 +01:00
ea636d1f29 Changed version to 0.4.0 2020-01-21 15:39:30 +01:00
472d480978 Updated libjuice to v0.2.0 2020-01-21 15:31:22 +01:00
486fc373b2 Added details about dependencies 2020-01-21 15:30:27 +01:00
2a6c10269e Use nettle for libjuice only if using GnuTLS 2020-01-21 13:51:02 +01:00
ed68ba5402 Merge pull request #34 from paullouisageneau/libjuice
Libjuice as alternative to Libnice
2020-01-21 11:48:15 +00:00
5f91c0c1e3 Updated libjuice 2020-01-21 12:37:34 +01:00
d4c618ae38 Fixed OpenSSL MTU handling with a custom BIO for writing 2020-01-21 10:56:11 +01:00
3f52aa3d56 Fixed local and remote address getters with libjuice 2020-01-20 18:25:42 +01:00
c16ff99d83 Updated libjuice 2020-01-20 17:58:26 +01:00
e14b53f348 Include src headers from tests 2020-01-19 13:45:54 +01:00
ec8847cbf8 Updated libjuice 2020-01-19 11:54:00 +01:00
6c4e8f0d46 Fixed STUN server for libjuice 2020-01-18 17:31:29 +01:00
e2a5d5e1fe Updated libjuice and added logging for STUN server 2020-01-18 17:16:35 +01:00
fa8fda25c8 Updated Jamfile 2020-01-18 16:19:02 +01:00
47c29f0ec1 Set libjuice logging properly 2020-01-18 16:08:31 +01:00
a92438e94d Generated SDP description now uses CRLF instead of LF 2020-01-18 15:37:30 +01:00
0729ab28fd Replaced usrsctp-static by its alias Usrsctp::UsrsctpStatic 2020-01-18 12:50:14 +01:00
fa64b67006 Removed extern C around includes 2020-01-18 12:37:13 +01:00
b4d99158c6 Updated Readme 2020-01-18 00:33:06 +01:00
0ded19992c Integrated libjuice for ICE transport 2020-01-18 00:28:10 +01:00
2dece6afff Updated Makefile and CMakeLists for libjuice 2020-01-18 00:28:04 +01:00
0b554f988e Added submodule libjuice 2020-01-17 11:41:16 +01:00
15d29fc038 Merge pull request #33 from paullouisageneau/rfc8445
Updated to RFC 8445 for ICE
2020-01-14 08:27:16 +00:00
bb2c6c157d Updated to RFC 8445 for ICE 2020-01-14 09:26:05 +01:00
fd0f237a59 Merge pull request #32 from murat-dogan/master
Create stream before turn server addition
2019-12-26 09:05:57 +00:00
f09006f3ef Create stream before turn server addition 2019-12-26 09:48:55 +03:00
06369f1f14 Merge pull request #31 from paullouisageneau/turn-ipv6
TURN server URL and IPv6 support
2019-12-25 15:55:11 +00:00
61354b7101 Added comment 2019-12-25 16:55:01 +01:00
f34791b450 Removed useless include 2019-12-25 16:55:01 +01:00
c3f2f6bc63 Fixed TURN-TLS fallback port 2019-12-25 16:55:01 +01:00
a683b76a21 Extended STUN server URL parsing to TURN server URLs 2019-12-25 16:54:59 +01:00
e11de119be Added TURN server IPv6 support 2019-12-24 12:28:30 +01:00
2129e3cfb9 Cleanup datachannel closing test 2019-12-21 18:56:33 +01:00
58c8bad453 Reset ICE timeout id on timeout 2019-12-21 17:58:03 +01:00
1566c0ef21 Updated compilation with Makefile and Jamfile 2019-12-21 11:44:50 +01:00
03399e4b55 Fixed misplaced SCTP reading shutdown 2019-12-21 11:33:19 +01:00
0066b3aef0 Merge pull request #29 from paullouisageneau/ice-timeout
Fix connection shutdown
2019-12-20 23:01:10 +00:00
75f23f202f Removed trailing space 2019-12-20 23:58:41 +01:00
23e1a75248 Fixed DataChannel and SCTP shutdown 2019-12-20 23:52:38 +01:00
f930cfbe44 Reduced SCTP retransmission count and max RTO 2019-12-20 16:15:54 +01:00
72a0e2fe07 Merge pull request #28 from paullouisageneau/ice-timeout
Add trickle ICE timeout
2019-12-20 12:55:50 +00:00
f90ffbcf86 Added trickle ICE timeout + added some logging 2019-12-20 13:51:00 +01:00
a6992c765d Merge pull request #23 from murat-dogan/p2p-tests
P2P tests
2019-12-20 10:54:59 +00:00
1602498eab Merge pull request #27 from murat-dogan/master
Add Thread as a required package
2019-12-20 10:39:18 +00:00
e4ace4a750 Add Thread as an REQUIRED package 2019-12-20 12:00:30 +03:00
b5a13d2d66 Rename peers 2019-12-20 09:48:23 +03:00
aeb777aa49 Merge pull request #24 from paullouisageneau/integrate-plog
Integrate plog
2019-12-19 17:45:04 +00:00
7a552bb0fa Replaced console output with logging 2019-12-19 18:43:48 +01:00
402a4df4a0 Added InitLogger function and renamed enums 2019-12-19 15:58:48 +01:00
34ef87e271 Add executables to cmake 2019-12-19 15:44:36 +03:00
522319ac5d P2P Tests 2019-12-19 15:34:08 +03:00
1ac00ce396 Move logger init task to a function 2019-12-18 14:53:53 +03:00
92f08948d3 Integrate plog 2019-12-18 10:33:35 +03:00
9749f8d63e Merge pull request #21 from paullouisageneau/deps
Separate directory for dependencies
2019-12-17 20:42:34 +00:00
8626a07824 Updated Jamfile 2019-12-17 21:41:06 +01:00
37ca38999c Updated CMakeLists 2019-12-17 21:39:26 +01:00
18eeac3c0c Updated Makefile 2019-12-17 21:33:01 +01:00
c7de492b4b Moved usrsctp to deps 2019-12-17 21:31:52 +01:00
901700177b Merge pull request #17 from murat-dogan/master
Fix logic error: If we pass Relay Type then it is a TURN Server
2019-12-17 18:05:54 +00:00
88348732d9 Merge pull request #19 from paullouisageneau/fix-late-candidates
Fix late candidates triggering DTLS init twice
2019-12-17 15:04:14 +00:00
281eea2cec Fixed late candidates triggering DTLS init twice 2019-12-17 16:00:43 +01:00
28f923b1ce Fix Logic Error: If we are asking Relay Type than it is a TURN Server 2019-12-17 16:24:19 +03:00
48bdb6a1c9 Merge pull request #16 from murat-dogan/master
Option for enabling ICE TCP Candidates
2019-12-17 08:11:18 +00:00
278ac22766 Option for enabling ICE TCP Candidates 2019-12-17 07:49:26 +03:00
4fb14244db Merge pull request #14 from murat-dogan/master
TURN Server Support
2019-12-16 20:46:52 +00:00
432be41b9a Turn server protocol & Consistency Changes 2019-12-16 21:12:59 +03:00
78c80992bc Merge pull request #15 from paullouisageneau/revised-sync
Add close method, revise synchronization and states
2019-12-16 11:22:45 +00:00
8b64f8a406 Added PeerConnection::close() calls to test 2019-12-16 12:21:25 +01:00
1d7d1358be Added PeerConnection::close() and revised state machine 2019-12-16 12:15:59 +01:00
5fc6a1c8ad TURN Server Support 2019-12-16 13:04:32 +03:00
e5a19f85ed Revised synchronization 2019-12-16 10:45:00 +01:00
34 changed files with 1660 additions and 398 deletions

8
.gitmodules vendored
View File

@ -1,3 +1,9 @@
[submodule "deps/plog"]
path = deps/plog
url = https://github.com/SergiusTheBest/plog
[submodule "usrsctp"]
path = usrsctp
path = deps/usrsctp
url = https://github.com/sctplab/usrsctp.git
[submodule "deps/libjuice"]
path = deps/libjuice
url = https://github.com/paullouisageneau/libjuice

View File

@ -1,12 +1,27 @@
cmake_minimum_required (VERSION 3.7)
project (libdatachannel
DESCRIPTION "WebRTC Data Channels Library"
VERSION 0.2.1
DESCRIPTION "WebRTC DataChannels Library"
VERSION 0.4.4
LANGUAGES CXX)
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
option(USE_JUICE "Use libjuice instead of libnice" OFF)
if(USE_GNUTLS)
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
else()
option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" OFF)
endif()
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules)
if(WIN32)
if (MSYS OR MINGW)
add_definitions(-DSCTP_STDINT_INCLUDE=<stdint.h>)
endif()
endif()
set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/candidate.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/certificate.cpp
@ -25,34 +40,20 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/main.cpp
)
# Hack because usrsctp uses CMAKE_SOURCE_DIR instead of CMAKE_CURRENT_SOURCE_DIR
set(CMAKE_REQUIRED_FLAGS "-I${CMAKE_CURRENT_SOURCE_DIR}/usrsctp/usrsctplib")
set(TESTS_OFFERER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/p2p/offerer.cpp
)
add_subdirectory(usrsctp EXCLUDE_FROM_ALL)
set(TESTS_ANSWERER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/p2p/answerer.cpp
)
# Set include directory and custom options to make usrsctp compile with recent g++
target_include_directories(usrsctp-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/usrsctp/usrsctplib)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
# using regular Clang or AppleClang: Needed since they don't have -Wno-error=format-truncation
target_compile_options(usrsctp-static PRIVATE -Wno-error=address-of-packed-member)
else()
if (CMAKE_CXX_COMPILER_ID MATCHES "GNU")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "9.0")
# GCC version below 9.0 does not support option address-of-packed-member
target_compile_options(usrsctp-static PRIVATE -Wno-error=format-truncation)
else()
target_compile_options(usrsctp-static PRIVATE -Wno-error=address-of-packed-member -Wno-error=format-truncation)
endif()
else()
# all other compilers
target_compile_options(usrsctp-static PRIVATE -Wno-error=address-of-packed-member -Wno-error=format-truncation)
endif()
endif()
option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
find_package(LibNice REQUIRED)
add_subdirectory(deps/usrsctp EXCLUDE_FROM_ALL)
add_library(Usrsctp::Usrsctp ALIAS usrsctp)
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
add_library(datachannel SHARED ${LIBDATACHANNEL_SOURCES})
set_target_properties(datachannel PROPERTIES
@ -62,7 +63,17 @@ set_target_properties(datachannel PROPERTIES
target_include_directories(datachannel PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel usrsctp-static LibNice::LibNice)
target_include_directories(datachannel PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/deps/plog/include)
target_link_libraries(datachannel
Threads::Threads
Usrsctp::UsrsctpStatic
)
if(WIN32)
target_link_libraries(datachannel
"wsock32" #winsock2
"ws2_32" #winsock2
)
endif()
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL ${LIBDATACHANNEL_SOURCES})
set_target_properties(datachannel-static PROPERTIES
@ -72,7 +83,17 @@ set_target_properties(datachannel-static PROPERTIES
target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel-static usrsctp-static LibNice::LibNice)
target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/deps/plog/include)
target_link_libraries(datachannel-static
Threads::Threads
Usrsctp::UsrsctpStatic
)
if(WIN32)
target_link_libraries(datachannel-static
"wsock32" #winsock2
"ws2_32" #winsock2
)
endif()
if (USE_GNUTLS)
find_package(GnuTLS REQUIRED)
@ -96,13 +117,45 @@ else()
target_link_libraries(datachannel-static OpenSSL::SSL)
endif()
if (USE_JUICE)
add_subdirectory(deps/libjuice EXCLUDE_FROM_ALL)
target_compile_definitions(datachannel PRIVATE USE_JUICE=1)
target_link_libraries(datachannel LibJuice::LibJuiceStatic)
target_compile_definitions(datachannel-static PRIVATE USE_JUICE=1)
target_link_libraries(datachannel-static LibJuice::LibJuiceStatic)
else()
find_package(LibNice REQUIRED)
target_compile_definitions(datachannel PRIVATE USE_JUICE=0)
target_link_libraries(datachannel LibNice::LibNice)
target_compile_definitions(datachannel-static PRIVATE USE_JUICE=0)
target_link_libraries(datachannel-static LibNice::LibNice)
endif()
add_library(LibDataChannel::LibDataChannel ALIAS datachannel)
add_library(LibDataChannel::LibDataChannelStatic ALIAS datachannel-static)
add_executable(tests ${TESTS_SOURCES})
set_target_properties(tests PROPERTIES
# Main Test
add_executable(datachannel-tests ${TESTS_SOURCES})
set_target_properties(datachannel-tests PROPERTIES
VERSION ${PROJECT_VERSION}
CXX_STANDARD 17)
set_target_properties(datachannel-tests PROPERTIES OUTPUT_NAME tests)
target_include_directories(datachannel-tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel-tests datachannel)
target_link_libraries(tests datachannel)
# P2P Test: offerer
add_executable(datachannel-offerer ${TESTS_OFFERER_SOURCES})
set_target_properties(datachannel-offerer PROPERTIES
VERSION ${PROJECT_VERSION}
CXX_STANDARD 17)
set_target_properties(datachannel-offerer PROPERTIES OUTPUT_NAME offerer)
target_link_libraries(datachannel-offerer datachannel)
# P2P Test: answerer
add_executable(datachannel-answerer ${TESTS_ANSWERER_SOURCES})
set_target_properties(datachannel-answerer PROPERTIES
VERSION ${PROJECT_VERSION}
CXX_STANDARD 17)
set_target_properties(datachannel-answerer PROPERTIES OUTPUT_NAME datachannel)
target_link_libraries(datachannel-answerer datachannel)

37
Jamfile
View File

@ -7,32 +7,59 @@ lib libdatachannel
: # requirements
<include>./include/rtc
<define>USE_GNUTLS=0
<cxxflags>"`pkg-config --cflags openssl glib-2.0 gobject-2.0 nice`"
<define>USE_JUICE=1
<cxxflags>"`pkg-config --cflags openssl`"
<library>/libdatachannel//usrsctp
<library>/libdatachannel//juice
: # default build
<link>static
: # usage requirements
<include>./include
<library>/libdatachannel//plog
<cxxflags>-pthread
<linkflags>"`pkg-config --libs openssl glib-2.0 gobject-2.0 nice`"
<linkflags>"`pkg-config --libs openssl`"
;
alias plog
: # no sources
: # no build requirements
: # no default build
: # usage requirements
<include>./deps/plog/include
;
alias usrsctp
: # no sources
: # no build requirements
: # no default build
: # usage requirements
<include>./usrsctp/usrsctplib
<include>./deps/usrsctp/usrsctplib
<library>libusrsctp.a
;
alias juice
: # no sources
: # no build requirements
: # no default build
: # usage requirements
<include>./deps/libjuice/include
<library>libjuice.a
;
make libusrsctp.a : : @make_libusrsctp ;
actions make_libusrsctp
{
(cd $(CWD)/usrsctp && \
(cd $(CWD)/deps/usrsctp && \
./bootstrap && \
./configure --enable-static --disable-debug CFLAGS="-fPIC -Wno-address-of-packed-member" && \
make)
cp $(CWD)/usrsctp/usrsctplib/.libs/libusrsctp.a $(<)
cp $(CWD)/deps/usrsctp/usrsctplib/.libs/libusrsctp.a $(<)
}
make libjuice.a : : @make_libjuice ;
actions make_libjuice
{
(cd $(CWD)/deps/libjuice && make USE_NETTLE=0)
cp $(CWD)/deps/libjuice/libjuice.a $(<)
}

View File

@ -4,23 +4,42 @@ NAME=libdatachannel
CXX=$(CROSS)g++
AR=$(CROSS)ar
RM=rm -f
CPPFLAGS=-O2 -pthread -fPIC -Wall -Wno-address-of-packed-member
CXXFLAGS=-std=c++17
CPPFLAGS=-O2 -pthread -fPIC -Wall -Wno-address-of-packed-member
LDFLAGS=-pthread
LIBS=glib-2.0 gobject-2.0 nice
USRSCTP_DIR=usrsctp
LIBS=
LOCALLIBS=libusrsctp.a
USRSCTP_DIR=deps/usrsctp
JUICE_DIR=deps/libjuice
PLOG_DIR=deps/plog
INCLUDES=-Iinclude/rtc -I$(PLOG_DIR)/include -I$(USRSCTP_DIR)/usrsctplib
LDLIBS=
USE_GNUTLS ?= 0
ifeq ($(USE_GNUTLS), 1)
CPPFLAGS+= -DUSE_GNUTLS=1
LIBS+= gnutls
ifneq ($(USE_GNUTLS), 0)
CPPFLAGS+=-DUSE_GNUTLS=1
LIBS+=gnutls
else
CPPFLAGS+= -DUSE_GNUTLS=0
LIBS+= openssl
CPPFLAGS+=-DUSE_GNUTLS=0
LIBS+=openssl
endif
LDLIBS= $(shell pkg-config --libs $(LIBS))
INCLUDES=-Iinclude/rtc -I$(USRSCTP_DIR)/usrsctplib $(shell pkg-config --cflags $(LIBS))
USE_JUICE ?= 0
ifneq ($(USE_JUICE), 0)
CPPFLAGS+=-DUSE_JUICE=1
INCLUDES+=-I$(JUICE_DIR)/include
LOCALLIBS+=libjuice.a
ifneq ($(USE_GNUTLS), 0)
LIBS+=nettle
endif
else
CPPFLAGS+=-DUSE_JUICE=0
LIBS+=glib-2.0 gobject-2.0 nice
endif
INCLUDES+=$(shell pkg-config --cflags $(LIBS))
LDLIBS+=$(LOCALLIBS) $(shell pkg-config --libs $(LIBS))
SRCS=$(shell printf "%s " src/*.cpp)
OBJS=$(subst .cpp,.o,$(SRCS))
@ -31,18 +50,18 @@ src/%.o: src/%.cpp
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) -MMD -MP -o $@ -c $<
test/%.o: test/%.cpp
$(CXX) $(CXXFLAGS) $(CPPFLAGS) -Iinclude -MMD -MP -o $@ -c $<
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) -Iinclude -Isrc -MMD -MP -o $@ -c $<
-include $(subst .cpp,.d,$(SRCS))
$(NAME).a: $(OBJS)
$(AR) crf $@ $(OBJS)
$(NAME).so: libusrsctp.a $(OBJS)
$(CXX) $(LDFLAGS) -shared -o $@ $(OBJS) $(LDLIBS) libusrsctp.a
$(NAME).so: $(LOCALLIBS) $(OBJS)
$(CXX) $(LDFLAGS) -shared -o $@ $(OBJS) $(LDLIBS)
tests: $(NAME).a test/main.o
$(CXX) $(LDFLAGS) -o $@ test/main.o $(LDLIBS) $(NAME).a libusrsctp.a
$(CXX) $(LDFLAGS) -o $@ test/main.o $(NAME).a $(LDLIBS)
clean:
-$(RM) include/rtc/*.d *.d
@ -53,11 +72,13 @@ dist-clean: clean
-$(RM) $(NAME).a
-$(RM) $(NAME).so
-$(RM) libusrsctp.a
-$(RM) libjuice.a
-$(RM) tests
-$(RM) include/*~
-$(RM) src/*~
-$(RM) test/*~
-cd $(USRSCTP_DIR) && make clean
-cd $(JUICE_DIR) && make clean
libusrsctp.a:
cd $(USRSCTP_DIR) && \
@ -66,3 +87,11 @@ libusrsctp.a:
make
cp $(USRSCTP_DIR)/usrsctplib/.libs/libusrsctp.a .
libjuice.a:
ifneq ($(USE_GNUTLS), 0)
cd $(JUICE_DIR) && make USE_NETTLE=1
else
cd $(JUICE_DIR) && make USE_NETTLE=0
endif
cp $(JUICE_DIR)/libjuice.a .

View File

@ -1,33 +1,46 @@
# libdatachannel - C/C++ WebRTC DataChannels
libdatachannel is a standalone implementation of WebRTC DataChannels in C++17 with C bindings. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. Its API is modelled as a simplified version of the JavaScript WebRTC API, in order to ease the design of cross-environment applications.
libdatachannel is a standalone implementation of WebRTC DataChannels in C++17 with C bindings for POSIX platforms and Microsoft Windows. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. Its API is modelled as a simplified version of the JavaScript WebRTC API, in order to ease the design of cross-environment applications.
This projet is originally inspired by [librtcdcpp](https://github.com/chadnickbok/librtcdcpp), however it is a complete rewrite from scratch, because the messy architecture of librtcdcpp made solving its implementation issues difficult.
The connectivity can be provided through my ad-hoc ICE library [libjuice](https://github.com/paullouisageneau/libjuice) as submodule or through [libnice](https://github.com/libnice/libnice). The security layer can be provided through [GnuTLS](https://www.gnutls.org/) or [OpenSSL](https://www.openssl.org/).
Licensed under LGPLv2, see [LICENSE](https://github.com/paullouisageneau/libdatachannel/blob/master/LICENSE).
## Compatibility
The library aims at fully implementing SCTP DataChannels ([draft-ietf-rtcweb-data-channel-13](https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13)) over DTLS/UDP ([RFC7350](https://tools.ietf.org/html/rfc7350) and [RFC8261](https://tools.ietf.org/html/rfc8261)) and has been tested to be compatible with Firefox and Chromium. It supports IPv6 and Multicast DNS candidates resolution ([draft-ietf-rtcweb-mdns-ice-candidates-03](https://tools.ietf.org/html/draft-ietf-rtcweb-mdns-ice-candidates-03)) provided the operating system also supports it.
The library aims at fully implementing WebRTC SCTP DataChannels ([draft-ietf-rtcweb-data-channel-13](https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13)) over DTLS/UDP ([RFC7350](https://tools.ietf.org/html/rfc7350) and [RFC8261](https://tools.ietf.org/html/rfc8261)) with ICE ([RFC8445](https://tools.ietf.org/html/rfc8445)). It has been tested to be compatible with Firefox and Chromium. It supports IPv6 and Multicast DNS candidates resolution ([draft-ietf-rtcweb-mdns-ice-candidates-03](https://tools.ietf.org/html/draft-ietf-rtcweb-mdns-ice-candidates-03)) provided the operating system also supports it.
## Dependencies
- libnice: https://github.com/libnice/libnice
- GnuTLS: https://www.gnutls.org/ or OpenSSL: https://www.openssl.org/
Optional:
- libnice: https://nice.freedesktop.org/ (substituable with libjuice)
Submodules:
- usrsctp: https://github.com/sctplab/usrsctp
- libjuice: https://github.com/paullouisageneau/libjuice
## Building
### Building with CMake (preferred)
```bash
$ git submodule update --init --recursive
$ mkdir build
$ cd build
$ cmake -DUSE_GNUTLS=1 ..
$ cmake -DUSE_JUICE=1 -DUSE_GNUTLS=1 ..
$ make
```
### Building directly with Make
```bash
$ git submodule update --init --recursive
$ make USE_JUICE=1 USE_GNUTLS=1
```
## Example
In the following example, note the callbacks are called in another thread.

1
deps/libjuice vendored Submodule

Submodule deps/libjuice added at 6ac73257b6

1
deps/plog vendored Submodule

Submodule deps/plog added at 2931644689

1
deps/usrsctp vendored Submodule

Submodule deps/usrsctp added at aa10d60bc2

View File

@ -27,16 +27,33 @@
namespace rtc {
struct IceServer {
IceServer(const string &host_);
IceServer(const string &hostname_, uint16_t port_);
IceServer(const string &hostname_, const string &service_);
enum class Type { Stun, Turn };
enum class RelayType { TurnUdp, TurnTcp, TurnTls };
// Any type
IceServer(const string &url);
// STUN
IceServer(string hostname_, uint16_t port_);
IceServer(string hostname_, string service_);
// TURN
IceServer(string hostname_, uint16_t port, string username_, string password_,
RelayType relayType_ = RelayType::TurnUdp);
IceServer(string hostname_, string service_, string username_, string password_,
RelayType relayType_ = RelayType::TurnUdp);
string hostname;
string service;
Type type;
string username;
string password;
RelayType relayType;
};
struct Configuration {
std::vector<IceServer> iceServers;
bool enableIceTcp = false;
uint16_t portRangeBegin = 1024;
uint16_t portRangeEnd = 65535;
};

View File

@ -36,7 +36,7 @@ namespace rtc {
class SctpTransport;
class PeerConnection;
class DataChannel : public Channel {
class DataChannel : public std::enable_shared_from_this<DataChannel>, public Channel {
public:
DataChannel(std::shared_ptr<PeerConnection> pc, unsigned int stream, string label,
string protocol, Reliability reliability);
@ -66,12 +66,13 @@ public:
Reliability reliability() const;
private:
void remoteClose();
void open(std::shared_ptr<SctpTransport> sctpTransport);
bool outgoing(mutable_message_ptr message);
void incoming(message_ptr message);
void processOpenMessage(message_ptr message);
std::shared_ptr<PeerConnection> mPeerConnection;
const std::shared_ptr<PeerConnection> mPeerConnection;
std::shared_ptr<SctpTransport> mSctpTransport;
unsigned int mStream;

View File

@ -45,6 +45,7 @@ public:
std::optional<string> fingerprint() const;
std::optional<uint16_t> sctpPort() const;
std::optional<size_t> maxMessageSize() const;
bool trickleEnabled() const;
void setFingerprint(string fingerprint);
void setSctpPort(uint16_t port);
@ -56,6 +57,8 @@ public:
operator string() const;
string generateSdp(const string &eol) const;
private:
Type mType;
Role mRole;

View File

@ -19,6 +19,12 @@
#ifndef RTC_INCLUDE_H
#define RTC_INCLUDE_H
#ifdef _WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0602
#endif
#endif
#include <cstddef>
#include <functional>
#include <memory>
@ -27,6 +33,9 @@
#include <string>
#include <vector>
#include "plog/Appenders/ColorConsoleAppender.h"
#include "plog/Log.h"
namespace rtc {
using std::byte;
@ -41,6 +50,8 @@ using std::uint32_t;
using std::uint64_t;
using std::uint8_t;
// Constants
const size_t MAX_NUMERICNODE_LEN = 48; // Max IPv6 string representation length
const size_t MAX_NUMERICSERV_LEN = 6; // Max port string representation length
@ -48,6 +59,30 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
// Log
enum class LogLevel { // Don't change, it must match plog severity
None = 0,
Fatal = 1,
Error = 2,
Warning = 3,
Info = 4,
Debug = 5,
Verbose = 6
};
inline void InitLogger(plog::Severity severity, plog::IAppender *appender = nullptr) {
static plog::ColorConsoleAppender<plog::TxtFormatter> consoleAppender;
if (!appender)
appender = &consoleAppender;
plog::init(severity, appender);
PLOG_DEBUG << "Logger initialized";
}
inline void InitLogger(LogLevel level) { InitLogger(static_cast<plog::Severity>(level)); }
// Utils
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
@ -57,13 +92,13 @@ public:
~synchronized_callback() { *this = nullptr; }
synchronized_callback &operator=(std::function<void(P...)> func) {
std::lock_guard<std::recursive_mutex> lock(mutex);
std::lock_guard lock(mutex);
callback = func;
return *this;
}
void operator()(P... args) const {
std::lock_guard<std::recursive_mutex> lock(mutex);
std::lock_guard lock(mutex);
if (callback)
callback(args...);
}

View File

@ -31,6 +31,7 @@
#include <atomic>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <unordered_map>
@ -49,7 +50,8 @@ public:
Connected = RTC_CONNECTED,
Disconnected = RTC_DISCONNECTED,
Failed = RTC_FAILED,
Closed = RTC_CLOSED
Closed = RTC_CLOSED,
Destroying = RTC_DESTROYING
};
enum class GatheringState : int {
@ -62,6 +64,8 @@ public:
PeerConnection(const Configuration &config);
~PeerConnection();
void close();
const Configuration *config() const;
State state() const;
GatheringState gatheringState() const;
@ -83,16 +87,18 @@ public:
void onGatheringStateChange(std::function<void(GatheringState state)> callback);
private:
void initIceTransport(Description::Role role);
void initDtlsTransport();
void initSctpTransport();
std::shared_ptr<IceTransport> initIceTransport(Description::Role role);
std::shared_ptr<DtlsTransport> initDtlsTransport();
std::shared_ptr<SctpTransport> initSctpTransport();
void endLocalCandidates();
bool checkFingerprint(const std::string &fingerprint) const;
void forwardMessage(message_ptr message);
void forwardBufferedAmount(uint16_t stream, size_t amount);
void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
void openDataChannels();
void closeDataChannels();
void remoteCloseDataChannels();
void processLocalDescription(Description description);
void processLocalCandidate(Candidate candidate);
@ -103,12 +109,13 @@ private:
const Configuration mConfig;
const std::shared_ptr<Certificate> mCertificate;
std::optional<Description> mLocalDescription;
std::optional<Description> mRemoteDescription;
std::optional<Description> mLocalDescription, mRemoteDescription;
mutable std::recursive_mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
std::shared_ptr<IceTransport> mIceTransport;
std::shared_ptr<DtlsTransport> mDtlsTransport;
std::shared_ptr<SctpTransport> mSctpTransport;
std::recursive_mutex mInitMutex;
std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels;

View File

@ -67,31 +67,31 @@ Queue<T>::Queue(size_t limit, amount_function func) : mLimit(limit), mAmount(0)
template <typename T> Queue<T>::~Queue() { stop(); }
template <typename T> void Queue<T>::stop() {
std::lock_guard<std::mutex> lock(mMutex);
std::lock_guard lock(mMutex);
mStopping = true;
mPopCondition.notify_all();
mPushCondition.notify_all();
}
template <typename T> bool Queue<T>::empty() const {
std::lock_guard<std::mutex> lock(mMutex);
std::lock_guard lock(mMutex);
return mQueue.empty();
}
template <typename T> size_t Queue<T>::size() const {
std::lock_guard<std::mutex> lock(mMutex);
std::lock_guard lock(mMutex);
return mQueue.size();
}
template <typename T> size_t Queue<T>::amount() const {
std::lock_guard<std::mutex> lock(mMutex);
std::lock_guard lock(mMutex);
return mAmount;
}
template <typename T> void Queue<T>::push(const T &element) { push(T{element}); }
template <typename T> void Queue<T>::push(T &&element) {
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
if (!mStopping) {
mAmount += mAmountFunction(element);
@ -101,7 +101,7 @@ template <typename T> void Queue<T>::push(T &&element) {
}
template <typename T> std::optional<T> Queue<T>::pop() {
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
if (!mQueue.empty()) {
mAmount -= mAmountFunction(mQueue.front());
@ -114,7 +114,7 @@ template <typename T> std::optional<T> Queue<T>::pop() {
}
template <typename T> std::optional<T> Queue<T>::peek() {
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
if (!mQueue.empty()) {
return std::optional<T>{mQueue.front()};
} else {
@ -123,12 +123,12 @@ template <typename T> std::optional<T> Queue<T>::peek() {
}
template <typename T> void Queue<T>::wait() {
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
}
template <typename T> void Queue<T>::wait(const std::chrono::milliseconds &duration) {
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
mPopCondition.wait_for(lock, duration, [this]() { return !mQueue.empty() || mStopping; });
}

View File

@ -23,7 +23,7 @@
extern "C" {
#endif
// libdatachannel rtc C API
// libdatachannel C API
typedef enum {
RTC_NEW = 0,
@ -31,7 +31,8 @@ typedef enum {
RTC_CONNECTED = 2,
RTC_DISCONNECTED = 3,
RTC_FAILED = 4,
RTC_CLOSED = 5
RTC_CLOSED = 5,
RTC_DESTROYING = 6 // internal
} rtc_state_t;
typedef enum {
@ -40,6 +41,19 @@ typedef enum {
RTC_GATHERING_COMPLETE = 2
} rtc_gathering_state_t;
// Don't change, it must match plog severity
typedef enum {
RTC_LOG_NONE = 0,
RTC_LOG_FATAL = 1,
RTC_LOG_ERROR = 2,
RTC_LOG_WARNING = 3,
RTC_LOG_INFO = 4,
RTC_LOG_DEBUG = 5,
RTC_LOG_VERBOSE = 6
} rtc_log_level_t;
void rtcInitLogger(rtc_log_level_t level);
int rtcCreatePeerConnection(const char **iceServers, int iceServersCount);
void rtcDeletePeerConnection(int pc);
int rtcCreateDataChannel(int pc, const char *label);
@ -51,8 +65,8 @@ void rtcSetLocalCandidateCallback(int pc,
void (*candidateCallback)(const char *, const char *, void *));
void rtcSetStateChangeCallback(int pc, void (*stateCallback)(rtc_state_t state, void *));
void rtcSetGatheringStateChangeCallback(int pc,
void (*gatheringStateCallback)(rtc_gathering_state_t state,
void *));
void (*gatheringStateCallback)(rtc_gathering_state_t state,
void *));
void rtcSetRemoteDescription(int pc, const char *sdp, const char *type);
void rtcAddRemoteCandidate(int pc, const char *candidate, const char *mid);
int rtcGetDataChannelLabel(int dc, char *data, int size);
@ -67,4 +81,3 @@ void rtcSetUserPointer(int i, void *ptr);
#endif
#endif

View File

@ -22,8 +22,14 @@
#include <array>
#include <sstream>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#elif __linux__
#include <netdb.h>
#include <sys/socket.h>
#endif
#include <sys/types.h>
using std::array;
@ -54,7 +60,7 @@ bool Candidate::resolve(ResolveMode mode) {
if (mIsResolved)
return true;
// See RFC 5245 for format
// See RFC 8445 for format
std::stringstream ss(mCandidate);
int component{0}, priority{0};
string foundation, transport, node, service, typ_, type;
@ -70,6 +76,11 @@ bool Candidate::resolve(ResolveMode mode) {
hints.ai_protocol = IPPROTO_UDP;
}
if (transport == "TCP" || transport == "tcp") {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
}
if (mode == ResolveMode::Simple)
hints.ai_flags |= AI_NUMERICHOST;
@ -120,4 +131,3 @@ Candidate::operator string() const {
std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) {
return out << std::string(candidate);
}

View File

@ -145,7 +145,7 @@ shared_ptr<Certificate> make_certificate(const string &commonName) {
static std::unordered_map<string, shared_ptr<Certificate>> cache;
static std::mutex cacheMutex;
std::lock_guard<std::mutex> lock(cacheMutex);
std::lock_guard lock(cacheMutex);
if (auto it = cache.find(commonName); it != cache.end())
return it->second;
@ -241,7 +241,7 @@ shared_ptr<Certificate> make_certificate(const string &commonName) {
static std::unordered_map<string, shared_ptr<Certificate>> cache;
static std::mutex cacheMutex;
std::lock_guard<std::mutex> lock(cacheMutex);
std::lock_guard lock(cacheMutex);
if (auto it = cache.find(commonName); it != cache.end())
return it->second;

View File

@ -18,22 +18,70 @@
#include "configuration.hpp"
#include <regex>
namespace rtc {
using std::to_string;
IceServer::IceServer(const string &url) {
// Modified regex from RFC 3986, see https://tools.ietf.org/html/rfc3986#appendix-B
static const char *rs =
R"(^(([^:.@/?#]+):)?(/{0,2}((([^:@]*)(:([^@]*))?)@)?(([^:/?#]*)(:([^/?#]*))?))?([^?#]*)(\?([^#]*))?(#(.*))?)";
static const std::regex r(rs, std::regex::extended);
IceServer::IceServer(const string &host) {
if(size_t pos = host.rfind(':'); pos != string::npos) {
hostname = host.substr(0, pos);
service = host.substr(pos + 1);
} else {
hostname = host;
service = "3478"; // STUN UDP port
std::smatch m;
if (!std::regex_match(url, m, r) || m[10].length() == 0)
throw std::invalid_argument("Invalid ICE server URL: " + url);
std::vector<std::optional<string>> opt(m.size());
std::transform(m.begin(), m.end(), opt.begin(), [](const auto &sm) {
return sm.length() > 0 ? std::make_optional(string(sm)) : nullopt;
});
string scheme = opt[2].value_or("stun");
if (scheme == "stun" || scheme == "STUN")
type = Type::Stun;
else if (scheme == "turn" || scheme == "TURN")
type = Type::Turn;
else if (scheme == "turns" || scheme == "TURNS")
type = Type::Turn;
else
throw std::invalid_argument("Unknown ICE server protocol: " + scheme);
relayType = RelayType::TurnUdp;
if (auto &query = opt[15]) {
if (query->find("transport=udp") != string::npos)
relayType = RelayType::TurnUdp;
if (query->find("transport=tcp") != string::npos)
relayType = RelayType::TurnTcp;
if (query->find("transport=tls") != string::npos)
relayType = RelayType::TurnTls;
}
username = opt[6].value_or("");
password = opt[8].value_or("");
hostname = opt[10].value();
service = opt[12].value_or(relayType == RelayType::TurnTls ? "5349" : "3478");
while (!hostname.empty() && hostname.front() == '[')
hostname.erase(hostname.begin());
while (!hostname.empty() && hostname.back() == ']')
hostname.pop_back();
}
IceServer::IceServer(const string &hostname_, uint16_t port_) : IceServer(hostname_, to_string(port_)) {}
IceServer::IceServer(string hostname_, uint16_t port_)
: IceServer(std::move(hostname_), std::to_string(port_)) {}
IceServer::IceServer(const string &hostname_, const string &service_) : hostname(hostname_), service(service_) {}
IceServer::IceServer(string hostname_, string service_)
: hostname(std::move(hostname_)), service(std::move(service_)), type(Type::Stun) {}
IceServer::IceServer(string hostname_, uint16_t port_, string username_, string password_,
RelayType relayType_)
: IceServer(hostname_, std::to_string(port_), std::move(username_), std::move(password_),
relayType_) {}
IceServer::IceServer(string hostname_, string service_, string username_, string password_,
RelayType relayType_)
: hostname(std::move(hostname_)), service(std::move(service_)), type(Type::Turn),
username(std::move(username_)), password(std::move(password_)), relayType(relayType_) {}
} // namespace rtc

View File

@ -73,18 +73,22 @@ DataChannel::DataChannel(shared_ptr<PeerConnection> pc, shared_ptr<SctpTransport
mReliability(std::make_shared<Reliability>()),
mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
DataChannel::~DataChannel() { close(); }
DataChannel::~DataChannel() {
close();
}
void DataChannel::close() {
mIsOpen = false;
if (!mIsClosed.exchange(true)) {
if (mSctpTransport)
mSctpTransport->reset(mStream);
}
// Reset mSctpTransport first so SctpTransport is never alive without PeerConnection
if (mIsOpen.exchange(false) && mSctpTransport)
mSctpTransport->reset(mStream);
mIsClosed = true;
mSctpTransport.reset();
}
void DataChannel::remoteClose() {
mIsOpen = false;
if (!mIsClosed.exchange(true))
triggerClosed();
mSctpTransport.reset();
mPeerConnection.reset();
}
bool DataChannel::send(const std::variant<binary, string> &data) {
@ -108,12 +112,8 @@ std::optional<std::variant<binary, string>> DataChannel::receive() {
switch (message->type) {
case Message::Control: {
auto raw = reinterpret_cast<const uint8_t *>(message->data());
if (raw[0] == MESSAGE_CLOSE) {
if (mIsOpen) {
close();
triggerClosed();
}
}
if (raw[0] == MESSAGE_CLOSE)
remoteClose();
break;
}
case Message::String:

View File

@ -107,6 +107,8 @@ std::optional<uint16_t> Description::sctpPort() const { return mSctpPort; }
std::optional<size_t> Description::maxMessageSize() const { return mMaxMessageSize; }
bool Description::trickleEnabled() const { return mTrickle; }
void Description::setFingerprint(string fingerprint) {
mFingerprint.emplace(std::move(fingerprint));
}
@ -128,37 +130,39 @@ std::vector<Candidate> Description::extractCandidates() {
return result;
}
Description::operator string() const {
Description::operator string() const { return generateSdp("\r\n"); }
string Description::generateSdp(const string &eol) const {
if (!mFingerprint)
throw std::logic_error("Fingerprint must be set to generate a SDP");
throw std::logic_error("Fingerprint must be set to generate an SDP string");
std::ostringstream sdp;
sdp << "v=0\n";
sdp << "o=- " << mSessionId << " 0 IN IP4 127.0.0.1\n";
sdp << "s=-\n";
sdp << "t=0 0\n";
sdp << "a=group:BUNDLE 0\n";
sdp << "m=application 9 UDP/DTLS/SCTP webrtc-datachannel\n";
sdp << "c=IN IP4 0.0.0.0\n";
sdp << "a=ice-ufrag:" << mIceUfrag << "\n";
sdp << "a=ice-pwd:" << mIcePwd << "\n";
sdp << "v=0" << eol;
sdp << "o=- " << mSessionId << " 0 IN IP4 127.0.0.1" << eol;
sdp << "s=-" << eol;
sdp << "t=0 0" << eol;
sdp << "a=group:BUNDLE 0" << eol;
sdp << "m=application 9 UDP/DTLS/SCTP webrtc-datachannel" << eol;
sdp << "c=IN IP4 0.0.0.0" << eol;
sdp << "a=ice-ufrag:" << mIceUfrag << eol;
sdp << "a=ice-pwd:" << mIcePwd << eol;
if (mTrickle)
sdp << "a=ice-options:trickle\n";
sdp << "a=mid:" << mMid << "\n";
sdp << "a=setup:" << roleToString(mRole) << "\n";
sdp << "a=dtls-id:1\n";
sdp << "a=ice-options:trickle" << eol;
sdp << "a=mid:" << mMid << eol;
sdp << "a=setup:" << roleToString(mRole) << eol;
sdp << "a=dtls-id:1" << eol;
if (mFingerprint)
sdp << "a=fingerprint:sha-256 " << *mFingerprint << "\n";
sdp << "a=fingerprint:sha-256 " << *mFingerprint << eol;
if (mSctpPort)
sdp << "a=sctp-port:" << *mSctpPort << "\n";
sdp << "a=sctp-port:" << *mSctpPort << eol;
if (mMaxMessageSize)
sdp << "a=max-message-size:" << *mMaxMessageSize << "\n";
sdp << "a=max-message-size:" << *mMaxMessageSize << eol;
for (const auto &candidate : mCandidates) {
sdp << string(candidate) << "\n";
sdp << string(candidate) << eol;
}
if (!mTrickle)
sdp << "a=end-of-candidates\n";
sdp << "a=end-of-candidates" << eol;
return sdp.str();
}

View File

@ -18,12 +18,16 @@
#include "dtlstransport.hpp"
#include "icetransport.hpp"
#include "message.hpp"
#include <cassert>
#include <chrono>
#include <cstring>
#include <exception>
#include <iostream>
using namespace std::chrono;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@ -37,8 +41,11 @@ namespace {
static bool check_gnutls(int ret, const string &message = "GnuTLS error") {
if (ret < 0) {
if (!gnutls_error_is_fatal(ret))
if (!gnutls_error_is_fatal(ret)) {
PLOG_INFO << gnutls_strerror(ret);
return false;
}
PLOG_ERROR << message << ": " << gnutls_strerror(ret);
throw std::runtime_error(message + ": " + gnutls_strerror(ret));
}
return true;
@ -54,6 +61,9 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
: Transport(lower), mCertificate(certificate), mState(State::Disconnected),
mVerifierCallback(std::move(verifierCallback)),
mStateChangeCallback(std::move(stateChangeCallback)) {
PLOG_DEBUG << "Initializing DTLS transport (GnuTLS)";
gnutls_certificate_set_verify_function(mCertificate->credentials(), CertificateCallback);
bool active = lower->role() == Description::Role::Active;
@ -85,7 +95,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
}
DtlsTransport::~DtlsTransport() {
gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
stop();
gnutls_deinit(mSession);
}
@ -94,14 +105,20 @@ DtlsTransport::State DtlsTransport::state() const { return mState; }
void DtlsTransport::stop() {
Transport::stop();
mIncomingQueue.stop();
mRecvThread.join();
if (mRecvThread.joinable()) {
PLOG_DEBUG << "Stopping DTLS recv thread";
mIncomingQueue.stop();
gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
mRecvThread.join();
}
}
bool DtlsTransport::send(message_ptr message) {
if (!message || mState != State::Connected)
return false;
PLOG_VERBOSE << "Send size=" << message->size();
ssize_t ret;
do {
ret = gnutls_record_send(mSession, message->data(), message->size());
@ -113,7 +130,12 @@ bool DtlsTransport::send(message_ptr message) {
return check_gnutls(ret);
}
void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); }
void DtlsTransport::incoming(message_ptr message) {
if (message)
mIncomingQueue.push(message);
else
mIncomingQueue.stop();
}
void DtlsTransport::changeState(State state) {
if (mState.exchange(state) != state)
@ -142,7 +164,7 @@ void DtlsTransport::runRecvLoop() {
gnutls_dtls_set_mtu(mSession, maxMtu + 1);
} catch (const std::exception &e) {
std::cerr << "DTLS handshake: " << e.what() << std::endl;
PLOG_ERROR << "DTLS handshake: " << e.what();
changeState(State::Failed);
return;
}
@ -161,12 +183,15 @@ void DtlsTransport::runRecvLoop() {
} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
// Consider premature termination as remote closing
if (ret == GNUTLS_E_PREMATURE_TERMINATION)
if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
PLOG_DEBUG << "DTLS connection terminated";
break;
}
if (check_gnutls(ret)) {
if (ret == 0) {
// Closed
PLOG_DEBUG << "DTLS connection cleanly closed";
break;
}
auto *b = reinterpret_cast<byte *>(buffer);
@ -175,9 +200,10 @@ void DtlsTransport::runRecvLoop() {
}
} catch (const std::exception &e) {
std::cerr << "DTLS recv: " << e.what() << std::endl;
PLOG_ERROR << "DTLS recv: " << e.what();
}
PLOG_INFO << "DTLS disconnected";
changeState(State::Disconnected);
recv(nullptr);
}
@ -222,24 +248,22 @@ ssize_t DtlsTransport::WriteCallback(gnutls_transport_ptr_t ptr, const void *dat
ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_t maxlen) {
DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
auto next = t->mIncomingQueue.pop();
auto message = next ? *next : nullptr;
if (!message) {
// Closed
if (auto next = t->mIncomingQueue.pop()) {
auto message = *next;
ssize_t len = std::min(maxlen, message->size());
std::memcpy(data, message->data(), len);
gnutls_transport_set_errno(t->mSession, 0);
return 0;
return len;
}
ssize_t len = std::min(maxlen, message->size());
std::memcpy(data, message->data(), len);
// Closed
gnutls_transport_set_errno(t->mSession, 0);
return len;
return 0;
}
int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
if (ms != GNUTLS_INDEFINITE_TIMEOUT)
t->mIncomingQueue.wait(std::chrono::milliseconds(ms));
t->mIncomingQueue.wait(milliseconds(ms));
else
t->mIncomingQueue.wait();
return !t->mIncomingQueue.empty() ? 1 : 0;
@ -247,7 +271,7 @@ int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms)
} // namespace rtc
#else
#else // USE_GNUTLS==0
#include <openssl/bio.h>
#include <openssl/ec.h>
@ -268,8 +292,10 @@ string openssl_error_string(unsigned long err) {
bool check_openssl(int success, const string &message = "OpenSSL error") {
if (success)
return true;
else
throw std::runtime_error(message + ": " + openssl_error_string(ERR_get_error()));
string str = openssl_error_string(ERR_get_error());
PLOG_ERROR << message << ": " << str;
throw std::runtime_error(message + ": " + str);
}
bool check_openssl_ret(SSL *ssl, int ret, const string &message = "OpenSSL error") {
@ -277,23 +303,37 @@ bool check_openssl_ret(SSL *ssl, int ret, const string &message = "OpenSSL error
return true;
unsigned long err = SSL_get_error(ssl, ret);
if (err == SSL_ERROR_NONE || err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE)
if (err == SSL_ERROR_NONE || err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
return true;
else if (err == SSL_ERROR_ZERO_RETURN)
}
if (err == SSL_ERROR_ZERO_RETURN) {
PLOG_DEBUG << "DTLS connection cleanly closed";
return false;
else
throw std::runtime_error(message + ": " + openssl_error_string(err));
}
string str = openssl_error_string(err);
PLOG_ERROR << str;
throw std::runtime_error(message + ": " + str);
}
} // namespace
namespace rtc {
BIO_METHOD *DtlsTransport::BioMethods = NULL;
int DtlsTransport::TransportExIndex = -1;
std::mutex DtlsTransport::GlobalMutex;
void DtlsTransport::GlobalInit() {
std::lock_guard<std::mutex> lock(GlobalMutex);
std::lock_guard lock(GlobalMutex);
if (!BioMethods) {
BioMethods = BIO_meth_new(BIO_TYPE_BIO, "DTLS writer");
if (!BioMethods)
throw std::runtime_error("Unable to BIO methods for DTLS writer");
BIO_meth_set_create(BioMethods, BioMethodNew);
BIO_meth_set_destroy(BioMethods, BioMethodFree);
BIO_meth_set_write(BioMethods, BioMethodWrite);
BIO_meth_set_ctrl(BioMethods, BioMethodCtrl);
}
if (TransportExIndex < 0) {
TransportExIndex = SSL_get_ex_new_index(0, NULL, NULL, NULL, NULL);
}
@ -305,6 +345,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
mVerifierCallback(std::move(verifierCallback)),
mStateChangeCallback(std::move(stateChangeCallback)) {
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
GlobalInit();
if (!(mCtx = SSL_CTX_new(DTLS_method())))
@ -316,7 +357,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
// RFC 8261: SCTP performs segmentation and reassembly based on the path MTU.
// Therefore, the DTLS layer MUST NOT use any compression algorithm.
// See https://tools.ietf.org/html/rfc8261#section-5
SSL_CTX_set_options(mCtx, SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
SSL_CTX_set_options(mCtx, SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION | SSL_OP_NO_QUERY_MTU);
SSL_CTX_set_min_proto_version(mCtx, DTLS1_VERSION);
SSL_CTX_set_read_ahead(mCtx, 1);
SSL_CTX_set_quiet_shutdown(mCtx, 1);
@ -332,7 +373,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
check_openssl(SSL_CTX_check_private_key(mCtx), "SSL local private key check failed");
if (!(mSsl = SSL_new(mCtx)))
throw std::runtime_error("Unable to create SSL instance");
throw std::runtime_error("Unable to create SSL instance");
SSL_set_ex_data(mSsl, TransportExIndex, this);
SSL_set_mtu(mSsl, 1280 - 40 - 8); // min MTU over UDP/IPv6
@ -342,11 +383,11 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
else
SSL_set_accept_state(mSsl);
if (!(mInBio = BIO_new(BIO_s_mem())) || !(mOutBio = BIO_new(BIO_s_mem())))
throw std::runtime_error("Unable to create BIO");
if (!(mInBio = BIO_new(BIO_s_mem())) || !(mOutBio = BIO_new(BioMethods)))
throw std::runtime_error("Unable to create BIO");
BIO_set_mem_eof_return(mInBio, BIO_EOF);
BIO_set_mem_eof_return(mOutBio, BIO_EOF);
BIO_set_data(mOutBio, this);
SSL_set_bio(mSsl, mInBio, mOutBio);
auto ecdh = unique_ptr<EC_KEY, decltype(&EC_KEY_free)>(
@ -358,7 +399,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
}
DtlsTransport::~DtlsTransport() {
SSL_shutdown(mSsl);
stop();
SSL_free(mSsl);
SSL_CTX_free(mCtx);
}
@ -366,34 +408,35 @@ DtlsTransport::~DtlsTransport() {
void DtlsTransport::stop() {
Transport::stop();
mIncomingQueue.stop();
mRecvThread.join();
if (mRecvThread.joinable()) {
PLOG_DEBUG << "Stopping DTLS recv thread";
mIncomingQueue.stop();
mRecvThread.join();
SSL_shutdown(mSsl);
}
}
DtlsTransport::State DtlsTransport::state() const { return mState; }
bool DtlsTransport::send(message_ptr message) {
const size_t bufferSize = 4096;
byte buffer[bufferSize];
if (!message || mState != State::Connected)
return false;
PLOG_VERBOSE << "Send size=" << message->size();
int ret = SSL_write(mSsl, message->data(), message->size());
if (!check_openssl_ret(mSsl, ret)) {
if (!check_openssl_ret(mSsl, ret))
return false;
}
while (BIO_ctrl_pending(mOutBio) > 0) {
int ret = BIO_read(mOutBio, buffer, bufferSize);
if (check_openssl_ret(mSsl, ret) && ret > 0)
outgoing(make_message(buffer, buffer + ret));
}
return true;
}
void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); }
void DtlsTransport::incoming(message_ptr message) {
if (message)
mIncomingQueue.push(message);
else
mIncomingQueue.stop();
}
void DtlsTransport::changeState(State state) {
if (mState.exchange(state) != state)
@ -401,19 +444,14 @@ void DtlsTransport::changeState(State state) {
}
void DtlsTransport::runRecvLoop() {
const size_t bufferSize = 4096;
byte buffer[bufferSize];
const size_t maxMtu = 4096;
try {
changeState(State::Connecting);
SSL_do_handshake(mSsl);
while (BIO_ctrl_pending(mOutBio) > 0) {
int ret = BIO_read(mOutBio, buffer, bufferSize);
if (check_openssl_ret(mSsl, ret) && ret > 0)
outgoing(make_message(buffer, buffer + ret));
}
const size_t bufferSize = maxMtu;
byte buffer[bufferSize];
while (auto next = mIncomingQueue.pop()) {
auto message = *next;
BIO_write(mInBio, message->data(), message->size());
@ -427,27 +465,28 @@ void DtlsTransport::runRecvLoop() {
if (unsigned long err = ERR_get_error())
throw std::runtime_error("handshake failed: " + openssl_error_string(err));
while (BIO_ctrl_pending(mOutBio) > 0) {
ret = BIO_read(mOutBio, buffer, bufferSize);
if (check_openssl_ret(mSsl, ret) && ret > 0)
outgoing(make_message(buffer, buffer + ret));
}
if (SSL_is_init_finished(mSsl))
if (SSL_is_init_finished(mSsl)) {
changeState(State::Connected);
// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
// See https://tools.ietf.org/html/rfc8261#section-5
SSL_set_mtu(mSsl, maxMtu + 1);
}
}
if (decrypted)
recv(decrypted);
}
} catch (const std::exception &e) {
std::cerr << "DTLS recv: " << e.what() << std::endl;
PLOG_ERROR << "DTLS recv: " << e.what();
}
if (mState == State::Connected) {
PLOG_INFO << "DTLS disconnected";
changeState(State::Disconnected);
recv(nullptr);
} else {
PLOG_INFO << "DTLS handshake failed";
changeState(State::Failed);
}
}
@ -469,12 +508,52 @@ void DtlsTransport::InfoCallback(const SSL *ssl, int where, int ret) {
static_cast<DtlsTransport *>(SSL_get_ex_data(ssl, DtlsTransport::TransportExIndex));
if (where & SSL_CB_ALERT) {
if (ret != 256) // Close Notify
std::cerr << "DTLS alert: " << SSL_alert_desc_string_long(ret) << std::endl;
if (ret != 256) { // Close Notify
PLOG_ERROR << "DTLS alert: " << SSL_alert_desc_string_long(ret);
}
t->mIncomingQueue.stop(); // Close the connection
}
}
int DtlsTransport::BioMethodNew(BIO *bio) {
BIO_set_init(bio, 1);
BIO_set_data(bio, NULL);
BIO_set_shutdown(bio, 0);
return 1;
}
int DtlsTransport::BioMethodFree(BIO *bio) {
if (!bio)
return 0;
BIO_set_data(bio, NULL);
return 1;
}
int DtlsTransport::BioMethodWrite(BIO *bio, const char *in, int inl) {
if (inl <= 0)
return inl;
auto transport = reinterpret_cast<DtlsTransport *>(BIO_get_data(bio));
if (!transport)
return -1;
auto b = reinterpret_cast<const byte *>(in);
return transport->outgoing(make_message(b, b + inl)) ? inl : 0;
}
long DtlsTransport::BioMethodCtrl(BIO *bio, int cmd, long num, void *ptr) {
switch (cmd) {
case BIO_CTRL_FLUSH:
return 1;
case BIO_CTRL_DGRAM_QUERY_MTU:
return 0; // SSL_OP_NO_QUERY_MTU must be set
case BIO_CTRL_WPENDING:
case BIO_CTRL_PENDING:
return 0;
default:
break;
}
return 0;
}
} // namespace rtc
#endif

View File

@ -55,10 +55,10 @@ public:
State state() const;
void stop() override;
bool send(message_ptr message); // false if dropped
bool send(message_ptr message) override; // false if dropped
private:
void incoming(message_ptr message);
void incoming(message_ptr message) override;
void changeState(State state);
void runRecvLoop();
@ -83,12 +83,18 @@ private:
SSL *mSsl;
BIO *mInBio, *mOutBio;
static BIO_METHOD *BioMethods;
static int TransportExIndex;
static std::mutex GlobalMutex;
static void GlobalInit();
static int CertificateCallback(int preverify_ok, X509_STORE_CTX *ctx);
static void InfoCallback(const SSL *ssl, int where, int ret);
static int BioMethodNew(BIO *bio);
static int BioMethodFree(BIO *bio);
static int BioMethodWrite(BIO *bio, const char *in, int inl);
static long BioMethodCtrl(BIO *bio, int cmd, long num, void *ptr);
#endif
};

View File

@ -1,5 +1,5 @@
/**
* Copyright (c) 2019 Paul-Louis Ageneau
* Copyright (c) 2019-2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -17,40 +17,273 @@
*/
#include "icetransport.hpp"
#include "configuration.hpp"
#ifdef _WIN32
#include <winsock2.h>
#elif __linux__
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#endif
#include <sys/types.h>
#include <chrono>
#include <iostream>
#include <random>
#include <sstream>
namespace rtc {
using namespace std::chrono_literals;
using std::shared_ptr;
using std::weak_ptr;
#if USE_JUICE
namespace rtc {
IceTransport::IceTransport(const Configuration &config, Description::Role role,
candidate_callback candidateCallback,
state_callback stateChangeCallback,
candidate_callback candidateCallback, state_callback stateChangeCallback,
gathering_state_callback gatheringStateChangeCallback)
: mRole(role), mMid("0"), mState(State::Disconnected), mGatheringState(GatheringState::New),
mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr),
mCandidateCallback(std::move(candidateCallback)),
mStateChangeCallback(std::move(stateChangeCallback)),
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)) {
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
mAgent(nullptr, nullptr) {
auto logLevelFlags = GLogLevelFlags(G_LOG_LEVEL_MASK | G_LOG_FLAG_FATAL | G_LOG_FLAG_RECURSION);
g_log_set_handler(nullptr, logLevelFlags, LogCallback, this);
nice_debug_enable(false);
PLOG_DEBUG << "Initializing ICE transport (libjuice)";
if (config.enableIceTcp) {
PLOG_WARNING << "ICE-TCP is not supported with libjuice";
}
juice_set_log_handler(IceTransport::LogCallback);
juice_set_log_level(JUICE_LOG_LEVEL_VERBOSE);
juice_config_t jconfig = {};
jconfig.cb_state_changed = IceTransport::StateChangeCallback;
jconfig.cb_candidate = IceTransport::CandidateCallback;
jconfig.cb_gathering_done = IceTransport::GatheringDoneCallback;
jconfig.cb_recv = IceTransport::RecvCallback;
jconfig.user_ptr = this;
// Randomize servers order
std::vector<IceServer> servers = config.iceServers;
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
std::shuffle(servers.begin(), servers.end(), std::default_random_engine(seed));
// Pick a STUN server
for (auto &server : servers) {
if (!server.hostname.empty() && server.type == IceServer::Type::Stun) {
if (server.service.empty())
server.service = "3478"; // STUN UDP port
PLOG_DEBUG << "Using STUN server \"" << server.hostname << ":" << server.service
<< "\"";
mStunHostname = server.hostname;
mStunService = server.service;
jconfig.stun_server_host = mStunHostname.c_str();
jconfig.stun_server_port = std::stoul(mStunService);
}
}
// TURN support is not implemented yet
// Create agent
mAgent = decltype(mAgent)(juice_create(&jconfig), juice_destroy);
if (!mAgent)
throw std::runtime_error("Failed to create the ICE agent");
}
IceTransport::~IceTransport() { stop(); }
void IceTransport::stop() {
// Nothing to do
}
Description::Role IceTransport::role() const { return mRole; }
IceTransport::State IceTransport::state() const { return mState; }
Description IceTransport::getLocalDescription(Description::Type type) const {
char sdp[JUICE_MAX_SDP_STRING_LEN];
if (juice_get_local_description(mAgent.get(), sdp, JUICE_MAX_SDP_STRING_LEN) < 0)
throw std::runtime_error("Failed to generate local SDP");
return Description(string(sdp), type, mRole);
}
void IceTransport::setRemoteDescription(const Description &description) {
mRole = description.role() == Description::Role::Active ? Description::Role::Passive
: Description::Role::Active;
mMid = description.mid();
// TODO
// mTrickleTimeout = description.trickleEnabled() ? 30s : 0s;
if (juice_set_remote_description(mAgent.get(), string(description).c_str()) < 0)
throw std::runtime_error("Failed to parse remote SDP");
}
bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
// Don't try to pass unresolved candidates for more safety
if (!candidate.isResolved())
return false;
return juice_add_remote_candidate(mAgent.get(), string(candidate).c_str()) >= 0;
}
void IceTransport::gatherLocalCandidates() {
// Change state now as candidates calls can be synchronous
changeGatheringState(GatheringState::InProgress);
if (juice_gather_candidates(mAgent.get()) < 0) {
throw std::runtime_error("Failed to gather local ICE candidates");
}
}
std::optional<string> IceTransport::getLocalAddress() const {
char str[JUICE_MAX_ADDRESS_STRING_LEN];
if (juice_get_selected_addresses(mAgent.get(), str, JUICE_MAX_ADDRESS_STRING_LEN, NULL, 0) ==
0) {
return std::make_optional(string(str));
}
return nullopt;
}
std::optional<string> IceTransport::getRemoteAddress() const {
char str[JUICE_MAX_ADDRESS_STRING_LEN];
if (juice_get_selected_addresses(mAgent.get(), NULL, 0, str, JUICE_MAX_ADDRESS_STRING_LEN) ==
0) {
return std::make_optional(string(str));
}
return nullopt;
}
bool IceTransport::send(message_ptr message) {
if (!message || (mState != State::Connected && mState != State::Completed))
return false;
PLOG_VERBOSE << "Send size=" << message->size();
return outgoing(message);
}
void IceTransport::incoming(message_ptr message) { recv(message); }
void IceTransport::incoming(const byte *data, int size) {
incoming(make_message(data, data + size));
}
bool IceTransport::outgoing(message_ptr message) {
return juice_send(mAgent.get(), reinterpret_cast<const char *>(message->data()),
message->size()) >= 0;
}
void IceTransport::changeState(State state) {
if (mState.exchange(state) != state)
mStateChangeCallback(mState);
}
void IceTransport::changeGatheringState(GatheringState state) {
if (mGatheringState.exchange(state) != state)
mGatheringStateChangeCallback(mGatheringState);
}
void IceTransport::processStateChange(unsigned int state) {
changeState(static_cast<State>(state));
}
void IceTransport::processCandidate(const string &candidate) {
mCandidateCallback(Candidate(candidate, mMid));
}
void IceTransport::processGatheringDone() { changeGatheringState(GatheringState::Complete); }
void IceTransport::StateChangeCallback(juice_agent_t *agent, juice_state_t state, void *user_ptr) {
auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
try {
iceTransport->processStateChange(static_cast<unsigned int>(state));
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void IceTransport::CandidateCallback(juice_agent_t *agent, const char *sdp, void *user_ptr) {
auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
try {
iceTransport->processCandidate(sdp);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void IceTransport::GatheringDoneCallback(juice_agent_t *agent, void *user_ptr) {
auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
try {
iceTransport->processGatheringDone();
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void IceTransport::RecvCallback(juice_agent_t *agent, const char *data, size_t size,
void *user_ptr) {
auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
try {
iceTransport->incoming(reinterpret_cast<const byte *>(data), size);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void IceTransport::LogCallback(juice_log_level_t level, const char *message) {
plog::Severity severity;
switch (level) {
case JUICE_LOG_LEVEL_FATAL:
severity = plog::fatal;
break;
case JUICE_LOG_LEVEL_ERROR:
severity = plog::error;
break;
case JUICE_LOG_LEVEL_WARN:
severity = plog::warning;
break;
case JUICE_LOG_LEVEL_INFO:
severity = plog::info;
break;
case JUICE_LOG_LEVEL_DEBUG:
severity = plog::debug;
break;
default:
severity = plog::verbose;
break;
}
PLOG(severity) << "juice: " << message;
}
} // namespace rtc
#else // USE_JUICE == 0
namespace rtc {
IceTransport::IceTransport(const Configuration &config, Description::Role role,
candidate_callback candidateCallback, state_callback stateChangeCallback,
gathering_state_callback gatheringStateChangeCallback)
: mRole(role), mMid("0"), mState(State::Disconnected), mGatheringState(GatheringState::New),
mCandidateCallback(std::move(candidateCallback)),
mStateChangeCallback(std::move(stateChangeCallback)),
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) {
PLOG_DEBUG << "Initializing ICE transport (libnice)";
g_log_set_handler("libnice", G_LOG_LEVEL_MASK, LogCallback, this);
IF_PLOG(plog::verbose) {
nice_debug_enable(false); // do not output STUN debug messages
}
mMainLoop = decltype(mMainLoop)(g_main_loop_new(nullptr, FALSE), g_main_loop_unref);
if (!mMainLoop)
std::runtime_error("Failed to create the main loop");
// RFC 5245 was obsoleted by RFC 8445 but this should be OK.
mNiceAgent = decltype(mNiceAgent)(
nice_agent_new(g_main_loop_get_context(mMainLoop.get()), NICE_COMPATIBILITY_RFC5245),
g_object_unref);
@ -60,23 +293,38 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
mMainLoopThread = std::thread(g_main_loop_run, mMainLoop.get());
g_object_set(G_OBJECT(mNiceAgent.get()), "controlling-mode", TRUE, nullptr);
mStreamId = nice_agent_add_stream(mNiceAgent.get(), 1);
if (!mStreamId)
throw std::runtime_error("Failed to add a stream");
g_object_set(G_OBJECT(mNiceAgent.get()), "controlling-mode", TRUE, nullptr); // decided later
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-udp", TRUE, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-tcp", FALSE, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-initial-timeout", 200, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-tcp", config.enableIceTcp ? TRUE : FALSE,
nullptr);
// RFC 8445: Agents MUST NOT use an RTO value smaller than 500 ms.
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-initial-timeout", 500, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-max-retransmissions", 3, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-pacing-timer", 20, nullptr);
// RFC 8445: ICE agents SHOULD use a default Ta value, 50 ms, but MAY use another value based on
// the characteristics of the associated data.
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-pacing-timer", 25, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "upnp", FALSE, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "upnp-timeout", 200, nullptr);
// Randomize order
std::vector<IceServer> servers = config.iceServers;
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
std::shuffle(servers.begin(), servers.end(), std::default_random_engine(seed));
// Add one STUN server
bool success = false;
for (auto &server : servers) {
if (server.hostname.empty())
continue;
if (server.type != IceServer::Type::Stun)
continue;
if (server.service.empty())
server.service = "3478"; // STUN UDP port
@ -94,9 +342,10 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, p->ai_addrlen, nodebuffer, MAX_NUMERICNODE_LEN,
servbuffer, MAX_NUMERICNODE_LEN,
NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
PLOG_DEBUG << "Using STUN server \"" << server.hostname << ":" << server.service
<< "\"";
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-server", nodebuffer, nullptr);
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-server-port",
std::stoul(servbuffer), nullptr);
@ -111,6 +360,56 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
break;
}
// Add TURN servers
for (auto &server : servers) {
if (server.hostname.empty())
continue;
if (server.type != IceServer::Type::Turn)
continue;
if (server.service.empty())
server.service = server.relayType == IceServer::RelayType::TurnTls ? "5349" : "3478";
struct addrinfo hints = {};
hints.ai_family = AF_UNSPEC;
hints.ai_socktype =
server.relayType == IceServer::RelayType::TurnUdp ? SOCK_DGRAM : SOCK_STREAM;
hints.ai_protocol =
server.relayType == IceServer::RelayType::TurnUdp ? IPPROTO_UDP : IPPROTO_TCP;
hints.ai_flags = AI_ADDRCONFIG;
struct addrinfo *result = nullptr;
if (getaddrinfo(server.hostname.c_str(), server.service.c_str(), &hints, &result) != 0)
continue;
for (auto p = result; p; p = p->ai_next) {
if (p->ai_family == AF_INET || p->ai_family == AF_INET6) {
char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, p->ai_addrlen, nodebuffer, MAX_NUMERICNODE_LEN,
servbuffer, MAX_NUMERICNODE_LEN,
NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
NiceRelayType niceRelayType;
switch (server.relayType) {
case IceServer::RelayType::TurnTcp:
niceRelayType = NICE_RELAY_TYPE_TURN_TCP;
break;
case IceServer::RelayType::TurnTls:
niceRelayType = NICE_RELAY_TYPE_TURN_TLS;
break;
default:
niceRelayType = NICE_RELAY_TYPE_TURN_UDP;
break;
}
nice_agent_set_relay_info(mNiceAgent.get(), mStreamId, 1, nodebuffer,
std::stoul(servbuffer), server.username.c_str(),
server.password.c_str(), niceRelayType);
}
}
}
freeaddrinfo(result);
}
g_signal_connect(G_OBJECT(mNiceAgent.get()), "component-state-changed",
G_CALLBACK(StateChangeCallback), this);
g_signal_connect(G_OBJECT(mNiceAgent.get()), "new-candidate-full",
@ -118,10 +417,6 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
g_signal_connect(G_OBJECT(mNiceAgent.get()), "candidate-gathering-done",
G_CALLBACK(GatheringDoneCallback), this);
mStreamId = nice_agent_add_stream(mNiceAgent.get(), 1);
if (!mStreamId)
throw std::runtime_error("Failed to add a stream");
nice_agent_set_stream_name(mNiceAgent.get(), mStreamId, "application");
nice_agent_set_port_range(mNiceAgent.get(), mStreamId, 1, config.portRangeBegin,
config.portRangeEnd);
@ -130,11 +425,18 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
RecvCallback, this);
}
IceTransport::~IceTransport() {}
IceTransport::~IceTransport() { stop(); }
void IceTransport::stop() {
g_main_loop_quit(mMainLoop.get());
mMainLoopThread.join();
if (mTimeoutId) {
g_source_remove(mTimeoutId);
mTimeoutId = 0;
}
if (mMainLoopThread.joinable()) {
PLOG_DEBUG << "Stopping ICE thread";
g_main_loop_quit(mMainLoop.get());
mMainLoopThread.join();
}
}
Description::Role IceTransport::role() const { return mRole; }
@ -142,8 +444,8 @@ Description::Role IceTransport::role() const { return mRole; }
IceTransport::State IceTransport::state() const { return mState; }
Description IceTransport::getLocalDescription(Description::Type type) const {
// RFC 5245: The agent that generated the offer which started the ICE processing MUST take the
// controlling role, and the other MUST take the controlled role.
// RFC 8445: The initiating agent that started the ICE processing MUST take the controlling
// role, and the other MUST take the controlled role.
g_object_set(G_OBJECT(mNiceAgent.get()), "controlling-mode",
type == Description::Type::Offer ? TRUE : FALSE, nullptr);
@ -156,14 +458,15 @@ void IceTransport::setRemoteDescription(const Description &description) {
mRole = description.role() == Description::Role::Active ? Description::Role::Passive
: Description::Role::Active;
mMid = description.mid();
mTrickleTimeout = description.trickleEnabled() ? 30s : 0s;
if (nice_agent_parse_remote_sdp(mNiceAgent.get(), string(description).c_str()) < 0)
// Warning: libnice expects "\n" as end of line
if (nice_agent_parse_remote_sdp(mNiceAgent.get(), description.generateSdp("\n").c_str()) < 0)
throw std::runtime_error("Failed to parse remote SDP");
}
bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
// Don't try to pass unresolved candidates to libnice for more safety
if (!candidate.isResolved())
return false;
@ -199,6 +502,7 @@ std::optional<string> IceTransport::getLocalAddress() const {
}
return nullopt;
}
std::optional<string> IceTransport::getRemoteAddress() const {
NiceCandidate *local = nullptr;
NiceCandidate *remote = nullptr;
@ -209,11 +513,11 @@ std::optional<string> IceTransport::getRemoteAddress() const {
}
bool IceTransport::send(message_ptr message) {
if (!message || !mStreamId)
if (!message || (mState != State::Connected && mState != State::Completed))
return false;
outgoing(message);
return true;
PLOG_VERBOSE << "Send size=" << message->size();
return outgoing(message);
}
void IceTransport::incoming(message_ptr message) { recv(message); }
@ -222,9 +526,9 @@ void IceTransport::incoming(const byte *data, int size) {
incoming(make_message(data, data + size));
}
void IceTransport::outgoing(message_ptr message) {
nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(),
reinterpret_cast<const char *>(message->data()));
bool IceTransport::outgoing(message_ptr message) {
return nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(),
reinterpret_cast<const char *>(message->data())) >= 0;
}
void IceTransport::changeState(State state) {
@ -233,8 +537,14 @@ void IceTransport::changeState(State state) {
}
void IceTransport::changeGatheringState(GatheringState state) {
mGatheringState = state;
mGatheringStateChangeCallback(mGatheringState);
if (mGatheringState.exchange(state) != state)
mGatheringStateChangeCallback(mGatheringState);
}
void IceTransport::processTimeout() {
PLOG_WARNING << "ICE timeout";
mTimeoutId = 0;
changeState(State::Failed);
}
void IceTransport::processCandidate(const string &candidate) {
@ -243,9 +553,20 @@ void IceTransport::processCandidate(const string &candidate) {
void IceTransport::processGatheringDone() { changeGatheringState(GatheringState::Complete); }
void IceTransport::processStateChange(uint32_t state) {
if (state != NICE_COMPONENT_STATE_GATHERING)
changeState(static_cast<State>(state));
void IceTransport::processStateChange(unsigned int state) {
if (state == NICE_COMPONENT_STATE_FAILED && mTrickleTimeout.count() > 0) {
if (mTimeoutId)
g_source_remove(mTimeoutId);
mTimeoutId = g_timeout_add(mTrickleTimeout.count() /* ms */, TimeoutCallback, this);
return;
}
if (state == NICE_COMPONENT_STATE_CONNECTED && mTimeoutId) {
g_source_remove(mTimeoutId);
mTimeoutId = 0;
}
changeState(static_cast<State>(state));
}
string IceTransport::AddressToString(const NiceAddress &addr) {
@ -264,7 +585,7 @@ void IceTransport::CandidateCallback(NiceAgent *agent, NiceCandidate *candidate,
try {
iceTransport->processCandidate(cand);
} catch (const std::exception &e) {
std::cerr << "ICE candidate: " << e.what() << std::endl;
PLOG_WARNING << e.what();
}
g_free(cand);
}
@ -274,17 +595,17 @@ void IceTransport::GatheringDoneCallback(NiceAgent *agent, guint streamId, gpoin
try {
iceTransport->processGatheringDone();
} catch (const std::exception &e) {
std::cerr << "ICE gathering done: " << e.what() << std::endl;
PLOG_WARNING << e.what();
}
}
void IceTransport::StateChangeCallback(NiceAgent *agent, guint streamId, guint componentId,
guint state, gpointer userData) {
guint state, gpointer userData) {
auto iceTransport = static_cast<rtc::IceTransport *>(userData);
try {
iceTransport->processStateChange(state);
} catch (const std::exception &e) {
std::cerr << "ICE change state: " << e.what() << std::endl;
PLOG_WARNING << e.what();
}
}
@ -294,13 +615,40 @@ void IceTransport::RecvCallback(NiceAgent *agent, guint streamId, guint componen
try {
iceTransport->incoming(reinterpret_cast<byte *>(buf), len);
} catch (const std::exception &e) {
std::cerr << "ICE incoming: " << e.what() << std::endl;
PLOG_WARNING << e.what();
}
}
gboolean IceTransport::TimeoutCallback(gpointer userData) {
auto iceTransport = static_cast<rtc::IceTransport *>(userData);
try {
iceTransport->processTimeout();
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
return FALSE;
}
void IceTransport::LogCallback(const gchar *logDomain, GLogLevelFlags logLevel,
const gchar *message, gpointer userData) {
std::cout << message << std::endl;
plog::Severity severity;
unsigned int flags = logLevel & G_LOG_LEVEL_MASK;
if (flags & G_LOG_LEVEL_ERROR)
severity = plog::fatal;
else if (flags & G_LOG_LEVEL_CRITICAL)
severity = plog::error;
else if (flags & G_LOG_LEVEL_WARNING)
severity = plog::warning;
else if (flags & G_LOG_LEVEL_MESSAGE)
severity = plog::info;
else if (flags & G_LOG_LEVEL_INFO)
severity = plog::info;
else
severity = plog::verbose; // libnice debug as verbose
PLOG(severity) << "nice: " << message;
}
} // namespace rtc
#endif

View File

@ -1,5 +1,5 @@
/**
* Copyright (c) 2019 Paul-Louis Ageneau
* Copyright (c) 2019-2020 Paul-Louis Ageneau
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -26,25 +26,37 @@
#include "peerconnection.hpp"
#include "transport.hpp"
extern "C" {
#if USE_JUICE
#include <juice/juice.h>
#else
#include <nice/agent.h>
}
#endif
#include <atomic>
#include <chrono>
#include <thread>
namespace rtc {
class IceTransport : public Transport {
public:
enum class State : uint32_t {
#if USE_JUICE
enum class State : unsigned int{
Disconnected = JUICE_STATE_DISCONNECTED,
Connecting = JUICE_STATE_CONNECTING,
Connected = JUICE_STATE_CONNECTED,
Completed = JUICE_STATE_COMPLETED,
Failed = JUICE_STATE_FAILED,
};
#else
enum class State : unsigned int {
Disconnected = NICE_COMPONENT_STATE_DISCONNECTED,
Connecting = NICE_COMPONENT_STATE_CONNECTING,
Connected = NICE_COMPONENT_STATE_CONNECTED,
Completed = NICE_COMPONENT_STATE_READY,
Failed = NICE_COMPONENT_STATE_FAILED
Failed = NICE_COMPONENT_STATE_FAILED,
};
#endif
enum class GatheringState { New = 0, InProgress = 1, Complete = 2 };
using candidate_callback = std::function<void(const Candidate &candidate)>;
@ -71,41 +83,57 @@ public:
bool send(message_ptr message) override; // false if dropped
private:
void incoming(message_ptr message);
void incoming(message_ptr message) override;
void incoming(const byte *data, int size);
void outgoing(message_ptr message);
bool outgoing(message_ptr message) override;
void changeState(State state);
void changeGatheringState(GatheringState state);
void processStateChange(unsigned int state);
void processCandidate(const string &candidate);
void processGatheringDone();
void processStateChange(uint32_t state);
void processTimeout();
Description::Role mRole;
string mMid;
std::chrono::milliseconds mTrickleTimeout;
std::atomic<State> mState;
std::atomic<GatheringState> mGatheringState;
uint32_t mStreamId = 0;
std::unique_ptr<NiceAgent, void (*)(gpointer)> mNiceAgent;
std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop;
std::thread mMainLoopThread;
candidate_callback mCandidateCallback;
state_callback mStateChangeCallback;
gathering_state_callback mGatheringStateChangeCallback;
#if USE_JUICE
std::unique_ptr<juice_agent_t, void (*)(juice_agent_t *)> mAgent;
string mStunHostname;
string mStunService;
static void StateChangeCallback(juice_agent_t *agent, juice_state_t state, void *user_ptr);
static void CandidateCallback(juice_agent_t *agent, const char *sdp, void *user_ptr);
static void GatheringDoneCallback(juice_agent_t *agent, void *user_ptr);
static void RecvCallback(juice_agent_t *agent, const char *data, size_t size, void *user_ptr);
static void LogCallback(juice_log_level_t level, const char *message);
#else
uint32_t mStreamId = 0;
std::unique_ptr<NiceAgent, void (*)(gpointer)> mNiceAgent;
std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop;
std::thread mMainLoopThread;
guint mTimeoutId = 0;
static string AddressToString(const NiceAddress &addr);
static void CandidateCallback(NiceAgent *agent, NiceCandidate *candidate, gpointer userData);
static void GatheringDoneCallback(NiceAgent *agent, guint streamId, gpointer userData);
static void StateChangeCallback(NiceAgent *agent, guint streamId, guint componentId,
guint state, gpointer userData);
guint state, gpointer userData);
static void RecvCallback(NiceAgent *agent, guint stream_id, guint component_id, guint len,
gchar *buf, gpointer userData);
static gboolean TimeoutCallback(gpointer userData);
static void LogCallback(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message,
gpointer user_data);
#endif
};
} // namespace rtc

View File

@ -20,6 +20,7 @@
#include "certificate.hpp"
#include "dtlstransport.hpp"
#include "icetransport.hpp"
#include "include.hpp"
#include "sctptransport.hpp"
#include <iostream>
@ -37,44 +38,66 @@ PeerConnection::PeerConnection(const Configuration &config)
: mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New) {}
PeerConnection::~PeerConnection() {
if (mIceTransport)
mIceTransport->stop();
if (mDtlsTransport)
mDtlsTransport->stop();
if (mSctpTransport)
mSctpTransport->stop();
changeState(State::Destroying);
close();
mSctpTransport.reset();
mDtlsTransport.reset();
mIceTransport.reset();
}
void PeerConnection::close() {
// Close DataChannels
closeDataChannels();
mDataChannels.clear();
// Close Transports
for (int i = 0; i < 2; ++i) { // Make sure a transport wasn't spawn behind our back
if (auto transport = std::atomic_load(&mSctpTransport))
transport->stop();
if (auto transport = std::atomic_load(&mDtlsTransport))
transport->stop();
if (auto transport = std::atomic_load(&mIceTransport))
transport->stop();
}
changeState(State::Closed);
}
const Configuration *PeerConnection::config() const { return &mConfig; }
PeerConnection::State PeerConnection::state() const { return mState; }
PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
std::optional<Description> PeerConnection::localDescription() const { return mLocalDescription; }
std::optional<Description> PeerConnection::localDescription() const {
std::lock_guard lock(mLocalDescriptionMutex);
return mLocalDescription;
}
std::optional<Description> PeerConnection::remoteDescription() const { return mRemoteDescription; }
std::optional<Description> PeerConnection::remoteDescription() const {
std::lock_guard lock(mRemoteDescriptionMutex);
return mRemoteDescription;
}
void PeerConnection::setRemoteDescription(Description description) {
std::lock_guard lock(mRemoteDescriptionMutex);
auto remoteCandidates = description.extractCandidates();
mRemoteDescription.emplace(std::move(description));
if (!mIceTransport)
initIceTransport(Description::Role::ActPass);
auto iceTransport = std::atomic_load(&mIceTransport);
if (!iceTransport)
iceTransport = initIceTransport(Description::Role::ActPass);
mIceTransport->setRemoteDescription(*mRemoteDescription);
iceTransport->setRemoteDescription(*mRemoteDescription);
if (mRemoteDescription->type() == Description::Type::Offer) {
// This is an offer and we are the answerer.
processLocalDescription(mIceTransport->getLocalDescription(Description::Type::Answer));
mIceTransport->gatherLocalCandidates();
processLocalDescription(iceTransport->getLocalDescription(Description::Type::Answer));
iceTransport->gatherLocalCandidates();
} else {
// This is an answer and we are the offerer.
if (!mSctpTransport && mIceTransport->role() == Description::Role::Active) {
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
// Since we assumed passive role during DataChannel creation, we need to shift the
// stream numbers by one to shift them from odd to even.
decltype(mDataChannels) newDataChannels;
@ -92,16 +115,19 @@ void PeerConnection::setRemoteDescription(Description description) {
}
void PeerConnection::addRemoteCandidate(Candidate candidate) {
if (!mRemoteDescription || !mIceTransport)
std::lock_guard lock(mRemoteDescriptionMutex);
auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport)
throw std::logic_error("Remote candidate set without remote description");
mRemoteDescription->addCandidate(candidate);
if (candidate.resolve(Candidate::ResolveMode::Simple)) {
mIceTransport->addRemoteCandidate(candidate);
iceTransport->addRemoteCandidate(candidate);
} else {
// OK, we might need a lookup, do it asynchronously
weak_ptr<IceTransport> weakIceTransport{mIceTransport};
weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock())
@ -112,11 +138,13 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
}
std::optional<string> PeerConnection::localAddress() const {
return mIceTransport ? mIceTransport->getLocalAddress() : nullopt;
auto iceTransport = std::atomic_load(&mIceTransport);
return iceTransport ? iceTransport->getLocalAddress() : nullopt;
}
std::optional<string> PeerConnection::remoteAddress() const {
return mIceTransport ? mIceTransport->getRemoteAddress() : nullopt;
auto iceTransport = std::atomic_load(&mIceTransport);
return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
}
shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
@ -126,7 +154,8 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
// setup:passive. [...] Thus, setup:active is RECOMMENDED.
// See https://tools.ietf.org/html/rfc5763#section-5
// Therefore, we assume passive role when we are the offerer.
auto role = mIceTransport ? mIceTransport->role() : Description::Role::Passive;
auto iceTransport = std::atomic_load(&mIceTransport);
auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
// The active side must use streams with even identifiers, whereas the passive side must use
// streams with odd identifiers.
@ -142,15 +171,17 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
mDataChannels.insert(std::make_pair(stream, channel));
if (!mIceTransport) {
if (!iceTransport) {
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
// setup:actpass.
// See https://tools.ietf.org/html/rfc5763#section-5
initIceTransport(Description::Role::ActPass);
processLocalDescription(mIceTransport->getLocalDescription(Description::Type::Offer));
mIceTransport->gatherLocalCandidates();
} else if (mSctpTransport && mSctpTransport->state() == SctpTransport::State::Connected) {
channel->open(mSctpTransport);
iceTransport = initIceTransport(Description::Role::ActPass);
processLocalDescription(iceTransport->getLocalDescription(Description::Type::Offer));
iceTransport->gatherLocalCandidates();
} else {
if (auto transport = std::atomic_load(&mSctpTransport))
if (transport->state() == SctpTransport::State::Connected)
channel->open(transport);
}
return channel;
}
@ -177,8 +208,12 @@ void PeerConnection::onGatheringStateChange(std::function<void(GatheringState st
mGatheringStateChangeCallback = callback;
}
void PeerConnection::initIceTransport(Description::Role role) {
mIceTransport = std::make_shared<IceTransport>(
shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
std::lock_guard lock(mInitMutex);
if (auto transport = std::atomic_load(&mIceTransport))
return transport;
auto transport = std::make_shared<IceTransport>(
mConfig, role, std::bind(&PeerConnection::processLocalCandidate, this, _1),
[this](IceTransport::State state) {
switch (state) {
@ -191,6 +226,9 @@ void PeerConnection::initIceTransport(Description::Role role) {
case IceTransport::State::Connected:
initDtlsTransport();
break;
case IceTransport::State::Disconnected:
changeState(State::Disconnected);
break;
default:
// Ignore
break;
@ -202,8 +240,7 @@ void PeerConnection::initIceTransport(Description::Role role) {
changeGatheringState(GatheringState::InProgress);
break;
case IceTransport::GatheringState::Complete:
if (mLocalDescription)
mLocalDescription->endCandidates();
endLocalCandidates();
changeGatheringState(GatheringState::Complete);
break;
default:
@ -211,11 +248,18 @@ void PeerConnection::initIceTransport(Description::Role role) {
break;
}
});
std::atomic_store(&mIceTransport, transport);
return transport;
}
void PeerConnection::initDtlsTransport() {
mDtlsTransport = std::make_shared<DtlsTransport>(
mIceTransport, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
std::lock_guard lock(mInitMutex);
if (auto transport = std::atomic_load(&mDtlsTransport))
return transport;
auto lower = std::atomic_load(&mIceTransport);
auto transport = std::make_shared<DtlsTransport>(
lower, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
[this](DtlsTransport::State state) {
switch (state) {
case DtlsTransport::State::Connected:
@ -224,17 +268,27 @@ void PeerConnection::initDtlsTransport() {
case DtlsTransport::State::Failed:
changeState(State::Failed);
break;
case DtlsTransport::State::Disconnected:
changeState(State::Disconnected);
break;
default:
// Ignore
break;
}
});
std::atomic_store(&mDtlsTransport, transport);
return transport;
}
void PeerConnection::initSctpTransport() {
uint16_t sctpPort = mRemoteDescription->sctpPort().value_or(DEFAULT_SCTP_PORT);
mSctpTransport = std::make_shared<SctpTransport>(
mDtlsTransport, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
std::lock_guard lock(mInitMutex);
if (auto transport = std::atomic_load(&mSctpTransport))
return transport;
uint16_t sctpPort = remoteDescription()->sctpPort().value_or(DEFAULT_SCTP_PORT);
auto lower = std::atomic_load(&mDtlsTransport);
auto transport = std::make_shared<SctpTransport>(
lower, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
std::bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this](SctpTransport::State state) {
switch (state) {
@ -243,9 +297,11 @@ void PeerConnection::initSctpTransport() {
openDataChannels();
break;
case SctpTransport::State::Failed:
remoteCloseDataChannels();
changeState(State::Failed);
break;
case SctpTransport::State::Disconnected:
remoteCloseDataChannels();
changeState(State::Disconnected);
break;
default:
@ -253,9 +309,18 @@ void PeerConnection::initSctpTransport() {
break;
}
});
std::atomic_store(&mSctpTransport, transport);
return transport;
}
void PeerConnection::endLocalCandidates() {
std::lock_guard lock(mLocalDescriptionMutex);
if (mLocalDescription)
mLocalDescription->endCandidates();
}
bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
std::lock_guard lock(mRemoteDescriptionMutex);
if (auto expectedFingerprint =
mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
return *expectedFingerprint == fingerprint;
@ -264,11 +329,8 @@ bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
}
void PeerConnection::forwardMessage(message_ptr message) {
if (!mIceTransport || !mSctpTransport)
throw std::logic_error("Got a DataChannel message without transport");
if (!message) {
closeDataChannels();
remoteCloseDataChannels();
return;
}
@ -281,19 +343,24 @@ void PeerConnection::forwardMessage(message_ptr message) {
}
}
auto iceTransport = std::atomic_load(&mIceTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!iceTransport || !sctpTransport)
return;
if (!channel) {
const byte dataChannelOpenMessage{0x03};
unsigned int remoteParity = (mIceTransport->role() == Description::Role::Active) ? 1 : 0;
unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
message->stream % 2 == remoteParity) {
channel =
std::make_shared<DataChannel>(shared_from_this(), mSctpTransport, message->stream);
std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
channel->onOpen(std::bind(&PeerConnection::triggerDataChannel, this,
weak_ptr<DataChannel>{channel}));
mDataChannels.insert(std::make_pair(message->stream, channel));
} else {
// Invalid, close the DataChannel by resetting the stream
mSctpTransport->reset(message->stream);
sctpTransport->reset(message->stream);
return;
}
}
@ -330,16 +397,24 @@ void PeerConnection::iterateDataChannels(
}
void PeerConnection::openDataChannels() {
iterateDataChannels([this](shared_ptr<DataChannel> channel) { channel->open(mSctpTransport); });
if (auto transport = std::atomic_load(&mSctpTransport))
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
}
void PeerConnection::closeDataChannels() {
iterateDataChannels([](shared_ptr<DataChannel> channel) { channel->close(); });
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
}
void PeerConnection::remoteCloseDataChannels() {
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
}
void PeerConnection::processLocalDescription(Description description) {
auto remoteSctpPort = mRemoteDescription ? mRemoteDescription->sctpPort() : nullopt;
std::optional<uint16_t> remoteSctpPort;
if (auto remote = remoteDescription())
remoteSctpPort = remote->sctpPort();
std::lock_guard lock(mLocalDescriptionMutex);
mLocalDescription.emplace(std::move(description));
mLocalDescription->setFingerprint(mCertificate->fingerprint());
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
@ -349,6 +424,7 @@ void PeerConnection::processLocalDescription(Description description) {
}
void PeerConnection::processLocalCandidate(Candidate candidate) {
std::lock_guard lock(mLocalDescriptionMutex);
if (!mLocalDescription)
throw std::logic_error("Got a local candidate without local description");
@ -366,7 +442,14 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
}
void PeerConnection::changeState(State state) {
if (mState.exchange(state) != state)
State current;
do {
current = mState.load();
if (current == state || current == State::Destroying)
return;
} while (!mState.compare_exchange_weak(current, state));
if (state != State::Destroying)
mStateChangeCallback(state);
}
@ -396,6 +479,12 @@ std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &st
case State::Failed:
str = "failed";
break;
case State::Closed:
str = "closed";
break;
case State::Destroying:
str = "destroying";
break;
default:
str = "unknown";
break;

View File

@ -17,12 +17,15 @@
*/
#include "datachannel.hpp"
#include "include.hpp"
#include "peerconnection.hpp"
#include <rtc.h>
#include <unordered_map>
#include <plog/Appenders/ColorConsoleAppender.h>
using namespace rtc;
using std::shared_ptr;
using std::string;
@ -41,6 +44,8 @@ void *getUserPointer(int id) {
} // namespace
void rtcInitLogger(rtc_log_level_t level) { InitLogger(static_cast<LogLevel>(level)); }
int rtcCreatePeerConnection(const char **iceServers, int iceServersCount) {
Configuration config;
for (int i = 0; i < iceServersCount; ++i) {
@ -112,8 +117,8 @@ void rtcSetStateChangeCallback(int pc, void (*stateCallback)(rtc_state_t state,
}
void rtcSetGatheringStateChangeCallback(int pc,
void (*gatheringStateCallback)(rtc_gathering_state_t state,
void *)) {
void (*gatheringStateCallback)(rtc_gathering_state_t state,
void *)) {
auto it = peerConnectionMap.find(pc);
if (it == peerConnectionMap.end())
return;
@ -209,4 +214,3 @@ void rtcSetUserPointer(int i, void *ptr) {
else
userPointerMap.erase(i);
}

View File

@ -23,7 +23,12 @@
#include <iostream>
#include <vector>
#ifdef __linux__
#include <arpa/inet.h>
#endif
using namespace std::chrono_literals;
using namespace std::chrono;
using std::shared_ptr;
@ -33,15 +38,23 @@ std::mutex SctpTransport::GlobalMutex;
int SctpTransport::InstancesCount = 0;
void SctpTransport::GlobalInit() {
std::unique_lock<std::mutex> lock(GlobalMutex);
std::lock_guard lock(GlobalMutex);
if (InstancesCount++ == 0) {
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
usrsctp_sysctl_set_sctp_ecn_enable(0);
usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
}
}
void SctpTransport::GlobalCleanup() {
std::unique_lock<std::mutex> lock(GlobalMutex);
std::lock_guard lock(GlobalMutex);
if (--InstancesCount == 0) {
usrsctp_finish();
}
@ -55,6 +68,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
mStateChangeCallback(std::move(stateChangeCallback)), mState(State::Disconnected) {
onRecv(recvCallback);
PLOG_DEBUG << "Initializing SCTP transport";
GlobalInit();
usrsctp_register_address(this);
@ -143,12 +157,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
}
SctpTransport::~SctpTransport() {
if (mSock) {
usrsctp_shutdown(mSock, SHUT_RDWR);
usrsctp_close(mSock);
}
stop();
usrsctp_close(mSock);
usrsctp_deregister_address(this);
GlobalCleanup();
}
@ -156,18 +169,17 @@ SctpTransport::State SctpTransport::state() const { return mState; }
void SctpTransport::stop() {
Transport::stop();
onRecv(nullptr);
mSendQueue.stop();
// Unblock incoming
if (!mConnectDataSent) {
std::unique_lock<std::mutex> lock(mConnectMutex);
mConnectDataSent = true;
mConnectCondition.notify_all();
if (!mShutdown.exchange(true)) {
mSendQueue.stop();
flush();
shutdown();
}
}
void SctpTransport::connect() {
PLOG_DEBUG << "SCTP connect";
changeState(State::Connecting);
struct sockaddr_conn sconn = {};
@ -189,12 +201,25 @@ void SctpTransport::connect() {
throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
}
bool SctpTransport::send(message_ptr message) {
std::lock_guard<std::mutex> lock(mSendMutex);
void SctpTransport::shutdown() {
PLOG_DEBUG << "SCTP shutdown";
if (usrsctp_shutdown(mSock, SHUT_RDWR)) {
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
}
PLOG_INFO << "SCTP disconnected";
changeState(State::Disconnected);
mWrittenCondition.notify_all();
}
bool SctpTransport::send(message_ptr message) {
std::lock_guard lock(mSendMutex);
if (!message)
return mSendQueue.empty();
PLOG_VERBOSE << "Send size=" << message->size();
// If nothing is pending, try to send directly
if (mSendQueue.empty() && trySendMessage(message))
return true;
@ -204,7 +229,16 @@ bool SctpTransport::send(message_ptr message) {
return false;
}
void SctpTransport::flush() {
std::lock_guard lock(mSendMutex);
trySendQueue();
}
void SctpTransport::reset(unsigned int stream) {
PLOG_DEBUG << "SCTP resetting stream " << stream;
std::unique_lock lock(mWriteMutex);
mWritten = false;
using srs_t = struct sctp_reset_streams;
const size_t len = sizeof(srs_t) + sizeof(uint16_t);
byte buffer[len] = {};
@ -212,25 +246,30 @@ void SctpTransport::reset(unsigned int stream) {
srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
srs.srs_number_streams = 1;
srs.srs_stream_list[0] = uint16_t(stream);
usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len);
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
mWrittenCondition.wait_for(lock, 1000ms,
[&]() { return mWritten || mState != State::Connected; });
} else {
PLOG_WARNING << "SCTP reset stream " << stream << " failed, errno=" << errno;
}
}
void SctpTransport::incoming(message_ptr message) {
if (!message) {
changeState(State::Disconnected);
recv(nullptr);
return;
}
// There could be a race condition here where we receive the remote INIT before the local one is
// sent, which would result in the connection being aborted. Therefore, we need to wait for data
// to be sent on our side (i.e. the local INIT) before proceeding.
if (!mConnectDataSent) {
std::unique_lock<std::mutex> lock(mConnectMutex);
mConnectCondition.wait(lock, [this]() -> bool { return mConnectDataSent; });
{
std::unique_lock lock(mWriteMutex);
mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || mState != State::Connected; });
}
usrsctp_conninput(this, message->data(), message->size(), 0);
if (message) {
usrsctp_conninput(this, message->data(), message->size(), 0);
} else {
PLOG_INFO << "SCTP disconnected";
changeState(State::Disconnected);
recv(nullptr);
}
}
void SctpTransport::changeState(State state) {
@ -252,7 +291,11 @@ bool SctpTransport::trySendQueue() {
bool SctpTransport::trySendMessage(message_ptr message) {
// Requires mSendMutex to be locked
//
if (mState != State::Connected)
return false;
PLOG_VERBOSE << "SCTP try send size=" << message->size();
// TODO: Implement SCTP ndata specification draft when supported everywhere
// See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
@ -284,7 +327,6 @@ bool SctpTransport::trySendMessage(message_ptr message) {
if (reliability.unordered)
spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
using std::chrono::milliseconds;
switch (reliability.type) {
case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
@ -310,12 +352,16 @@ bool SctpTransport::trySendMessage(message_ptr message) {
ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
}
if (ret >= 0)
if (ret >= 0) {
PLOG_VERBOSE << "SCTP sent size=" << message->size();
return true;
else if (errno == EWOULDBLOCK && errno == EAGAIN)
} else if (errno == EWOULDBLOCK && errno == EAGAIN) {
PLOG_VERBOSE << "SCTP sending not possible";
return false;
else
} else {
PLOG_ERROR << "SCTP sending failed, errno=" << errno;
throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
}
}
void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
@ -331,10 +377,8 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, const byte *data,
size_t len, struct sctp_rcvinfo info, int flags) {
try {
if (!data) {
recv(nullptr);
return 0;
}
if (!len)
return -1;
if (flags & MSG_EOR) {
if (!mPartialRecv.empty()) {
mPartialRecv.insert(mPartialRecv.end(), data, data + len);
@ -353,7 +397,7 @@ int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, co
mPartialRecv.insert(mPartialRecv.end(), data, data + len);
}
} catch (const std::exception &e) {
std::cerr << "SCTP recv: " << e.what() << std::endl;
PLOG_ERROR << "SCTP recv: " << e.what();
return -1;
}
return 0; // success
@ -361,10 +405,10 @@ int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, co
int SctpTransport::handleSend(size_t free) {
try {
std::lock_guard<std::mutex> lock(mSendMutex);
std::lock_guard lock(mSendMutex);
trySendQueue();
} catch (const std::exception &e) {
std::cerr << "SCTP send: " << e.what() << std::endl;
PLOG_ERROR << "SCTP send: " << e.what();
return -1;
}
return 0; // success
@ -372,15 +416,14 @@ int SctpTransport::handleSend(size_t free) {
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df) {
try {
outgoing(make_message(data, data + len));
if (!mConnectDataSent) {
std::unique_lock<std::mutex> lock(mConnectMutex);
mConnectDataSent = true;
mConnectCondition.notify_all();
}
std::unique_lock lock(mWriteMutex);
if (!outgoing(make_message(data, data + len)))
return -1;
mWritten = true;
mWrittenOnce = true;
mWrittenCondition.notify_all();
} catch (const std::exception &e) {
std::cerr << "SCTP write: " << e.what() << std::endl;
PLOG_ERROR << "SCTP write: " << e.what();
return -1;
}
return 0; // success
@ -441,7 +484,7 @@ void SctpTransport::processData(const byte *data, size_t len, uint16_t sid, Payl
default:
// Unknown
std::cerr << "Unknown PPID: " << uint32_t(ppid) << std::endl;
PLOG_WARNING << "Unknown PPID: " << uint32_t(ppid);
return;
}
}
@ -453,36 +496,38 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
switch (notify->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE: {
const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
std::unique_lock<std::mutex> lock(mConnectMutex);
if (assoc_change.sac_state == SCTP_COMM_UP) {
PLOG_INFO << "SCTP connected";
changeState(State::Connected);
} else {
if (mState == State::Connecting) {
std::cerr << "SCTP connection failed" << std::endl;
PLOG_ERROR << "SCTP connection failed";
changeState(State::Failed);
} else {
PLOG_INFO << "SCTP disconnected";
changeState(State::Disconnected);
}
mWrittenCondition.notify_all();
}
}
case SCTP_SENDER_DRY_EVENT: {
// It not should be necessary since the send callback should have been called already,
// but to be sure, let's try to send now.
std::lock_guard<std::mutex> lock(mSendMutex);
std::lock_guard lock(mSendMutex);
trySendQueue();
}
case SCTP_STREAM_RESET_EVENT: {
const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
const uint16_t flags = reset_event.strreset_flags;
if (reset_event.strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
for (int i = 0; i < count; ++i) {
uint16_t streamId = reset_event.strreset_stream_list[i];
reset(streamId);
}
}
if (reset_event.strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
const byte dataChannelCloseMessage{0x04};
for (int i = 0; i < count; ++i) {
uint16_t streamId = reset_event.strreset_stream_list[i];

View File

@ -28,9 +28,13 @@
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#ifdef _WIN32
#include <winsock2.h>
#elif __linux__
#include <sys/socket.h>
#endif
#include <sys/types.h>
#include "usrsctp.h"
@ -52,6 +56,7 @@ public:
void stop() override;
bool send(message_ptr message) override; // false if buffered
void flush();
void reset(unsigned int stream);
private:
@ -68,7 +73,8 @@ private:
};
void connect();
void incoming(message_ptr message);
void shutdown();
void incoming(message_ptr message) override;
void changeState(State state);
bool trySendQueue();
@ -91,10 +97,12 @@ private:
std::map<uint16_t, size_t> mBufferedAmount;
amount_callback mBufferedAmountCallback;
std::mutex mConnectMutex;
std::condition_variable mConnectCondition;
std::atomic<bool> mConnectDataSent = false;
std::atomic<bool> mStopping = false;
std::recursive_mutex mWriteMutex;
std::condition_variable_any mWrittenCondition;
bool mWritten = false;
bool mWrittenOnce = false;
std::atomic<bool> mShutdown = false;
state_callback mStateChangeCallback;
std::atomic<State> mState;

View File

@ -33,12 +33,16 @@ using namespace std::placeholders;
class Transport {
public:
Transport(std::shared_ptr<Transport> lower = nullptr) : mLower(std::move(lower)) {
if (auto lower = std::atomic_load(&mLower))
lower->onRecv(std::bind(&Transport::incoming, this, _1));
if (mLower)
mLower->onRecv(std::bind(&Transport::incoming, this, _1));
}
virtual ~Transport() { stop(); }
virtual void stop() {
if (mLower)
mLower->onRecv(nullptr);
}
virtual ~Transport() {}
virtual void stop() { resetLower(); }
virtual bool send(message_ptr message) = 0;
void onRecv(message_callback callback) { mRecvCallback = std::move(callback); }
@ -46,15 +50,12 @@ public:
protected:
void recv(message_ptr message) { mRecvCallback(message); }
void resetLower() {
if (auto lower = std::atomic_exchange(&mLower, std::shared_ptr<Transport>(nullptr)))
lower->onRecv(nullptr);
}
virtual void incoming(message_ptr message) = 0;
virtual void outgoing(message_ptr message) {
if (auto lower = std::atomic_load(&mLower))
lower->send(message);
virtual bool outgoing(message_ptr message) {
if (mLower)
return mLower->send(message);
else
return false;
}
private:

View File

@ -23,29 +23,56 @@
#include <memory>
#include <thread>
#ifdef _WIN32
#include <winsock2.h>
#endif
using namespace rtc;
using namespace std;
template <class T>
weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
int main(int argc, char **argv) {
rtc::Configuration config;
// config.iceServers.emplace_back("stun.l.google.com:19302");
InitLogger(LogLevel::Warning);
Configuration config;
// config.iceServers.emplace_back("stun:stun.l.google.com:19302");
// config.enableIceTcp = true;
// TURN server example
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
// IceServer::RelayType::TurnUdp);
// config.iceServers.push_back(turnServer);
auto pc1 = std::make_shared<PeerConnection>(config);
auto pc2 = std::make_shared<PeerConnection>(config);
#ifdef _WIN32
WSADATA wsaData;
int iResult;
// Initialize Winsock
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) {
std::string err("WSAStartup failed. Error:");
err.append(WSAGetLastError() + "");
std::cout << err;
return -1;
}
#endif
pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](const Description &sdp) {
auto pc2 = wpc2.lock();
if (!pc2) return;
auto pc2 = wpc2.lock();
if (!pc2)
return;
cout << "Description 1: " << sdp << endl;
pc2->setRemoteDescription(sdp);
});
pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](const Candidate &candidate) {
auto pc2 = wpc2.lock();
if (!pc2) return;
auto pc2 = wpc2.lock();
if (!pc2)
return;
cout << "Candidate 1: " << candidate << endl;
pc2->addRemoteCandidate(candidate);
});
@ -56,15 +83,17 @@ int main(int argc, char **argv) {
});
pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](const Description &sdp) {
auto pc1 = wpc1.lock();
if (!pc1) return;
auto pc1 = wpc1.lock();
if (!pc1)
return;
cout << "Description 2: " << sdp << endl;
pc1->setRemoteDescription(sdp);
});
pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](const Candidate &candidate) {
auto pc1 = wpc1.lock();
if (!pc1) return;
auto pc1 = wpc1.lock();
if (!pc1)
return;
cout << "Candidate 2: " << candidate << endl;
pc1->addRemoteCandidate(candidate);
});
@ -88,8 +117,9 @@ int main(int argc, char **argv) {
auto dc1 = pc1->createDataChannel("test");
dc1->onOpen([wdc1 = make_weak_ptr(dc1)]() {
auto dc1 = wdc1.lock();
if (!dc1) return;
auto dc1 = wdc1.lock();
if (!dc1)
return;
cout << "DataChannel open: " << dc1->label() << endl;
dc1->send("Hello from 1");
});
@ -101,15 +131,29 @@ int main(int argc, char **argv) {
this_thread::sleep_for(3s);
if (auto addr = pc1->localAddress())
cout << "Local address 1: " << *addr << endl;
if (auto addr = pc1->remoteAddress())
cout << "Remote address 1: " << *addr << endl;
if (auto addr = pc2->localAddress())
cout << "Local address 2: " << *addr << endl;
if (auto addr = pc2->remoteAddress())
cout << "Remote address 2: " << *addr << endl;
if (dc1->isOpen() && dc2->isOpen()) {
dc1->close();
dc2->close();
pc1->close();
pc2->close();
cout << "Success" << endl;
#ifdef _WIN32
WSACleanup();
#endif
return 0;
} else {
cout << "Failure" << endl;
#ifdef _WIN32
WSACleanup();
#endif
return 1;
}
}

9
test/p2p/README.md Normal file
View File

@ -0,0 +1,9 @@
* Execute ```offerer``` app in console
* Execute ```answerer``` app in another console
* Copy "Local Description" from ```offerer```
* Enter 1 to ```answerer```
* Paste copied description, press enter
* Redo same procedure for ```answerer```
* Redo same procedure for candidates
* Wait for "DataChannel open" message
* Send message from one peer to another

168
test/p2p/answerer.cpp Normal file
View File

@ -0,0 +1,168 @@
/**
* Copyright (c) 2019 Paul-Louis Ageneau, Murat Dogan
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "rtc/rtc.hpp"
#include <chrono>
#include <iostream>
#include <memory>
#ifdef _WIN32
#include <winsock2.h>
#endif
using namespace rtc;
using namespace std;
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
int main(int argc, char **argv) {
InitLogger(LogLevel::Warning);
Configuration config;
// config.iceServers.emplace_back("stun.l.google.com:19302");
// config.enableIceTcp = true;
// TURN Server example
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
// IceServer::RelayType::TurnUdp);
// config.iceServers.push_back(turnServer);
auto pc = std::make_shared<PeerConnection>(config);
#ifdef _WIN32
WSADATA wsaData;
int iResult;
// Initialize Winsock
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) {
std::string err("WSAStartup failed. Error:");
err.append(WSAGetLastError() + "");
std::cout << err;
return -1;
}
#endif
pc->onLocalDescription([](const Description &sdp) {
std::string s(sdp);
std::replace(s.begin(), s.end(), '\n', static_cast<char>(94));
std::replace(s.begin(), s.end(), '\r', static_cast<char>(95));
cout << "Local Description (Paste this to other peer):" << endl << s << endl << endl;
});
pc->onLocalCandidate([](const Candidate &candidate) {
cout << "Local Candidate (Paste this to other peer):" << endl << candidate << endl << endl;
});
pc->onStateChange(
[](PeerConnection::State state) { cout << "[ State: " << state << " ]" << endl; });
pc->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "[ Gathering State: " << state << " ]" << endl;
});
shared_ptr<DataChannel> dc = nullptr;
pc->onDataChannel([&](shared_ptr<DataChannel> _dc) {
cout << "[ Got a DataChannel with label: " << _dc->label() << " ]" << endl;
dc = _dc;
dc->onClosed([&]() { cout << "[ DataChannel closed: " << dc->label() << " ]" << endl; });
dc->onMessage([](const variant<binary, string> &message) {
if (holds_alternative<string>(message)) {
cout << "[ Received: " << get<string>(message) << " ]" << endl;
}
});
});
bool exit = false;
while (!exit) {
cout << endl
<< endl
<< "*************************************************************************" << endl
<< "* 0: Exit /"
<< " 1: Enter Description /"
<< " 2: Enter Candidate /"
<< " 3: Send Message *" << endl
<< " [Command]: ";
int command;
std::string sdp, candidate, message;
const char *a;
std::unique_ptr<Candidate> candidatePtr;
std::unique_ptr<Description> descPtr;
cin >> command;
switch (command) {
case 0:
exit = true;
break;
case 1:
// Parse Description
cout << "[SDP]: ";
sdp = "";
while (sdp.length() == 0)
getline(cin, sdp);
std::replace(sdp.begin(), sdp.end(), static_cast<char>(94), '\n');
std::replace(sdp.begin(), sdp.end(), static_cast<char>(95), '\r');
descPtr = std::make_unique<Description>(sdp, Description::Type::Offer,
Description::Role::Passive);
pc->setRemoteDescription(*descPtr);
break;
case 2:
// Parse Candidate
cout << "[Candidate]: ";
candidate = "";
while (candidate.length() == 0)
getline(cin, candidate);
candidatePtr = std::make_unique<Candidate>(candidate);
pc->addRemoteCandidate(*candidatePtr);
break;
case 3:
// Send Message
if (!dc || !dc->isOpen()) {
cout << "** Channel is not Open ** ";
break;
}
cout << "[Message]: ";
message = "";
while (message.length() == 0)
getline(cin, message);
dc->send(message);
break;
default:
cout << "** Invalid Command ** ";
break;
}
}
if (dc)
dc->close();
if (pc)
pc->close();
#ifdef _WIN32
WSACleanup();
#endif
}

165
test/p2p/offerer.cpp Normal file
View File

@ -0,0 +1,165 @@
/**
* Copyright (c) 2019 Paul-Louis Ageneau, Murat Dogan
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "rtc/rtc.hpp"
#include <chrono>
#include <iostream>
#include <memory>
#ifdef _WIN32
#include <winsock2.h>
#endif
using namespace rtc;
using namespace std;
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
int main(int argc, char **argv) {
InitLogger(LogLevel::Warning);
Configuration config;
// config.iceServers.emplace_back("stun.l.google.com:19302");
// config.enableIceTcp = true;
// TURN server example
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
// IceServer::RelayType::TurnUdp);
// config.iceServers.push_back(turnServer);
auto pc = std::make_shared<PeerConnection>(config);
#ifdef _WIN32
WSADATA wsaData;
int iResult;
// Initialize Winsock
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) {
std::string err("WSAStartup failed. Error:");
err.append(WSAGetLastError() + "");
std::cout << err;
return -1;
}
#endif
pc->onLocalDescription([](const Description &sdp) {
std::string s(sdp);
std::replace(s.begin(), s.end(), '\n', static_cast<char>(94));
std::replace(s.begin(), s.end(), '\r', static_cast<char>(95));
cout << "Local Description (Paste this to other peer):" << endl << s << endl << endl;
});
pc->onLocalCandidate([](const Candidate &candidate) {
cout << "Local Candidate (Paste this to other peer):" << endl << candidate << endl << endl;
});
pc->onStateChange(
[](PeerConnection::State state) { cout << "[ State: " << state << " ]" << endl; });
pc->onGatheringStateChange([](PeerConnection::GatheringState state) {
cout << "[ Gathering State: " << state << " ]" << endl;
});
auto dc = pc->createDataChannel("test");
dc->onOpen([&]() { cout << "[ DataChannel open: " << dc->label() << " ]" << endl; });
dc->onClosed([&]() { cout << "[ DataChannel closed: " << dc->label() << " ]" << endl; });
dc->onMessage([](const variant<binary, string> &message) {
if (holds_alternative<string>(message)) {
cout << "[ Received: " << get<string>(message) << " ]" << endl;
}
});
bool exit = false;
while (!exit) {
cout << endl
<< endl
<< "*************************************************************************" << endl
<< "* 0: Exit /"
<< " 1: Enter Description /"
<< " 2: Enter Candidate /"
<< " 3: Send Message *" << endl
<< " [Command]: ";
int command;
std::string sdp, candidate, message;
const char *a;
std::unique_ptr<Candidate> candidatePtr;
std::unique_ptr<Description> descPtr;
cin >> command;
switch (command) {
case 0:
exit = true;
break;
case 1:
// Parse Description
cout << "[SDP]: ";
sdp = "";
while (sdp.length() == 0)
getline(cin, sdp);
std::replace(sdp.begin(), sdp.end(), static_cast<char>(94), '\n');
std::replace(sdp.begin(), sdp.end(), static_cast<char>(95), '\r');
descPtr = std::make_unique<Description>(sdp);
pc->setRemoteDescription(*descPtr);
break;
case 2:
// Parse Candidate
cout << "[Candidate]: ";
candidate = "";
while (candidate.length() == 0)
getline(cin, candidate);
candidatePtr = std::make_unique<Candidate>(candidate);
pc->addRemoteCandidate(*candidatePtr);
break;
case 3:
// Send Message
if (!dc->isOpen()) {
cout << "** Channel is not Open ** ";
break;
}
cout << "[Message]: ";
message = "";
while (message.length() == 0)
getline(cin, message);
dc->send(message);
break;
default:
cout << "** Invalid Command ** ";
break;
}
}
if (dc)
dc->close();
if (pc)
pc->close();
#ifdef _WIN32
WSACleanup();
#endif
}

Submodule usrsctp deleted from 04d617c9c1