mirror of
https://github.com/mii443/libdatachannel.git
synced 2025-08-23 15:48:03 +00:00
Compare commits
39 Commits
Author | SHA1 | Date | |
---|---|---|---|
2129e3cfb9 | |||
58c8bad453 | |||
1566c0ef21 | |||
03399e4b55 | |||
0066b3aef0 | |||
75f23f202f | |||
23e1a75248 | |||
f930cfbe44 | |||
72a0e2fe07 | |||
f90ffbcf86 | |||
a6992c765d | |||
1602498eab | |||
e4ace4a750 | |||
b5a13d2d66 | |||
aeb777aa49 | |||
7a552bb0fa | |||
402a4df4a0 | |||
34ef87e271 | |||
522319ac5d | |||
1ac00ce396 | |||
92f08948d3 | |||
9749f8d63e | |||
8626a07824 | |||
37ca38999c | |||
18eeac3c0c | |||
c7de492b4b | |||
901700177b | |||
88348732d9 | |||
281eea2cec | |||
28f923b1ce | |||
48bdb6a1c9 | |||
278ac22766 | |||
4fb14244db | |||
432be41b9a | |||
78c80992bc | |||
8b64f8a406 | |||
1d7d1358be | |||
5fc6a1c8ad | |||
e5a19f85ed |
5
.gitmodules
vendored
5
.gitmodules
vendored
@ -1,3 +1,6 @@
|
||||
[submodule "usrsctp"]
|
||||
path = usrsctp
|
||||
path = deps/usrsctp
|
||||
url = https://github.com/sctplab/usrsctp.git
|
||||
[submodule "deps/plog"]
|
||||
path = deps/plog
|
||||
url = https://github.com/SergiusTheBest/plog
|
||||
|
@ -25,13 +25,21 @@ set(TESTS_SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test/main.cpp
|
||||
)
|
||||
|
||||
# Hack because usrsctp uses CMAKE_SOURCE_DIR instead of CMAKE_CURRENT_SOURCE_DIR
|
||||
set(CMAKE_REQUIRED_FLAGS "-I${CMAKE_CURRENT_SOURCE_DIR}/usrsctp/usrsctplib")
|
||||
set(TESTS_OFFERER_SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test/p2p/offerer.cpp
|
||||
)
|
||||
|
||||
add_subdirectory(usrsctp EXCLUDE_FROM_ALL)
|
||||
set(TESTS_ANSWERER_SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test/p2p/answerer.cpp
|
||||
)
|
||||
|
||||
# Hack because usrsctp uses CMAKE_SOURCE_DIR instead of CMAKE_CURRENT_SOURCE_DIR
|
||||
set(CMAKE_REQUIRED_FLAGS "-I${CMAKE_CURRENT_SOURCE_DIR}/deps/usrsctp/usrsctplib")
|
||||
|
||||
add_subdirectory(deps/usrsctp EXCLUDE_FROM_ALL)
|
||||
|
||||
# Set include directory and custom options to make usrsctp compile with recent g++
|
||||
target_include_directories(usrsctp-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/usrsctp/usrsctplib)
|
||||
target_include_directories(usrsctp-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/deps/usrsctp/usrsctplib)
|
||||
|
||||
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
||||
# using regular Clang or AppleClang: Needed since they don't have -Wno-error=format-truncation
|
||||
@ -43,7 +51,7 @@ else()
|
||||
target_compile_options(usrsctp-static PRIVATE -Wno-error=format-truncation)
|
||||
else()
|
||||
target_compile_options(usrsctp-static PRIVATE -Wno-error=address-of-packed-member -Wno-error=format-truncation)
|
||||
endif()
|
||||
endif()
|
||||
else()
|
||||
# all other compilers
|
||||
target_compile_options(usrsctp-static PRIVATE -Wno-error=address-of-packed-member -Wno-error=format-truncation)
|
||||
@ -54,6 +62,9 @@ option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
|
||||
|
||||
find_package(LibNice REQUIRED)
|
||||
|
||||
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
add_library(datachannel SHARED ${LIBDATACHANNEL_SOURCES})
|
||||
set_target_properties(datachannel PROPERTIES
|
||||
VERSION ${PROJECT_VERSION}
|
||||
@ -62,7 +73,12 @@ set_target_properties(datachannel PROPERTIES
|
||||
target_include_directories(datachannel PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
||||
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
|
||||
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
|
||||
target_link_libraries(datachannel usrsctp-static LibNice::LibNice)
|
||||
target_include_directories(datachannel PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/deps/plog/include)
|
||||
target_link_libraries(datachannel
|
||||
Threads::Threads
|
||||
usrsctp-static
|
||||
LibNice::LibNice
|
||||
)
|
||||
|
||||
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL ${LIBDATACHANNEL_SOURCES})
|
||||
set_target_properties(datachannel-static PROPERTIES
|
||||
@ -72,7 +88,12 @@ set_target_properties(datachannel-static PROPERTIES
|
||||
target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
||||
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
|
||||
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
|
||||
target_link_libraries(datachannel-static usrsctp-static LibNice::LibNice)
|
||||
target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/deps/plog/include)
|
||||
target_link_libraries(datachannel-static
|
||||
Threads::Threads
|
||||
usrsctp-static
|
||||
LibNice::LibNice
|
||||
)
|
||||
|
||||
if (USE_GNUTLS)
|
||||
find_package(GnuTLS REQUIRED)
|
||||
@ -99,6 +120,7 @@ endif()
|
||||
add_library(LibDataChannel::LibDataChannel ALIAS datachannel)
|
||||
add_library(LibDataChannel::LibDataChannelStatic ALIAS datachannel-static)
|
||||
|
||||
# Main Test
|
||||
add_executable(tests ${TESTS_SOURCES})
|
||||
set_target_properties(tests PROPERTIES
|
||||
VERSION ${PROJECT_VERSION}
|
||||
@ -106,3 +128,18 @@ set_target_properties(tests PROPERTIES
|
||||
|
||||
target_link_libraries(tests datachannel)
|
||||
|
||||
# P2P Test: offerer
|
||||
add_executable(offerer ${TESTS_OFFERER_SOURCES})
|
||||
set_target_properties(offerer PROPERTIES
|
||||
VERSION ${PROJECT_VERSION}
|
||||
CXX_STANDARD 17)
|
||||
|
||||
target_link_libraries(offerer datachannel)
|
||||
|
||||
# P2P Test: answerer
|
||||
add_executable(answerer ${TESTS_ANSWERER_SOURCES})
|
||||
set_target_properties(answerer PROPERTIES
|
||||
VERSION ${PROJECT_VERSION}
|
||||
CXX_STANDARD 17)
|
||||
|
||||
target_link_libraries(answerer datachannel)
|
||||
|
15
Jamfile
15
Jamfile
@ -13,26 +13,35 @@ lib libdatachannel
|
||||
<link>static
|
||||
: # usage requirements
|
||||
<include>./include
|
||||
<library>/libdatachannel//plog
|
||||
<cxxflags>-pthread
|
||||
<linkflags>"`pkg-config --libs openssl glib-2.0 gobject-2.0 nice`"
|
||||
;
|
||||
|
||||
alias plog
|
||||
: # no sources
|
||||
: # no build requirements
|
||||
: # no default build
|
||||
: # usage requirements
|
||||
<include>./deps/plog/include
|
||||
;
|
||||
|
||||
alias usrsctp
|
||||
: # no sources
|
||||
: # no build requirements
|
||||
: # no default build
|
||||
: # usage requirements
|
||||
<include>./usrsctp/usrsctplib
|
||||
<include>./deps/usrsctp/usrsctplib
|
||||
<library>libusrsctp.a
|
||||
;
|
||||
|
||||
make libusrsctp.a : : @make_libusrsctp ;
|
||||
actions make_libusrsctp
|
||||
{
|
||||
(cd $(CWD)/usrsctp && \
|
||||
(cd $(CWD)/deps/usrsctp && \
|
||||
./bootstrap && \
|
||||
./configure --enable-static --disable-debug CFLAGS="-fPIC -Wno-address-of-packed-member" && \
|
||||
make)
|
||||
cp $(CWD)/usrsctp/usrsctplib/.libs/libusrsctp.a $(<)
|
||||
cp $(CWD)/deps/usrsctp/usrsctplib/.libs/libusrsctp.a $(<)
|
||||
}
|
||||
|
||||
|
9
Makefile
9
Makefile
@ -8,10 +8,11 @@ CPPFLAGS=-O2 -pthread -fPIC -Wall -Wno-address-of-packed-member
|
||||
CXXFLAGS=-std=c++17
|
||||
LDFLAGS=-pthread
|
||||
LIBS=glib-2.0 gobject-2.0 nice
|
||||
USRSCTP_DIR=usrsctp
|
||||
USRSCTP_DIR=deps/usrsctp
|
||||
PLOG_DIR=deps/plog
|
||||
|
||||
USE_GNUTLS ?= 0
|
||||
ifeq ($(USE_GNUTLS), 1)
|
||||
ifneq ($(USE_GNUTLS), 0)
|
||||
CPPFLAGS+= -DUSE_GNUTLS=1
|
||||
LIBS+= gnutls
|
||||
else
|
||||
@ -20,7 +21,7 @@ else
|
||||
endif
|
||||
|
||||
LDLIBS= $(shell pkg-config --libs $(LIBS))
|
||||
INCLUDES=-Iinclude/rtc -I$(USRSCTP_DIR)/usrsctplib $(shell pkg-config --cflags $(LIBS))
|
||||
INCLUDES=-Iinclude/rtc -I$(PLOG_DIR)/include -I$(USRSCTP_DIR)/usrsctplib $(shell pkg-config --cflags $(LIBS))
|
||||
|
||||
SRCS=$(shell printf "%s " src/*.cpp)
|
||||
OBJS=$(subst .cpp,.o,$(SRCS))
|
||||
@ -31,7 +32,7 @@ src/%.o: src/%.cpp
|
||||
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) -MMD -MP -o $@ -c $<
|
||||
|
||||
test/%.o: test/%.cpp
|
||||
$(CXX) $(CXXFLAGS) $(CPPFLAGS) -Iinclude -MMD -MP -o $@ -c $<
|
||||
$(CXX) $(CXXFLAGS) $(CPPFLAGS) -Iinclude -I$(PLOG_DIR)/include -MMD -MP -o $@ -c $<
|
||||
|
||||
-include $(subst .cpp,.d,$(SRCS))
|
||||
|
||||
|
1
deps/plog
vendored
Submodule
1
deps/plog
vendored
Submodule
Submodule deps/plog added at 2931644689
0
usrsctp → deps/usrsctp
vendored
0
usrsctp → deps/usrsctp
vendored
@ -27,16 +27,28 @@
|
||||
namespace rtc {
|
||||
|
||||
struct IceServer {
|
||||
enum class Type { Stun, Turn };
|
||||
|
||||
// Don' Change It! It should be same order as enum NiceRelayType
|
||||
enum class RelayType { TurnUdp, TurnTcp, TurnTls };
|
||||
|
||||
IceServer(const string &host_);
|
||||
IceServer(const string &hostname_, uint16_t port_);
|
||||
IceServer(const string &hostname_, const string &service_);
|
||||
IceServer(const string &hostname_, const string &service_, string username_, string password_,
|
||||
RelayType relayType_);
|
||||
|
||||
string hostname;
|
||||
string service;
|
||||
Type type;
|
||||
string username;
|
||||
string password;
|
||||
RelayType relayType;
|
||||
};
|
||||
|
||||
struct Configuration {
|
||||
std::vector<IceServer> iceServers;
|
||||
bool enableIceTcp = false;
|
||||
uint16_t portRangeBegin = 1024;
|
||||
uint16_t portRangeEnd = 65535;
|
||||
};
|
||||
|
@ -36,7 +36,7 @@ namespace rtc {
|
||||
class SctpTransport;
|
||||
class PeerConnection;
|
||||
|
||||
class DataChannel : public Channel {
|
||||
class DataChannel : public std::enable_shared_from_this<DataChannel>, public Channel {
|
||||
public:
|
||||
DataChannel(std::shared_ptr<PeerConnection> pc, unsigned int stream, string label,
|
||||
string protocol, Reliability reliability);
|
||||
@ -66,12 +66,13 @@ public:
|
||||
Reliability reliability() const;
|
||||
|
||||
private:
|
||||
void remoteClose();
|
||||
void open(std::shared_ptr<SctpTransport> sctpTransport);
|
||||
bool outgoing(mutable_message_ptr message);
|
||||
void incoming(message_ptr message);
|
||||
void processOpenMessage(message_ptr message);
|
||||
|
||||
std::shared_ptr<PeerConnection> mPeerConnection;
|
||||
const std::shared_ptr<PeerConnection> mPeerConnection;
|
||||
std::shared_ptr<SctpTransport> mSctpTransport;
|
||||
|
||||
unsigned int mStream;
|
||||
|
@ -45,6 +45,7 @@ public:
|
||||
std::optional<string> fingerprint() const;
|
||||
std::optional<uint16_t> sctpPort() const;
|
||||
std::optional<size_t> maxMessageSize() const;
|
||||
bool trickleEnabled() const;
|
||||
|
||||
void setFingerprint(string fingerprint);
|
||||
void setSctpPort(uint16_t port);
|
||||
|
@ -27,6 +27,9 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "plog/Appenders/ColorConsoleAppender.h"
|
||||
#include "plog/Log.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
using std::byte;
|
||||
@ -48,6 +51,27 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
|
||||
const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536; // Remote max message size if not specified in SDP
|
||||
const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
|
||||
|
||||
inline void InitLogger(plog::Severity severity, plog::IAppender *appender = nullptr) {
|
||||
static plog::ColorConsoleAppender<plog::TxtFormatter> consoleAppender;
|
||||
if (!appender)
|
||||
appender = &consoleAppender;
|
||||
plog::init(severity, appender);
|
||||
PLOG_DEBUG << "Logger initialized";
|
||||
}
|
||||
|
||||
// Don't change, it must match plog severity
|
||||
enum class LogLevel {
|
||||
None = 0,
|
||||
Fatal = 1,
|
||||
Error = 2,
|
||||
Warning = 3,
|
||||
Info = 4,
|
||||
Debug = 5,
|
||||
Verbose = 6
|
||||
};
|
||||
|
||||
inline void InitLogger(LogLevel level) { InitLogger(static_cast<plog::Severity>(level)); }
|
||||
|
||||
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
||||
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
|
||||
|
||||
@ -57,13 +81,13 @@ public:
|
||||
~synchronized_callback() { *this = nullptr; }
|
||||
|
||||
synchronized_callback &operator=(std::function<void(P...)> func) {
|
||||
std::lock_guard<std::recursive_mutex> lock(mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
callback = func;
|
||||
return *this;
|
||||
}
|
||||
|
||||
void operator()(P... args) const {
|
||||
std::lock_guard<std::recursive_mutex> lock(mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
if (callback)
|
||||
callback(args...);
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
@ -49,7 +50,8 @@ public:
|
||||
Connected = RTC_CONNECTED,
|
||||
Disconnected = RTC_DISCONNECTED,
|
||||
Failed = RTC_FAILED,
|
||||
Closed = RTC_CLOSED
|
||||
Closed = RTC_CLOSED,
|
||||
Destroying = RTC_DESTROYING
|
||||
};
|
||||
|
||||
enum class GatheringState : int {
|
||||
@ -62,6 +64,8 @@ public:
|
||||
PeerConnection(const Configuration &config);
|
||||
~PeerConnection();
|
||||
|
||||
void close();
|
||||
|
||||
const Configuration *config() const;
|
||||
State state() const;
|
||||
GatheringState gatheringState() const;
|
||||
@ -83,16 +87,18 @@ public:
|
||||
void onGatheringStateChange(std::function<void(GatheringState state)> callback);
|
||||
|
||||
private:
|
||||
void initIceTransport(Description::Role role);
|
||||
void initDtlsTransport();
|
||||
void initSctpTransport();
|
||||
std::shared_ptr<IceTransport> initIceTransport(Description::Role role);
|
||||
std::shared_ptr<DtlsTransport> initDtlsTransport();
|
||||
std::shared_ptr<SctpTransport> initSctpTransport();
|
||||
|
||||
void endLocalCandidates();
|
||||
bool checkFingerprint(const std::string &fingerprint) const;
|
||||
void forwardMessage(message_ptr message);
|
||||
void forwardBufferedAmount(uint16_t stream, size_t amount);
|
||||
void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
|
||||
void openDataChannels();
|
||||
void closeDataChannels();
|
||||
void remoteCloseDataChannels();
|
||||
|
||||
void processLocalDescription(Description description);
|
||||
void processLocalCandidate(Candidate candidate);
|
||||
@ -103,12 +109,13 @@ private:
|
||||
const Configuration mConfig;
|
||||
const std::shared_ptr<Certificate> mCertificate;
|
||||
|
||||
std::optional<Description> mLocalDescription;
|
||||
std::optional<Description> mRemoteDescription;
|
||||
std::optional<Description> mLocalDescription, mRemoteDescription;
|
||||
mutable std::recursive_mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;
|
||||
|
||||
std::shared_ptr<IceTransport> mIceTransport;
|
||||
std::shared_ptr<DtlsTransport> mDtlsTransport;
|
||||
std::shared_ptr<SctpTransport> mSctpTransport;
|
||||
std::recursive_mutex mInitMutex;
|
||||
|
||||
std::unordered_map<unsigned int, std::weak_ptr<DataChannel>> mDataChannels;
|
||||
|
||||
|
@ -67,31 +67,31 @@ Queue<T>::Queue(size_t limit, amount_function func) : mLimit(limit), mAmount(0)
|
||||
template <typename T> Queue<T>::~Queue() { stop(); }
|
||||
|
||||
template <typename T> void Queue<T>::stop() {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
std::lock_guard lock(mMutex);
|
||||
mStopping = true;
|
||||
mPopCondition.notify_all();
|
||||
mPushCondition.notify_all();
|
||||
}
|
||||
|
||||
template <typename T> bool Queue<T>::empty() const {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
std::lock_guard lock(mMutex);
|
||||
return mQueue.empty();
|
||||
}
|
||||
|
||||
template <typename T> size_t Queue<T>::size() const {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
std::lock_guard lock(mMutex);
|
||||
return mQueue.size();
|
||||
}
|
||||
|
||||
template <typename T> size_t Queue<T>::amount() const {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
std::lock_guard lock(mMutex);
|
||||
return mAmount;
|
||||
}
|
||||
|
||||
template <typename T> void Queue<T>::push(const T &element) { push(T{element}); }
|
||||
|
||||
template <typename T> void Queue<T>::push(T &&element) {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
std::unique_lock lock(mMutex);
|
||||
mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
|
||||
if (!mStopping) {
|
||||
mAmount += mAmountFunction(element);
|
||||
@ -101,7 +101,7 @@ template <typename T> void Queue<T>::push(T &&element) {
|
||||
}
|
||||
|
||||
template <typename T> std::optional<T> Queue<T>::pop() {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
std::unique_lock lock(mMutex);
|
||||
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
|
||||
if (!mQueue.empty()) {
|
||||
mAmount -= mAmountFunction(mQueue.front());
|
||||
@ -114,7 +114,7 @@ template <typename T> std::optional<T> Queue<T>::pop() {
|
||||
}
|
||||
|
||||
template <typename T> std::optional<T> Queue<T>::peek() {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
std::unique_lock lock(mMutex);
|
||||
if (!mQueue.empty()) {
|
||||
return std::optional<T>{mQueue.front()};
|
||||
} else {
|
||||
@ -123,12 +123,12 @@ template <typename T> std::optional<T> Queue<T>::peek() {
|
||||
}
|
||||
|
||||
template <typename T> void Queue<T>::wait() {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
std::unique_lock lock(mMutex);
|
||||
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
|
||||
}
|
||||
|
||||
template <typename T> void Queue<T>::wait(const std::chrono::milliseconds &duration) {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
std::unique_lock lock(mMutex);
|
||||
mPopCondition.wait_for(lock, duration, [this]() { return !mQueue.empty() || mStopping; });
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// libdatachannel rtc C API
|
||||
// libdatachannel C API
|
||||
|
||||
typedef enum {
|
||||
RTC_NEW = 0,
|
||||
@ -31,7 +31,8 @@ typedef enum {
|
||||
RTC_CONNECTED = 2,
|
||||
RTC_DISCONNECTED = 3,
|
||||
RTC_FAILED = 4,
|
||||
RTC_CLOSED = 5
|
||||
RTC_CLOSED = 5,
|
||||
RTC_DESTROYING = 6 // internal
|
||||
} rtc_state_t;
|
||||
|
||||
typedef enum {
|
||||
@ -40,6 +41,19 @@ typedef enum {
|
||||
RTC_GATHERING_COMPLETE = 2
|
||||
} rtc_gathering_state_t;
|
||||
|
||||
// Don't change, it must match plog severity
|
||||
typedef enum {
|
||||
RTC_LOG_NONE = 0,
|
||||
RTC_LOG_FATAL = 1,
|
||||
RTC_LOG_ERROR = 2,
|
||||
RTC_LOG_WARNING = 3,
|
||||
RTC_LOG_INFO = 4,
|
||||
RTC_LOG_DEBUG = 5,
|
||||
RTC_LOG_VERBOSE = 6
|
||||
} rtc_log_level_t;
|
||||
|
||||
void rtcInitLogger(rtc_log_level_t level);
|
||||
|
||||
int rtcCreatePeerConnection(const char **iceServers, int iceServersCount);
|
||||
void rtcDeletePeerConnection(int pc);
|
||||
int rtcCreateDataChannel(int pc, const char *label);
|
||||
@ -51,8 +65,8 @@ void rtcSetLocalCandidateCallback(int pc,
|
||||
void (*candidateCallback)(const char *, const char *, void *));
|
||||
void rtcSetStateChangeCallback(int pc, void (*stateCallback)(rtc_state_t state, void *));
|
||||
void rtcSetGatheringStateChangeCallback(int pc,
|
||||
void (*gatheringStateCallback)(rtc_gathering_state_t state,
|
||||
void *));
|
||||
void (*gatheringStateCallback)(rtc_gathering_state_t state,
|
||||
void *));
|
||||
void rtcSetRemoteDescription(int pc, const char *sdp, const char *type);
|
||||
void rtcAddRemoteCandidate(int pc, const char *candidate, const char *mid);
|
||||
int rtcGetDataChannelLabel(int dc, char *data, int size);
|
||||
@ -67,4 +81,3 @@ void rtcSetUserPointer(int i, void *ptr);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -70,6 +70,11 @@ bool Candidate::resolve(ResolveMode mode) {
|
||||
hints.ai_protocol = IPPROTO_UDP;
|
||||
}
|
||||
|
||||
if (transport == "TCP" || transport == "tcp") {
|
||||
hints.ai_socktype = SOCK_STREAM;
|
||||
hints.ai_protocol = IPPROTO_TCP;
|
||||
}
|
||||
|
||||
if (mode == ResolveMode::Simple)
|
||||
hints.ai_flags |= AI_NUMERICHOST;
|
||||
|
||||
@ -120,4 +125,3 @@ Candidate::operator string() const {
|
||||
std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) {
|
||||
return out << std::string(candidate);
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ shared_ptr<Certificate> make_certificate(const string &commonName) {
|
||||
static std::unordered_map<string, shared_ptr<Certificate>> cache;
|
||||
static std::mutex cacheMutex;
|
||||
|
||||
std::lock_guard<std::mutex> lock(cacheMutex);
|
||||
std::lock_guard lock(cacheMutex);
|
||||
if (auto it = cache.find(commonName); it != cache.end())
|
||||
return it->second;
|
||||
|
||||
@ -241,7 +241,7 @@ shared_ptr<Certificate> make_certificate(const string &commonName) {
|
||||
static std::unordered_map<string, shared_ptr<Certificate>> cache;
|
||||
static std::mutex cacheMutex;
|
||||
|
||||
std::lock_guard<std::mutex> lock(cacheMutex);
|
||||
std::lock_guard lock(cacheMutex);
|
||||
if (auto it = cache.find(commonName); it != cache.end())
|
||||
return it->second;
|
||||
|
||||
|
@ -22,8 +22,8 @@ namespace rtc {
|
||||
|
||||
using std::to_string;
|
||||
|
||||
IceServer::IceServer(const string &host) {
|
||||
if(size_t pos = host.rfind(':'); pos != string::npos) {
|
||||
IceServer::IceServer(const string &host) : type(Type::Stun) {
|
||||
if (size_t pos = host.rfind(':'); pos != string::npos) {
|
||||
hostname = host.substr(0, pos);
|
||||
service = host.substr(pos + 1);
|
||||
} else {
|
||||
@ -32,8 +32,15 @@ IceServer::IceServer(const string &host) {
|
||||
}
|
||||
}
|
||||
|
||||
IceServer::IceServer(const string &hostname_, uint16_t port_) : IceServer(hostname_, to_string(port_)) {}
|
||||
IceServer::IceServer(const string &hostname_, uint16_t port_)
|
||||
: IceServer(hostname_, to_string(port_)) {}
|
||||
|
||||
IceServer::IceServer(const string &hostname_, const string &service_) : hostname(hostname_), service(service_) {}
|
||||
IceServer::IceServer(const string &hostname_, const string &service_)
|
||||
: hostname(hostname_), service(service_), type(Type::Stun) {}
|
||||
|
||||
IceServer::IceServer(const string &hostname_, const string &service_, string username_,
|
||||
string password_, RelayType relayType_)
|
||||
: hostname(hostname_), service(service_), type(Type::Turn), username(username_),
|
||||
password(password_), relayType(relayType_) {}
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -73,18 +73,22 @@ DataChannel::DataChannel(shared_ptr<PeerConnection> pc, shared_ptr<SctpTransport
|
||||
mReliability(std::make_shared<Reliability>()),
|
||||
mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
|
||||
|
||||
DataChannel::~DataChannel() { close(); }
|
||||
DataChannel::~DataChannel() {
|
||||
close();
|
||||
}
|
||||
|
||||
void DataChannel::close() {
|
||||
mIsOpen = false;
|
||||
if (!mIsClosed.exchange(true)) {
|
||||
if (mSctpTransport)
|
||||
mSctpTransport->reset(mStream);
|
||||
}
|
||||
|
||||
// Reset mSctpTransport first so SctpTransport is never alive without PeerConnection
|
||||
if (mIsOpen.exchange(false) && mSctpTransport)
|
||||
mSctpTransport->reset(mStream);
|
||||
mIsClosed = true;
|
||||
mSctpTransport.reset();
|
||||
}
|
||||
|
||||
void DataChannel::remoteClose() {
|
||||
mIsOpen = false;
|
||||
if (!mIsClosed.exchange(true))
|
||||
triggerClosed();
|
||||
mSctpTransport.reset();
|
||||
mPeerConnection.reset();
|
||||
}
|
||||
|
||||
bool DataChannel::send(const std::variant<binary, string> &data) {
|
||||
@ -108,12 +112,8 @@ std::optional<std::variant<binary, string>> DataChannel::receive() {
|
||||
switch (message->type) {
|
||||
case Message::Control: {
|
||||
auto raw = reinterpret_cast<const uint8_t *>(message->data());
|
||||
if (raw[0] == MESSAGE_CLOSE) {
|
||||
if (mIsOpen) {
|
||||
close();
|
||||
triggerClosed();
|
||||
}
|
||||
}
|
||||
if (raw[0] == MESSAGE_CLOSE)
|
||||
remoteClose();
|
||||
break;
|
||||
}
|
||||
case Message::String:
|
||||
|
@ -107,6 +107,8 @@ std::optional<uint16_t> Description::sctpPort() const { return mSctpPort; }
|
||||
|
||||
std::optional<size_t> Description::maxMessageSize() const { return mMaxMessageSize; }
|
||||
|
||||
bool Description::trickleEnabled() const { return mTrickle; }
|
||||
|
||||
void Description::setFingerprint(string fingerprint) {
|
||||
mFingerprint.emplace(std::move(fingerprint));
|
||||
}
|
||||
|
@ -20,10 +20,13 @@
|
||||
#include "icetransport.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include <exception>
|
||||
#include <iostream>
|
||||
|
||||
using namespace std::chrono;
|
||||
|
||||
using std::shared_ptr;
|
||||
using std::string;
|
||||
using std::unique_ptr;
|
||||
@ -37,8 +40,11 @@ namespace {
|
||||
|
||||
static bool check_gnutls(int ret, const string &message = "GnuTLS error") {
|
||||
if (ret < 0) {
|
||||
if (!gnutls_error_is_fatal(ret))
|
||||
if (!gnutls_error_is_fatal(ret)) {
|
||||
PLOG_INFO << gnutls_strerror(ret);
|
||||
return false;
|
||||
}
|
||||
PLOG_ERROR << message << ": " << gnutls_strerror(ret);
|
||||
throw std::runtime_error(message + ": " + gnutls_strerror(ret));
|
||||
}
|
||||
return true;
|
||||
@ -54,6 +60,9 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
|
||||
: Transport(lower), mCertificate(certificate), mState(State::Disconnected),
|
||||
mVerifierCallback(std::move(verifierCallback)),
|
||||
mStateChangeCallback(std::move(stateChangeCallback)) {
|
||||
|
||||
PLOG_DEBUG << "Initializing DTLS transport (GnuTLS)";
|
||||
|
||||
gnutls_certificate_set_verify_function(mCertificate->credentials(), CertificateCallback);
|
||||
|
||||
bool active = lower->role() == Description::Role::Active;
|
||||
@ -85,7 +94,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
|
||||
}
|
||||
|
||||
DtlsTransport::~DtlsTransport() {
|
||||
gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
|
||||
stop();
|
||||
|
||||
gnutls_deinit(mSession);
|
||||
}
|
||||
|
||||
@ -94,14 +104,20 @@ DtlsTransport::State DtlsTransport::state() const { return mState; }
|
||||
void DtlsTransport::stop() {
|
||||
Transport::stop();
|
||||
|
||||
mIncomingQueue.stop();
|
||||
mRecvThread.join();
|
||||
if (mRecvThread.joinable()) {
|
||||
PLOG_DEBUG << "Stopping DTLS recv thread";
|
||||
mIncomingQueue.stop();
|
||||
gnutls_bye(mSession, GNUTLS_SHUT_RDWR);
|
||||
mRecvThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
bool DtlsTransport::send(message_ptr message) {
|
||||
if (!message || mState != State::Connected)
|
||||
return false;
|
||||
|
||||
PLOG_VERBOSE << "Send size=" << message->size();
|
||||
|
||||
ssize_t ret;
|
||||
do {
|
||||
ret = gnutls_record_send(mSession, message->data(), message->size());
|
||||
@ -113,7 +129,12 @@ bool DtlsTransport::send(message_ptr message) {
|
||||
return check_gnutls(ret);
|
||||
}
|
||||
|
||||
void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); }
|
||||
void DtlsTransport::incoming(message_ptr message) {
|
||||
if (message)
|
||||
mIncomingQueue.push(message);
|
||||
else
|
||||
mIncomingQueue.stop();
|
||||
}
|
||||
|
||||
void DtlsTransport::changeState(State state) {
|
||||
if (mState.exchange(state) != state)
|
||||
@ -142,7 +163,7 @@ void DtlsTransport::runRecvLoop() {
|
||||
gnutls_dtls_set_mtu(mSession, maxMtu + 1);
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "DTLS handshake: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "DTLS handshake: " << e.what();
|
||||
changeState(State::Failed);
|
||||
return;
|
||||
}
|
||||
@ -161,12 +182,15 @@ void DtlsTransport::runRecvLoop() {
|
||||
} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
|
||||
|
||||
// Consider premature termination as remote closing
|
||||
if (ret == GNUTLS_E_PREMATURE_TERMINATION)
|
||||
if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
|
||||
PLOG_DEBUG << "DTLS connection terminated";
|
||||
break;
|
||||
}
|
||||
|
||||
if (check_gnutls(ret)) {
|
||||
if (ret == 0) {
|
||||
// Closed
|
||||
PLOG_DEBUG << "DTLS connection cleanly closed";
|
||||
break;
|
||||
}
|
||||
auto *b = reinterpret_cast<byte *>(buffer);
|
||||
@ -175,9 +199,10 @@ void DtlsTransport::runRecvLoop() {
|
||||
}
|
||||
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "DTLS recv: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "DTLS recv: " << e.what();
|
||||
}
|
||||
|
||||
PLOG_INFO << "DTLS disconnected";
|
||||
changeState(State::Disconnected);
|
||||
recv(nullptr);
|
||||
}
|
||||
@ -222,24 +247,22 @@ ssize_t DtlsTransport::WriteCallback(gnutls_transport_ptr_t ptr, const void *dat
|
||||
|
||||
ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_t maxlen) {
|
||||
DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
|
||||
auto next = t->mIncomingQueue.pop();
|
||||
auto message = next ? *next : nullptr;
|
||||
if (!message) {
|
||||
// Closed
|
||||
if (auto next = t->mIncomingQueue.pop()) {
|
||||
auto message = *next;
|
||||
ssize_t len = std::min(maxlen, message->size());
|
||||
std::memcpy(data, message->data(), len);
|
||||
gnutls_transport_set_errno(t->mSession, 0);
|
||||
return 0;
|
||||
return len;
|
||||
}
|
||||
|
||||
ssize_t len = std::min(maxlen, message->size());
|
||||
std::memcpy(data, message->data(), len);
|
||||
// Closed
|
||||
gnutls_transport_set_errno(t->mSession, 0);
|
||||
return len;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
|
||||
DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
|
||||
if (ms != GNUTLS_INDEFINITE_TIMEOUT)
|
||||
t->mIncomingQueue.wait(std::chrono::milliseconds(ms));
|
||||
t->mIncomingQueue.wait(milliseconds(ms));
|
||||
else
|
||||
t->mIncomingQueue.wait();
|
||||
return !t->mIncomingQueue.empty() ? 1 : 0;
|
||||
@ -268,8 +291,10 @@ string openssl_error_string(unsigned long err) {
|
||||
bool check_openssl(int success, const string &message = "OpenSSL error") {
|
||||
if (success)
|
||||
return true;
|
||||
else
|
||||
throw std::runtime_error(message + ": " + openssl_error_string(ERR_get_error()));
|
||||
|
||||
string str = openssl_error_string(ERR_get_error());
|
||||
PLOG_ERROR << message << ": " << str;
|
||||
throw std::runtime_error(message + ": " + str);
|
||||
}
|
||||
|
||||
bool check_openssl_ret(SSL *ssl, int ret, const string &message = "OpenSSL error") {
|
||||
@ -277,12 +302,16 @@ bool check_openssl_ret(SSL *ssl, int ret, const string &message = "OpenSSL error
|
||||
return true;
|
||||
|
||||
unsigned long err = SSL_get_error(ssl, ret);
|
||||
if (err == SSL_ERROR_NONE || err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE)
|
||||
if (err == SSL_ERROR_NONE || err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
|
||||
return true;
|
||||
else if (err == SSL_ERROR_ZERO_RETURN)
|
||||
}
|
||||
if (err == SSL_ERROR_ZERO_RETURN) {
|
||||
PLOG_DEBUG << "DTLS connection cleanly closed";
|
||||
return false;
|
||||
else
|
||||
throw std::runtime_error(message + ": " + openssl_error_string(err));
|
||||
}
|
||||
string str = openssl_error_string(err);
|
||||
PLOG_ERROR << str;
|
||||
throw std::runtime_error(message + ": " + str);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
@ -293,7 +322,7 @@ int DtlsTransport::TransportExIndex = -1;
|
||||
std::mutex DtlsTransport::GlobalMutex;
|
||||
|
||||
void DtlsTransport::GlobalInit() {
|
||||
std::lock_guard<std::mutex> lock(GlobalMutex);
|
||||
std::lock_guard lock(GlobalMutex);
|
||||
if (TransportExIndex < 0) {
|
||||
TransportExIndex = SSL_get_ex_new_index(0, NULL, NULL, NULL, NULL);
|
||||
}
|
||||
@ -305,6 +334,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
|
||||
mVerifierCallback(std::move(verifierCallback)),
|
||||
mStateChangeCallback(std::move(stateChangeCallback)) {
|
||||
|
||||
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
|
||||
GlobalInit();
|
||||
|
||||
if (!(mCtx = SSL_CTX_new(DTLS_method())))
|
||||
@ -358,7 +388,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
|
||||
}
|
||||
|
||||
DtlsTransport::~DtlsTransport() {
|
||||
SSL_shutdown(mSsl);
|
||||
stop();
|
||||
|
||||
SSL_free(mSsl);
|
||||
SSL_CTX_free(mCtx);
|
||||
}
|
||||
@ -366,34 +397,39 @@ DtlsTransport::~DtlsTransport() {
|
||||
void DtlsTransport::stop() {
|
||||
Transport::stop();
|
||||
|
||||
mIncomingQueue.stop();
|
||||
mRecvThread.join();
|
||||
if (mRecvThread.joinable()) {
|
||||
PLOG_DEBUG << "Stopping DTLS recv thread";
|
||||
mIncomingQueue.stop();
|
||||
mRecvThread.join();
|
||||
|
||||
SSL_shutdown(mSsl);
|
||||
writePending();
|
||||
}
|
||||
}
|
||||
|
||||
DtlsTransport::State DtlsTransport::state() const { return mState; }
|
||||
|
||||
bool DtlsTransport::send(message_ptr message) {
|
||||
const size_t bufferSize = 4096;
|
||||
byte buffer[bufferSize];
|
||||
|
||||
if (!message || mState != State::Connected)
|
||||
return false;
|
||||
|
||||
PLOG_VERBOSE << "Send size=" << message->size();
|
||||
|
||||
int ret = SSL_write(mSsl, message->data(), message->size());
|
||||
if (!check_openssl_ret(mSsl, ret)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
while (BIO_ctrl_pending(mOutBio) > 0) {
|
||||
int ret = BIO_read(mOutBio, buffer, bufferSize);
|
||||
if (check_openssl_ret(mSsl, ret) && ret > 0)
|
||||
outgoing(make_message(buffer, buffer + ret));
|
||||
}
|
||||
|
||||
writePending();
|
||||
return true;
|
||||
}
|
||||
|
||||
void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); }
|
||||
void DtlsTransport::incoming(message_ptr message) {
|
||||
if (message)
|
||||
mIncomingQueue.push(message);
|
||||
else
|
||||
mIncomingQueue.stop();
|
||||
}
|
||||
|
||||
void DtlsTransport::changeState(State state) {
|
||||
if (mState.exchange(state) != state)
|
||||
@ -408,11 +444,7 @@ void DtlsTransport::runRecvLoop() {
|
||||
changeState(State::Connecting);
|
||||
|
||||
SSL_do_handshake(mSsl);
|
||||
while (BIO_ctrl_pending(mOutBio) > 0) {
|
||||
int ret = BIO_read(mOutBio, buffer, bufferSize);
|
||||
if (check_openssl_ret(mSsl, ret) && ret > 0)
|
||||
outgoing(make_message(buffer, buffer + ret));
|
||||
}
|
||||
writePending();
|
||||
|
||||
while (auto next = mIncomingQueue.pop()) {
|
||||
auto message = *next;
|
||||
@ -427,12 +459,7 @@ void DtlsTransport::runRecvLoop() {
|
||||
if (unsigned long err = ERR_get_error())
|
||||
throw std::runtime_error("handshake failed: " + openssl_error_string(err));
|
||||
|
||||
while (BIO_ctrl_pending(mOutBio) > 0) {
|
||||
ret = BIO_read(mOutBio, buffer, bufferSize);
|
||||
if (check_openssl_ret(mSsl, ret) && ret > 0)
|
||||
outgoing(make_message(buffer, buffer + ret));
|
||||
}
|
||||
|
||||
writePending();
|
||||
if (SSL_is_init_finished(mSsl))
|
||||
changeState(State::Connected);
|
||||
}
|
||||
@ -441,17 +468,29 @@ void DtlsTransport::runRecvLoop() {
|
||||
recv(decrypted);
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "DTLS recv: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "DTLS recv: " << e.what();
|
||||
}
|
||||
|
||||
if (mState == State::Connected) {
|
||||
PLOG_INFO << "DTLS disconnected";
|
||||
changeState(State::Disconnected);
|
||||
recv(nullptr);
|
||||
} else {
|
||||
PLOG_INFO << "DTLS handshake failed";
|
||||
changeState(State::Failed);
|
||||
}
|
||||
}
|
||||
|
||||
void DtlsTransport::writePending() {
|
||||
const size_t bufferSize = 4096;
|
||||
byte buffer[bufferSize];
|
||||
while (BIO_ctrl_pending(mOutBio) > 0) {
|
||||
int ret = BIO_read(mOutBio, buffer, bufferSize);
|
||||
if (check_openssl_ret(mSsl, ret) && ret > 0)
|
||||
outgoing(make_message(buffer, buffer + ret));
|
||||
}
|
||||
}
|
||||
|
||||
int DtlsTransport::CertificateCallback(int preverify_ok, X509_STORE_CTX *ctx) {
|
||||
SSL *ssl =
|
||||
static_cast<SSL *>(X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()));
|
||||
@ -469,8 +508,9 @@ void DtlsTransport::InfoCallback(const SSL *ssl, int where, int ret) {
|
||||
static_cast<DtlsTransport *>(SSL_get_ex_data(ssl, DtlsTransport::TransportExIndex));
|
||||
|
||||
if (where & SSL_CB_ALERT) {
|
||||
if (ret != 256) // Close Notify
|
||||
std::cerr << "DTLS alert: " << SSL_alert_desc_string_long(ret) << std::endl;
|
||||
if (ret != 256) { // Close Notify
|
||||
PLOG_ERROR << "DTLS alert: " << SSL_alert_desc_string_long(ret);
|
||||
}
|
||||
t->mIncomingQueue.stop(); // Close the connection
|
||||
}
|
||||
}
|
||||
|
@ -55,10 +55,10 @@ public:
|
||||
State state() const;
|
||||
|
||||
void stop() override;
|
||||
bool send(message_ptr message); // false if dropped
|
||||
bool send(message_ptr message) override; // false if dropped
|
||||
|
||||
private:
|
||||
void incoming(message_ptr message);
|
||||
void incoming(message_ptr message) override;
|
||||
void changeState(State state);
|
||||
void runRecvLoop();
|
||||
|
||||
@ -79,6 +79,8 @@ private:
|
||||
static ssize_t ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_t maxlen);
|
||||
static int TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms);
|
||||
#else
|
||||
void writePending();
|
||||
|
||||
SSL_CTX *mCtx;
|
||||
SSL *mSsl;
|
||||
BIO *mInBio, *mOutBio;
|
||||
|
@ -23,19 +23,19 @@
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <sstream>
|
||||
|
||||
namespace rtc {
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using std::shared_ptr;
|
||||
using std::weak_ptr;
|
||||
|
||||
IceTransport::IceTransport(const Configuration &config, Description::Role role,
|
||||
candidate_callback candidateCallback,
|
||||
state_callback stateChangeCallback,
|
||||
candidate_callback candidateCallback, state_callback stateChangeCallback,
|
||||
gathering_state_callback gatheringStateChangeCallback)
|
||||
: mRole(role), mMid("0"), mState(State::Disconnected), mGatheringState(GatheringState::New),
|
||||
mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr),
|
||||
@ -43,9 +43,13 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
|
||||
mStateChangeCallback(std::move(stateChangeCallback)),
|
||||
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)) {
|
||||
|
||||
auto logLevelFlags = GLogLevelFlags(G_LOG_LEVEL_MASK | G_LOG_FLAG_FATAL | G_LOG_FLAG_RECURSION);
|
||||
g_log_set_handler(nullptr, logLevelFlags, LogCallback, this);
|
||||
nice_debug_enable(false);
|
||||
PLOG_DEBUG << "Initializing ICE transport";
|
||||
|
||||
g_log_set_handler("libnice", G_LOG_LEVEL_MASK, LogCallback, this);
|
||||
|
||||
IF_PLOG(plog::verbose) {
|
||||
nice_debug_enable(false); // do not output STUN debug messages
|
||||
}
|
||||
|
||||
mMainLoop = decltype(mMainLoop)(g_main_loop_new(nullptr, FALSE), g_main_loop_unref);
|
||||
if (!mMainLoop)
|
||||
@ -62,7 +66,8 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
|
||||
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "controlling-mode", TRUE, nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-udp", TRUE, nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-tcp", FALSE, nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "ice-tcp", config.enableIceTcp ? TRUE : FALSE,
|
||||
nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-initial-timeout", 200, nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-max-retransmissions", 3, nullptr);
|
||||
g_object_set(G_OBJECT(mNiceAgent.get()), "stun-pacing-timer", 20, nullptr);
|
||||
@ -77,6 +82,8 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
|
||||
for (auto &server : servers) {
|
||||
if (server.hostname.empty())
|
||||
continue;
|
||||
if (server.type == IceServer::Type::Turn)
|
||||
continue;
|
||||
if (server.service.empty())
|
||||
server.service = "3478"; // STUN UDP port
|
||||
|
||||
@ -128,13 +135,60 @@ IceTransport::IceTransport(const Configuration &config, Description::Role role,
|
||||
|
||||
nice_agent_attach_recv(mNiceAgent.get(), mStreamId, 1, g_main_loop_get_context(mMainLoop.get()),
|
||||
RecvCallback, this);
|
||||
|
||||
// Add TURN Servers
|
||||
for (auto &server : servers) {
|
||||
if (server.hostname.empty())
|
||||
continue;
|
||||
if (server.type == IceServer::Type::Stun)
|
||||
continue;
|
||||
if (server.service.empty())
|
||||
server.service = "3478"; // TURN UDP port
|
||||
|
||||
struct addrinfo hints = {};
|
||||
hints.ai_family = AF_INET; // IPv4
|
||||
hints.ai_socktype =
|
||||
server.relayType == IceServer::RelayType::TurnUdp ? SOCK_DGRAM : SOCK_STREAM;
|
||||
hints.ai_protocol =
|
||||
server.relayType == IceServer::RelayType::TurnUdp ? IPPROTO_UDP : IPPROTO_TCP;
|
||||
hints.ai_flags = AI_ADDRCONFIG;
|
||||
struct addrinfo *result = nullptr;
|
||||
if (getaddrinfo(server.hostname.c_str(), server.service.c_str(), &hints, &result) != 0)
|
||||
continue;
|
||||
|
||||
for (auto p = result; p; p = p->ai_next) {
|
||||
if (p->ai_family == AF_INET) {
|
||||
char nodebuffer[MAX_NUMERICNODE_LEN];
|
||||
char servbuffer[MAX_NUMERICSERV_LEN];
|
||||
if (getnameinfo(p->ai_addr, p->ai_addrlen, nodebuffer, MAX_NUMERICNODE_LEN,
|
||||
|
||||
servbuffer, MAX_NUMERICNODE_LEN,
|
||||
NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
|
||||
nice_agent_set_relay_info(mNiceAgent.get(), mStreamId, 1, nodebuffer,
|
||||
std::stoul(servbuffer), server.username.c_str(),
|
||||
server.password.c_str(),
|
||||
static_cast<NiceRelayType>(server.relayType));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
freeaddrinfo(result);
|
||||
}
|
||||
}
|
||||
|
||||
IceTransport::~IceTransport() {}
|
||||
IceTransport::~IceTransport() { stop(); }
|
||||
|
||||
void IceTransport::stop() {
|
||||
g_main_loop_quit(mMainLoop.get());
|
||||
mMainLoopThread.join();
|
||||
if (mTimeoutId) {
|
||||
g_source_remove(mTimeoutId);
|
||||
mTimeoutId = 0;
|
||||
}
|
||||
if (mMainLoopThread.joinable()) {
|
||||
PLOG_DEBUG << "Stopping ICE thread";
|
||||
g_main_loop_quit(mMainLoop.get());
|
||||
mMainLoopThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
Description::Role IceTransport::role() const { return mRole; }
|
||||
@ -156,6 +210,7 @@ void IceTransport::setRemoteDescription(const Description &description) {
|
||||
mRole = description.role() == Description::Role::Active ? Description::Role::Passive
|
||||
: Description::Role::Active;
|
||||
mMid = description.mid();
|
||||
mTrickleTimeout = description.trickleEnabled() ? 30s : 0s;
|
||||
|
||||
if (nice_agent_parse_remote_sdp(mNiceAgent.get(), string(description).c_str()) < 0)
|
||||
throw std::runtime_error("Failed to parse remote SDP");
|
||||
@ -163,7 +218,6 @@ void IceTransport::setRemoteDescription(const Description &description) {
|
||||
|
||||
bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
|
||||
// Don't try to pass unresolved candidates to libnice for more safety
|
||||
|
||||
if (!candidate.isResolved())
|
||||
return false;
|
||||
|
||||
@ -209,11 +263,11 @@ std::optional<string> IceTransport::getRemoteAddress() const {
|
||||
}
|
||||
|
||||
bool IceTransport::send(message_ptr message) {
|
||||
if (!message || !mStreamId)
|
||||
if (!message || (mState != State::Connected && mState != State::Completed))
|
||||
return false;
|
||||
|
||||
outgoing(message);
|
||||
return true;
|
||||
PLOG_VERBOSE << "Send size=" << message->size();
|
||||
return outgoing(message);
|
||||
}
|
||||
|
||||
void IceTransport::incoming(message_ptr message) { recv(message); }
|
||||
@ -222,9 +276,9 @@ void IceTransport::incoming(const byte *data, int size) {
|
||||
incoming(make_message(data, data + size));
|
||||
}
|
||||
|
||||
void IceTransport::outgoing(message_ptr message) {
|
||||
nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(),
|
||||
reinterpret_cast<const char *>(message->data()));
|
||||
bool IceTransport::outgoing(message_ptr message) {
|
||||
return nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(),
|
||||
reinterpret_cast<const char *>(message->data())) >= 0;
|
||||
}
|
||||
|
||||
void IceTransport::changeState(State state) {
|
||||
@ -232,6 +286,12 @@ void IceTransport::changeState(State state) {
|
||||
mStateChangeCallback(mState);
|
||||
}
|
||||
|
||||
void IceTransport::processTimeout() {
|
||||
PLOG_WARNING << "ICE timeout";
|
||||
mTimeoutId = 0;
|
||||
changeState(State::Failed);
|
||||
}
|
||||
|
||||
void IceTransport::changeGatheringState(GatheringState state) {
|
||||
mGatheringState = state;
|
||||
mGatheringStateChangeCallback(mGatheringState);
|
||||
@ -244,8 +304,19 @@ void IceTransport::processCandidate(const string &candidate) {
|
||||
void IceTransport::processGatheringDone() { changeGatheringState(GatheringState::Complete); }
|
||||
|
||||
void IceTransport::processStateChange(uint32_t state) {
|
||||
if (state != NICE_COMPONENT_STATE_GATHERING)
|
||||
changeState(static_cast<State>(state));
|
||||
if (state == NICE_COMPONENT_STATE_FAILED && mTrickleTimeout.count() > 0) {
|
||||
if (mTimeoutId)
|
||||
g_source_remove(mTimeoutId);
|
||||
mTimeoutId = g_timeout_add(mTrickleTimeout.count() /* ms */, TimeoutCallback, this);
|
||||
return;
|
||||
}
|
||||
|
||||
if (state == NICE_COMPONENT_STATE_CONNECTED && mTimeoutId) {
|
||||
g_source_remove(mTimeoutId);
|
||||
mTimeoutId = 0;
|
||||
}
|
||||
|
||||
changeState(static_cast<State>(state));
|
||||
}
|
||||
|
||||
string IceTransport::AddressToString(const NiceAddress &addr) {
|
||||
@ -264,7 +335,7 @@ void IceTransport::CandidateCallback(NiceAgent *agent, NiceCandidate *candidate,
|
||||
try {
|
||||
iceTransport->processCandidate(cand);
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "ICE candidate: " << e.what() << std::endl;
|
||||
PLOG_WARNING << e.what();
|
||||
}
|
||||
g_free(cand);
|
||||
}
|
||||
@ -274,17 +345,17 @@ void IceTransport::GatheringDoneCallback(NiceAgent *agent, guint streamId, gpoin
|
||||
try {
|
||||
iceTransport->processGatheringDone();
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "ICE gathering done: " << e.what() << std::endl;
|
||||
PLOG_WARNING << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
void IceTransport::StateChangeCallback(NiceAgent *agent, guint streamId, guint componentId,
|
||||
guint state, gpointer userData) {
|
||||
guint state, gpointer userData) {
|
||||
auto iceTransport = static_cast<rtc::IceTransport *>(userData);
|
||||
try {
|
||||
iceTransport->processStateChange(state);
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "ICE change state: " << e.what() << std::endl;
|
||||
PLOG_WARNING << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
@ -294,13 +365,38 @@ void IceTransport::RecvCallback(NiceAgent *agent, guint streamId, guint componen
|
||||
try {
|
||||
iceTransport->incoming(reinterpret_cast<byte *>(buf), len);
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "ICE incoming: " << e.what() << std::endl;
|
||||
PLOG_WARNING << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
gboolean IceTransport::TimeoutCallback(gpointer userData) {
|
||||
auto iceTransport = static_cast<rtc::IceTransport *>(userData);
|
||||
try {
|
||||
iceTransport->processTimeout();
|
||||
} catch (const std::exception &e) {
|
||||
PLOG_WARNING << e.what();
|
||||
}
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
void IceTransport::LogCallback(const gchar *logDomain, GLogLevelFlags logLevel,
|
||||
const gchar *message, gpointer userData) {
|
||||
std::cout << message << std::endl;
|
||||
plog::Severity severity;
|
||||
unsigned int flags = logLevel & G_LOG_LEVEL_MASK;
|
||||
if (flags & G_LOG_LEVEL_ERROR)
|
||||
severity = plog::fatal;
|
||||
else if (flags & G_LOG_LEVEL_CRITICAL)
|
||||
severity = plog::error;
|
||||
else if (flags & G_LOG_LEVEL_WARNING)
|
||||
severity = plog::warning;
|
||||
else if (flags & G_LOG_LEVEL_MESSAGE)
|
||||
severity = plog::info;
|
||||
else if (flags & G_LOG_LEVEL_INFO)
|
||||
severity = plog::info;
|
||||
else
|
||||
severity = plog::verbose; // libnice debug as verbose
|
||||
|
||||
PLOG(severity) << message;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
@ -31,6 +31,7 @@ extern "C" {
|
||||
}
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
namespace rtc {
|
||||
@ -71,9 +72,9 @@ public:
|
||||
bool send(message_ptr message) override; // false if dropped
|
||||
|
||||
private:
|
||||
void incoming(message_ptr message);
|
||||
void incoming(message_ptr message) override;
|
||||
void incoming(const byte *data, int size);
|
||||
void outgoing(message_ptr message);
|
||||
bool outgoing(message_ptr message) override;
|
||||
|
||||
void changeState(State state);
|
||||
void changeGatheringState(GatheringState state);
|
||||
@ -81,9 +82,11 @@ private:
|
||||
void processCandidate(const string &candidate);
|
||||
void processGatheringDone();
|
||||
void processStateChange(uint32_t state);
|
||||
void processTimeout();
|
||||
|
||||
Description::Role mRole;
|
||||
string mMid;
|
||||
std::chrono::milliseconds mTrickleTimeout;
|
||||
std::atomic<State> mState;
|
||||
std::atomic<GatheringState> mGatheringState;
|
||||
|
||||
@ -91,6 +94,7 @@ private:
|
||||
std::unique_ptr<NiceAgent, void (*)(gpointer)> mNiceAgent;
|
||||
std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop;
|
||||
std::thread mMainLoopThread;
|
||||
guint mTimeoutId = 0;
|
||||
|
||||
candidate_callback mCandidateCallback;
|
||||
state_callback mStateChangeCallback;
|
||||
@ -104,6 +108,7 @@ private:
|
||||
guint state, gpointer userData);
|
||||
static void RecvCallback(NiceAgent *agent, guint stream_id, guint component_id, guint len,
|
||||
gchar *buf, gpointer userData);
|
||||
static gboolean TimeoutCallback(gpointer userData);
|
||||
static void LogCallback(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message,
|
||||
gpointer user_data);
|
||||
};
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "certificate.hpp"
|
||||
#include "dtlstransport.hpp"
|
||||
#include "icetransport.hpp"
|
||||
#include "include.hpp"
|
||||
#include "sctptransport.hpp"
|
||||
|
||||
#include <iostream>
|
||||
@ -37,44 +38,66 @@ PeerConnection::PeerConnection(const Configuration &config)
|
||||
: mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New) {}
|
||||
|
||||
PeerConnection::~PeerConnection() {
|
||||
if (mIceTransport)
|
||||
mIceTransport->stop();
|
||||
if (mDtlsTransport)
|
||||
mDtlsTransport->stop();
|
||||
if (mSctpTransport)
|
||||
mSctpTransport->stop();
|
||||
|
||||
changeState(State::Destroying);
|
||||
close();
|
||||
mSctpTransport.reset();
|
||||
mDtlsTransport.reset();
|
||||
mIceTransport.reset();
|
||||
}
|
||||
|
||||
void PeerConnection::close() {
|
||||
// Close DataChannels
|
||||
closeDataChannels();
|
||||
mDataChannels.clear();
|
||||
|
||||
// Close Transports
|
||||
for (int i = 0; i < 2; ++i) { // Make sure a transport wasn't spawn behind our back
|
||||
if (auto transport = std::atomic_load(&mSctpTransport))
|
||||
transport->stop();
|
||||
if (auto transport = std::atomic_load(&mDtlsTransport))
|
||||
transport->stop();
|
||||
if (auto transport = std::atomic_load(&mIceTransport))
|
||||
transport->stop();
|
||||
}
|
||||
changeState(State::Closed);
|
||||
}
|
||||
|
||||
const Configuration *PeerConnection::config() const { return &mConfig; }
|
||||
|
||||
PeerConnection::State PeerConnection::state() const { return mState; }
|
||||
|
||||
PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
|
||||
|
||||
std::optional<Description> PeerConnection::localDescription() const { return mLocalDescription; }
|
||||
std::optional<Description> PeerConnection::localDescription() const {
|
||||
std::lock_guard lock(mLocalDescriptionMutex);
|
||||
return mLocalDescription;
|
||||
}
|
||||
|
||||
std::optional<Description> PeerConnection::remoteDescription() const { return mRemoteDescription; }
|
||||
std::optional<Description> PeerConnection::remoteDescription() const {
|
||||
std::lock_guard lock(mRemoteDescriptionMutex);
|
||||
return mRemoteDescription;
|
||||
}
|
||||
|
||||
void PeerConnection::setRemoteDescription(Description description) {
|
||||
std::lock_guard lock(mRemoteDescriptionMutex);
|
||||
|
||||
auto remoteCandidates = description.extractCandidates();
|
||||
mRemoteDescription.emplace(std::move(description));
|
||||
|
||||
if (!mIceTransport)
|
||||
initIceTransport(Description::Role::ActPass);
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
if (!iceTransport)
|
||||
iceTransport = initIceTransport(Description::Role::ActPass);
|
||||
|
||||
mIceTransport->setRemoteDescription(*mRemoteDescription);
|
||||
iceTransport->setRemoteDescription(*mRemoteDescription);
|
||||
|
||||
if (mRemoteDescription->type() == Description::Type::Offer) {
|
||||
// This is an offer and we are the answerer.
|
||||
processLocalDescription(mIceTransport->getLocalDescription(Description::Type::Answer));
|
||||
mIceTransport->gatherLocalCandidates();
|
||||
processLocalDescription(iceTransport->getLocalDescription(Description::Type::Answer));
|
||||
iceTransport->gatherLocalCandidates();
|
||||
} else {
|
||||
// This is an answer and we are the offerer.
|
||||
if (!mSctpTransport && mIceTransport->role() == Description::Role::Active) {
|
||||
auto sctpTransport = std::atomic_load(&mSctpTransport);
|
||||
if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
|
||||
// Since we assumed passive role during DataChannel creation, we need to shift the
|
||||
// stream numbers by one to shift them from odd to even.
|
||||
decltype(mDataChannels) newDataChannels;
|
||||
@ -92,16 +115,19 @@ void PeerConnection::setRemoteDescription(Description description) {
|
||||
}
|
||||
|
||||
void PeerConnection::addRemoteCandidate(Candidate candidate) {
|
||||
if (!mRemoteDescription || !mIceTransport)
|
||||
std::lock_guard lock(mRemoteDescriptionMutex);
|
||||
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
if (!mRemoteDescription || !iceTransport)
|
||||
throw std::logic_error("Remote candidate set without remote description");
|
||||
|
||||
mRemoteDescription->addCandidate(candidate);
|
||||
|
||||
if (candidate.resolve(Candidate::ResolveMode::Simple)) {
|
||||
mIceTransport->addRemoteCandidate(candidate);
|
||||
iceTransport->addRemoteCandidate(candidate);
|
||||
} else {
|
||||
// OK, we might need a lookup, do it asynchronously
|
||||
weak_ptr<IceTransport> weakIceTransport{mIceTransport};
|
||||
weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
||||
std::thread t([weakIceTransport, candidate]() mutable {
|
||||
if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
||||
if (auto iceTransport = weakIceTransport.lock())
|
||||
@ -112,11 +138,13 @@ void PeerConnection::addRemoteCandidate(Candidate candidate) {
|
||||
}
|
||||
|
||||
std::optional<string> PeerConnection::localAddress() const {
|
||||
return mIceTransport ? mIceTransport->getLocalAddress() : nullopt;
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
return iceTransport ? iceTransport->getLocalAddress() : nullopt;
|
||||
}
|
||||
|
||||
std::optional<string> PeerConnection::remoteAddress() const {
|
||||
return mIceTransport ? mIceTransport->getRemoteAddress() : nullopt;
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
|
||||
}
|
||||
|
||||
shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
|
||||
@ -126,7 +154,8 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
|
||||
// setup:passive. [...] Thus, setup:active is RECOMMENDED.
|
||||
// See https://tools.ietf.org/html/rfc5763#section-5
|
||||
// Therefore, we assume passive role when we are the offerer.
|
||||
auto role = mIceTransport ? mIceTransport->role() : Description::Role::Passive;
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
|
||||
|
||||
// The active side must use streams with even identifiers, whereas the passive side must use
|
||||
// streams with odd identifiers.
|
||||
@ -142,15 +171,17 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
|
||||
std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
|
||||
mDataChannels.insert(std::make_pair(stream, channel));
|
||||
|
||||
if (!mIceTransport) {
|
||||
if (!iceTransport) {
|
||||
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
|
||||
// setup:actpass.
|
||||
// See https://tools.ietf.org/html/rfc5763#section-5
|
||||
initIceTransport(Description::Role::ActPass);
|
||||
processLocalDescription(mIceTransport->getLocalDescription(Description::Type::Offer));
|
||||
mIceTransport->gatherLocalCandidates();
|
||||
} else if (mSctpTransport && mSctpTransport->state() == SctpTransport::State::Connected) {
|
||||
channel->open(mSctpTransport);
|
||||
iceTransport = initIceTransport(Description::Role::ActPass);
|
||||
processLocalDescription(iceTransport->getLocalDescription(Description::Type::Offer));
|
||||
iceTransport->gatherLocalCandidates();
|
||||
} else {
|
||||
if (auto transport = std::atomic_load(&mSctpTransport))
|
||||
if (transport->state() == SctpTransport::State::Connected)
|
||||
channel->open(transport);
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
@ -177,8 +208,12 @@ void PeerConnection::onGatheringStateChange(std::function<void(GatheringState st
|
||||
mGatheringStateChangeCallback = callback;
|
||||
}
|
||||
|
||||
void PeerConnection::initIceTransport(Description::Role role) {
|
||||
mIceTransport = std::make_shared<IceTransport>(
|
||||
shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
|
||||
std::lock_guard lock(mInitMutex);
|
||||
if (auto transport = std::atomic_load(&mIceTransport))
|
||||
return transport;
|
||||
|
||||
auto transport = std::make_shared<IceTransport>(
|
||||
mConfig, role, std::bind(&PeerConnection::processLocalCandidate, this, _1),
|
||||
[this](IceTransport::State state) {
|
||||
switch (state) {
|
||||
@ -191,6 +226,9 @@ void PeerConnection::initIceTransport(Description::Role role) {
|
||||
case IceTransport::State::Connected:
|
||||
initDtlsTransport();
|
||||
break;
|
||||
case IceTransport::State::Disconnected:
|
||||
changeState(State::Disconnected);
|
||||
break;
|
||||
default:
|
||||
// Ignore
|
||||
break;
|
||||
@ -202,8 +240,7 @@ void PeerConnection::initIceTransport(Description::Role role) {
|
||||
changeGatheringState(GatheringState::InProgress);
|
||||
break;
|
||||
case IceTransport::GatheringState::Complete:
|
||||
if (mLocalDescription)
|
||||
mLocalDescription->endCandidates();
|
||||
endLocalCandidates();
|
||||
changeGatheringState(GatheringState::Complete);
|
||||
break;
|
||||
default:
|
||||
@ -211,11 +248,18 @@ void PeerConnection::initIceTransport(Description::Role role) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
std::atomic_store(&mIceTransport, transport);
|
||||
return transport;
|
||||
}
|
||||
|
||||
void PeerConnection::initDtlsTransport() {
|
||||
mDtlsTransport = std::make_shared<DtlsTransport>(
|
||||
mIceTransport, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
|
||||
shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
||||
std::lock_guard lock(mInitMutex);
|
||||
if (auto transport = std::atomic_load(&mDtlsTransport))
|
||||
return transport;
|
||||
|
||||
auto lower = std::atomic_load(&mIceTransport);
|
||||
auto transport = std::make_shared<DtlsTransport>(
|
||||
lower, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
|
||||
[this](DtlsTransport::State state) {
|
||||
switch (state) {
|
||||
case DtlsTransport::State::Connected:
|
||||
@ -224,17 +268,27 @@ void PeerConnection::initDtlsTransport() {
|
||||
case DtlsTransport::State::Failed:
|
||||
changeState(State::Failed);
|
||||
break;
|
||||
case DtlsTransport::State::Disconnected:
|
||||
changeState(State::Disconnected);
|
||||
break;
|
||||
default:
|
||||
// Ignore
|
||||
break;
|
||||
}
|
||||
});
|
||||
std::atomic_store(&mDtlsTransport, transport);
|
||||
return transport;
|
||||
}
|
||||
|
||||
void PeerConnection::initSctpTransport() {
|
||||
uint16_t sctpPort = mRemoteDescription->sctpPort().value_or(DEFAULT_SCTP_PORT);
|
||||
mSctpTransport = std::make_shared<SctpTransport>(
|
||||
mDtlsTransport, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
|
||||
shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
|
||||
std::lock_guard lock(mInitMutex);
|
||||
if (auto transport = std::atomic_load(&mSctpTransport))
|
||||
return transport;
|
||||
|
||||
uint16_t sctpPort = remoteDescription()->sctpPort().value_or(DEFAULT_SCTP_PORT);
|
||||
auto lower = std::atomic_load(&mDtlsTransport);
|
||||
auto transport = std::make_shared<SctpTransport>(
|
||||
lower, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
|
||||
std::bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
|
||||
[this](SctpTransport::State state) {
|
||||
switch (state) {
|
||||
@ -243,9 +297,11 @@ void PeerConnection::initSctpTransport() {
|
||||
openDataChannels();
|
||||
break;
|
||||
case SctpTransport::State::Failed:
|
||||
remoteCloseDataChannels();
|
||||
changeState(State::Failed);
|
||||
break;
|
||||
case SctpTransport::State::Disconnected:
|
||||
remoteCloseDataChannels();
|
||||
changeState(State::Disconnected);
|
||||
break;
|
||||
default:
|
||||
@ -253,9 +309,18 @@ void PeerConnection::initSctpTransport() {
|
||||
break;
|
||||
}
|
||||
});
|
||||
std::atomic_store(&mSctpTransport, transport);
|
||||
return transport;
|
||||
}
|
||||
|
||||
void PeerConnection::endLocalCandidates() {
|
||||
std::lock_guard lock(mLocalDescriptionMutex);
|
||||
if (mLocalDescription)
|
||||
mLocalDescription->endCandidates();
|
||||
}
|
||||
|
||||
bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
|
||||
std::lock_guard lock(mRemoteDescriptionMutex);
|
||||
if (auto expectedFingerprint =
|
||||
mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
|
||||
return *expectedFingerprint == fingerprint;
|
||||
@ -264,11 +329,8 @@ bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
|
||||
}
|
||||
|
||||
void PeerConnection::forwardMessage(message_ptr message) {
|
||||
if (!mIceTransport || !mSctpTransport)
|
||||
throw std::logic_error("Got a DataChannel message without transport");
|
||||
|
||||
if (!message) {
|
||||
closeDataChannels();
|
||||
remoteCloseDataChannels();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -281,19 +343,24 @@ void PeerConnection::forwardMessage(message_ptr message) {
|
||||
}
|
||||
}
|
||||
|
||||
auto iceTransport = std::atomic_load(&mIceTransport);
|
||||
auto sctpTransport = std::atomic_load(&mSctpTransport);
|
||||
if (!iceTransport || !sctpTransport)
|
||||
return;
|
||||
|
||||
if (!channel) {
|
||||
const byte dataChannelOpenMessage{0x03};
|
||||
unsigned int remoteParity = (mIceTransport->role() == Description::Role::Active) ? 1 : 0;
|
||||
unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
|
||||
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
|
||||
message->stream % 2 == remoteParity) {
|
||||
channel =
|
||||
std::make_shared<DataChannel>(shared_from_this(), mSctpTransport, message->stream);
|
||||
std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
|
||||
channel->onOpen(std::bind(&PeerConnection::triggerDataChannel, this,
|
||||
weak_ptr<DataChannel>{channel}));
|
||||
mDataChannels.insert(std::make_pair(message->stream, channel));
|
||||
} else {
|
||||
// Invalid, close the DataChannel by resetting the stream
|
||||
mSctpTransport->reset(message->stream);
|
||||
sctpTransport->reset(message->stream);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -330,16 +397,24 @@ void PeerConnection::iterateDataChannels(
|
||||
}
|
||||
|
||||
void PeerConnection::openDataChannels() {
|
||||
iterateDataChannels([this](shared_ptr<DataChannel> channel) { channel->open(mSctpTransport); });
|
||||
if (auto transport = std::atomic_load(&mSctpTransport))
|
||||
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
|
||||
}
|
||||
|
||||
void PeerConnection::closeDataChannels() {
|
||||
iterateDataChannels([](shared_ptr<DataChannel> channel) { channel->close(); });
|
||||
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
|
||||
}
|
||||
|
||||
void PeerConnection::remoteCloseDataChannels() {
|
||||
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
|
||||
}
|
||||
|
||||
void PeerConnection::processLocalDescription(Description description) {
|
||||
auto remoteSctpPort = mRemoteDescription ? mRemoteDescription->sctpPort() : nullopt;
|
||||
std::optional<uint16_t> remoteSctpPort;
|
||||
if (auto remote = remoteDescription())
|
||||
remoteSctpPort = remote->sctpPort();
|
||||
|
||||
std::lock_guard lock(mLocalDescriptionMutex);
|
||||
mLocalDescription.emplace(std::move(description));
|
||||
mLocalDescription->setFingerprint(mCertificate->fingerprint());
|
||||
mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
|
||||
@ -349,6 +424,7 @@ void PeerConnection::processLocalDescription(Description description) {
|
||||
}
|
||||
|
||||
void PeerConnection::processLocalCandidate(Candidate candidate) {
|
||||
std::lock_guard lock(mLocalDescriptionMutex);
|
||||
if (!mLocalDescription)
|
||||
throw std::logic_error("Got a local candidate without local description");
|
||||
|
||||
@ -366,7 +442,14 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
||||
}
|
||||
|
||||
void PeerConnection::changeState(State state) {
|
||||
if (mState.exchange(state) != state)
|
||||
State current;
|
||||
do {
|
||||
current = mState.load();
|
||||
if (current == state || current == State::Destroying)
|
||||
return;
|
||||
} while (!mState.compare_exchange_weak(current, state));
|
||||
|
||||
if (state != State::Destroying)
|
||||
mStateChangeCallback(state);
|
||||
}
|
||||
|
||||
@ -396,6 +479,12 @@ std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &st
|
||||
case State::Failed:
|
||||
str = "failed";
|
||||
break;
|
||||
case State::Closed:
|
||||
str = "closed";
|
||||
break;
|
||||
case State::Destroying:
|
||||
str = "destroying";
|
||||
break;
|
||||
default:
|
||||
str = "unknown";
|
||||
break;
|
||||
|
10
src/rtc.cpp
10
src/rtc.cpp
@ -17,12 +17,15 @@
|
||||
*/
|
||||
|
||||
#include "datachannel.hpp"
|
||||
#include "include.hpp"
|
||||
#include "peerconnection.hpp"
|
||||
|
||||
#include <rtc.h>
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include <plog/Appenders/ColorConsoleAppender.h>
|
||||
|
||||
using namespace rtc;
|
||||
using std::shared_ptr;
|
||||
using std::string;
|
||||
@ -41,6 +44,8 @@ void *getUserPointer(int id) {
|
||||
|
||||
} // namespace
|
||||
|
||||
void rtcInitLogger(rtc_log_level_t level) { InitLogger(static_cast<LogLevel>(level)); }
|
||||
|
||||
int rtcCreatePeerConnection(const char **iceServers, int iceServersCount) {
|
||||
Configuration config;
|
||||
for (int i = 0; i < iceServersCount; ++i) {
|
||||
@ -112,8 +117,8 @@ void rtcSetStateChangeCallback(int pc, void (*stateCallback)(rtc_state_t state,
|
||||
}
|
||||
|
||||
void rtcSetGatheringStateChangeCallback(int pc,
|
||||
void (*gatheringStateCallback)(rtc_gathering_state_t state,
|
||||
void *)) {
|
||||
void (*gatheringStateCallback)(rtc_gathering_state_t state,
|
||||
void *)) {
|
||||
auto it = peerConnectionMap.find(pc);
|
||||
if (it == peerConnectionMap.end())
|
||||
return;
|
||||
@ -209,4 +214,3 @@ void rtcSetUserPointer(int i, void *ptr) {
|
||||
else
|
||||
userPointerMap.erase(i);
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,9 @@
|
||||
|
||||
#include <arpa/inet.h>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace std::chrono;
|
||||
|
||||
using std::shared_ptr;
|
||||
|
||||
namespace rtc {
|
||||
@ -33,15 +36,23 @@ std::mutex SctpTransport::GlobalMutex;
|
||||
int SctpTransport::InstancesCount = 0;
|
||||
|
||||
void SctpTransport::GlobalInit() {
|
||||
std::unique_lock<std::mutex> lock(GlobalMutex);
|
||||
std::lock_guard lock(GlobalMutex);
|
||||
if (InstancesCount++ == 0) {
|
||||
usrsctp_init(0, &SctpTransport::WriteCallback, nullptr);
|
||||
usrsctp_sysctl_set_sctp_ecn_enable(0);
|
||||
usrsctp_sysctl_set_sctp_init_rtx_max_default(5);
|
||||
usrsctp_sysctl_set_sctp_path_rtx_max_default(5);
|
||||
usrsctp_sysctl_set_sctp_assoc_rtx_max_default(5); // single path
|
||||
usrsctp_sysctl_set_sctp_rto_min_default(1 * 1000); // ms
|
||||
usrsctp_sysctl_set_sctp_rto_max_default(10 * 1000); // ms
|
||||
usrsctp_sysctl_set_sctp_rto_initial_default(1 * 1000); // ms
|
||||
usrsctp_sysctl_set_sctp_init_rto_max_default(10 * 1000); // ms
|
||||
usrsctp_sysctl_set_sctp_heartbeat_interval_default(10 * 1000); // ms
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::GlobalCleanup() {
|
||||
std::unique_lock<std::mutex> lock(GlobalMutex);
|
||||
std::lock_guard lock(GlobalMutex);
|
||||
if (--InstancesCount == 0) {
|
||||
usrsctp_finish();
|
||||
}
|
||||
@ -55,6 +66,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
||||
mStateChangeCallback(std::move(stateChangeCallback)), mState(State::Disconnected) {
|
||||
onRecv(recvCallback);
|
||||
|
||||
PLOG_DEBUG << "Initializing SCTP transport";
|
||||
GlobalInit();
|
||||
|
||||
usrsctp_register_address(this);
|
||||
@ -143,12 +155,11 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
||||
}
|
||||
|
||||
SctpTransport::~SctpTransport() {
|
||||
if (mSock) {
|
||||
usrsctp_shutdown(mSock, SHUT_RDWR);
|
||||
usrsctp_close(mSock);
|
||||
}
|
||||
stop();
|
||||
|
||||
usrsctp_close(mSock);
|
||||
usrsctp_deregister_address(this);
|
||||
|
||||
GlobalCleanup();
|
||||
}
|
||||
|
||||
@ -156,18 +167,17 @@ SctpTransport::State SctpTransport::state() const { return mState; }
|
||||
|
||||
void SctpTransport::stop() {
|
||||
Transport::stop();
|
||||
onRecv(nullptr);
|
||||
|
||||
mSendQueue.stop();
|
||||
|
||||
// Unblock incoming
|
||||
if (!mConnectDataSent) {
|
||||
std::unique_lock<std::mutex> lock(mConnectMutex);
|
||||
mConnectDataSent = true;
|
||||
mConnectCondition.notify_all();
|
||||
if (!mShutdown.exchange(true)) {
|
||||
mSendQueue.stop();
|
||||
flush();
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::connect() {
|
||||
PLOG_DEBUG << "SCTP connect";
|
||||
changeState(State::Connecting);
|
||||
|
||||
struct sockaddr_conn sconn = {};
|
||||
@ -189,12 +199,25 @@ void SctpTransport::connect() {
|
||||
throw std::runtime_error("Connection attempt failed, errno=" + std::to_string(errno));
|
||||
}
|
||||
|
||||
bool SctpTransport::send(message_ptr message) {
|
||||
std::lock_guard<std::mutex> lock(mSendMutex);
|
||||
void SctpTransport::shutdown() {
|
||||
PLOG_DEBUG << "SCTP shutdown";
|
||||
|
||||
if (usrsctp_shutdown(mSock, SHUT_RDWR)) {
|
||||
PLOG_WARNING << "SCTP shutdown failed, errno=" << errno;
|
||||
}
|
||||
|
||||
PLOG_INFO << "SCTP disconnected";
|
||||
changeState(State::Disconnected);
|
||||
mWrittenCondition.notify_all();
|
||||
}
|
||||
|
||||
bool SctpTransport::send(message_ptr message) {
|
||||
std::lock_guard lock(mSendMutex);
|
||||
if (!message)
|
||||
return mSendQueue.empty();
|
||||
|
||||
PLOG_VERBOSE << "Send size=" << message->size();
|
||||
|
||||
// If nothing is pending, try to send directly
|
||||
if (mSendQueue.empty() && trySendMessage(message))
|
||||
return true;
|
||||
@ -204,7 +227,16 @@ bool SctpTransport::send(message_ptr message) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void SctpTransport::flush() {
|
||||
std::lock_guard lock(mSendMutex);
|
||||
trySendQueue();
|
||||
}
|
||||
|
||||
void SctpTransport::reset(unsigned int stream) {
|
||||
PLOG_DEBUG << "SCTP resetting stream " << stream;
|
||||
|
||||
std::unique_lock lock(mWriteMutex);
|
||||
mWritten = false;
|
||||
using srs_t = struct sctp_reset_streams;
|
||||
const size_t len = sizeof(srs_t) + sizeof(uint16_t);
|
||||
byte buffer[len] = {};
|
||||
@ -212,25 +244,30 @@ void SctpTransport::reset(unsigned int stream) {
|
||||
srs.srs_flags = SCTP_STREAM_RESET_OUTGOING;
|
||||
srs.srs_number_streams = 1;
|
||||
srs.srs_stream_list[0] = uint16_t(stream);
|
||||
usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len);
|
||||
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RESET_STREAMS, &srs, len) == 0) {
|
||||
mWrittenCondition.wait_for(lock, 1000ms,
|
||||
[&]() { return mWritten || mState != State::Connected; });
|
||||
} else {
|
||||
PLOG_WARNING << "SCTP reset stream " << stream << " failed, errno=" << errno;
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::incoming(message_ptr message) {
|
||||
if (!message) {
|
||||
changeState(State::Disconnected);
|
||||
recv(nullptr);
|
||||
return;
|
||||
}
|
||||
|
||||
// There could be a race condition here where we receive the remote INIT before the local one is
|
||||
// sent, which would result in the connection being aborted. Therefore, we need to wait for data
|
||||
// to be sent on our side (i.e. the local INIT) before proceeding.
|
||||
if (!mConnectDataSent) {
|
||||
std::unique_lock<std::mutex> lock(mConnectMutex);
|
||||
mConnectCondition.wait(lock, [this]() -> bool { return mConnectDataSent; });
|
||||
{
|
||||
std::unique_lock lock(mWriteMutex);
|
||||
mWrittenCondition.wait(lock, [&]() { return mWrittenOnce || mState != State::Connected; });
|
||||
}
|
||||
|
||||
usrsctp_conninput(this, message->data(), message->size(), 0);
|
||||
if (message) {
|
||||
usrsctp_conninput(this, message->data(), message->size(), 0);
|
||||
} else {
|
||||
PLOG_INFO << "SCTP disconnected";
|
||||
changeState(State::Disconnected);
|
||||
recv(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::changeState(State state) {
|
||||
@ -252,7 +289,11 @@ bool SctpTransport::trySendQueue() {
|
||||
|
||||
bool SctpTransport::trySendMessage(message_ptr message) {
|
||||
// Requires mSendMutex to be locked
|
||||
//
|
||||
if (mState != State::Connected)
|
||||
return false;
|
||||
|
||||
PLOG_VERBOSE << "SCTP try send size=" << message->size();
|
||||
|
||||
// TODO: Implement SCTP ndata specification draft when supported everywhere
|
||||
// See https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-08
|
||||
|
||||
@ -284,7 +325,6 @@ bool SctpTransport::trySendMessage(message_ptr message) {
|
||||
if (reliability.unordered)
|
||||
spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
|
||||
|
||||
using std::chrono::milliseconds;
|
||||
switch (reliability.type) {
|
||||
case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT:
|
||||
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
|
||||
@ -310,12 +350,16 @@ bool SctpTransport::trySendMessage(message_ptr message) {
|
||||
ret = usrsctp_sendv(mSock, &zero, 1, nullptr, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0);
|
||||
}
|
||||
|
||||
if (ret >= 0)
|
||||
if (ret >= 0) {
|
||||
PLOG_VERBOSE << "SCTP sent size=" << message->size();
|
||||
return true;
|
||||
else if (errno == EWOULDBLOCK && errno == EAGAIN)
|
||||
} else if (errno == EWOULDBLOCK && errno == EAGAIN) {
|
||||
PLOG_VERBOSE << "SCTP sending not possible";
|
||||
return false;
|
||||
else
|
||||
} else {
|
||||
PLOG_ERROR << "SCTP sending failed, errno=" << errno;
|
||||
throw std::runtime_error("Sending failed, errno=" + std::to_string(errno));
|
||||
}
|
||||
}
|
||||
|
||||
void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
|
||||
@ -331,10 +375,8 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
|
||||
int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, const byte *data,
|
||||
size_t len, struct sctp_rcvinfo info, int flags) {
|
||||
try {
|
||||
if (!data) {
|
||||
recv(nullptr);
|
||||
return 0;
|
||||
}
|
||||
if (!len)
|
||||
return -1;
|
||||
if (flags & MSG_EOR) {
|
||||
if (!mPartialRecv.empty()) {
|
||||
mPartialRecv.insert(mPartialRecv.end(), data, data + len);
|
||||
@ -353,7 +395,7 @@ int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, co
|
||||
mPartialRecv.insert(mPartialRecv.end(), data, data + len);
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "SCTP recv: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "SCTP recv: " << e.what();
|
||||
return -1;
|
||||
}
|
||||
return 0; // success
|
||||
@ -361,10 +403,10 @@ int SctpTransport::handleRecv(struct socket *sock, union sctp_sockstore addr, co
|
||||
|
||||
int SctpTransport::handleSend(size_t free) {
|
||||
try {
|
||||
std::lock_guard<std::mutex> lock(mSendMutex);
|
||||
std::lock_guard lock(mSendMutex);
|
||||
trySendQueue();
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "SCTP send: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "SCTP send: " << e.what();
|
||||
return -1;
|
||||
}
|
||||
return 0; // success
|
||||
@ -372,15 +414,14 @@ int SctpTransport::handleSend(size_t free) {
|
||||
|
||||
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df) {
|
||||
try {
|
||||
outgoing(make_message(data, data + len));
|
||||
|
||||
if (!mConnectDataSent) {
|
||||
std::unique_lock<std::mutex> lock(mConnectMutex);
|
||||
mConnectDataSent = true;
|
||||
mConnectCondition.notify_all();
|
||||
}
|
||||
std::unique_lock lock(mWriteMutex);
|
||||
if (!outgoing(make_message(data, data + len)))
|
||||
return -1;
|
||||
mWritten = true;
|
||||
mWrittenOnce = true;
|
||||
mWrittenCondition.notify_all();
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "SCTP write: " << e.what() << std::endl;
|
||||
PLOG_ERROR << "SCTP write: " << e.what();
|
||||
return -1;
|
||||
}
|
||||
return 0; // success
|
||||
@ -441,7 +482,7 @@ void SctpTransport::processData(const byte *data, size_t len, uint16_t sid, Payl
|
||||
|
||||
default:
|
||||
// Unknown
|
||||
std::cerr << "Unknown PPID: " << uint32_t(ppid) << std::endl;
|
||||
PLOG_WARNING << "Unknown PPID: " << uint32_t(ppid);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -453,36 +494,38 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
|
||||
switch (notify->sn_header.sn_type) {
|
||||
case SCTP_ASSOC_CHANGE: {
|
||||
const struct sctp_assoc_change &assoc_change = notify->sn_assoc_change;
|
||||
std::unique_lock<std::mutex> lock(mConnectMutex);
|
||||
if (assoc_change.sac_state == SCTP_COMM_UP) {
|
||||
PLOG_INFO << "SCTP connected";
|
||||
changeState(State::Connected);
|
||||
} else {
|
||||
if (mState == State::Connecting) {
|
||||
std::cerr << "SCTP connection failed" << std::endl;
|
||||
PLOG_ERROR << "SCTP connection failed";
|
||||
changeState(State::Failed);
|
||||
} else {
|
||||
PLOG_INFO << "SCTP disconnected";
|
||||
changeState(State::Disconnected);
|
||||
}
|
||||
mWrittenCondition.notify_all();
|
||||
}
|
||||
}
|
||||
case SCTP_SENDER_DRY_EVENT: {
|
||||
// It not should be necessary since the send callback should have been called already,
|
||||
// but to be sure, let's try to send now.
|
||||
std::lock_guard<std::mutex> lock(mSendMutex);
|
||||
std::lock_guard lock(mSendMutex);
|
||||
trySendQueue();
|
||||
}
|
||||
case SCTP_STREAM_RESET_EVENT: {
|
||||
const struct sctp_stream_reset_event &reset_event = notify->sn_strreset_event;
|
||||
const int count = (reset_event.strreset_length - sizeof(reset_event)) / sizeof(uint16_t);
|
||||
const uint16_t flags = reset_event.strreset_flags;
|
||||
|
||||
if (reset_event.strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
||||
if (flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
|
||||
for (int i = 0; i < count; ++i) {
|
||||
uint16_t streamId = reset_event.strreset_stream_list[i];
|
||||
reset(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
if (reset_event.strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
|
||||
if (flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
||||
const byte dataChannelCloseMessage{0x04};
|
||||
for (int i = 0; i < count; ++i) {
|
||||
uint16_t streamId = reset_event.strreset_stream_list[i];
|
||||
|
@ -28,7 +28,6 @@
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
@ -52,6 +51,7 @@ public:
|
||||
|
||||
void stop() override;
|
||||
bool send(message_ptr message) override; // false if buffered
|
||||
void flush();
|
||||
void reset(unsigned int stream);
|
||||
|
||||
private:
|
||||
@ -68,7 +68,8 @@ private:
|
||||
};
|
||||
|
||||
void connect();
|
||||
void incoming(message_ptr message);
|
||||
void shutdown();
|
||||
void incoming(message_ptr message) override;
|
||||
void changeState(State state);
|
||||
|
||||
bool trySendQueue();
|
||||
@ -91,10 +92,12 @@ private:
|
||||
std::map<uint16_t, size_t> mBufferedAmount;
|
||||
amount_callback mBufferedAmountCallback;
|
||||
|
||||
std::mutex mConnectMutex;
|
||||
std::condition_variable mConnectCondition;
|
||||
std::atomic<bool> mConnectDataSent = false;
|
||||
std::atomic<bool> mStopping = false;
|
||||
std::recursive_mutex mWriteMutex;
|
||||
std::condition_variable_any mWrittenCondition;
|
||||
bool mWritten = false;
|
||||
bool mWrittenOnce = false;
|
||||
|
||||
std::atomic<bool> mShutdown = false;
|
||||
|
||||
state_callback mStateChangeCallback;
|
||||
std::atomic<State> mState;
|
||||
|
@ -33,12 +33,16 @@ using namespace std::placeholders;
|
||||
class Transport {
|
||||
public:
|
||||
Transport(std::shared_ptr<Transport> lower = nullptr) : mLower(std::move(lower)) {
|
||||
if (auto lower = std::atomic_load(&mLower))
|
||||
lower->onRecv(std::bind(&Transport::incoming, this, _1));
|
||||
if (mLower)
|
||||
mLower->onRecv(std::bind(&Transport::incoming, this, _1));
|
||||
}
|
||||
virtual ~Transport() { stop(); }
|
||||
|
||||
virtual void stop() {
|
||||
if (mLower)
|
||||
mLower->onRecv(nullptr);
|
||||
}
|
||||
virtual ~Transport() {}
|
||||
|
||||
virtual void stop() { resetLower(); }
|
||||
virtual bool send(message_ptr message) = 0;
|
||||
|
||||
void onRecv(message_callback callback) { mRecvCallback = std::move(callback); }
|
||||
@ -46,15 +50,12 @@ public:
|
||||
protected:
|
||||
void recv(message_ptr message) { mRecvCallback(message); }
|
||||
|
||||
void resetLower() {
|
||||
if (auto lower = std::atomic_exchange(&mLower, std::shared_ptr<Transport>(nullptr)))
|
||||
lower->onRecv(nullptr);
|
||||
}
|
||||
|
||||
virtual void incoming(message_ptr message) = 0;
|
||||
virtual void outgoing(message_ptr message) {
|
||||
if (auto lower = std::atomic_load(&mLower))
|
||||
lower->send(message);
|
||||
virtual bool outgoing(message_ptr message) {
|
||||
if (mLower)
|
||||
return mLower->send(message);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -26,26 +26,35 @@
|
||||
using namespace rtc;
|
||||
using namespace std;
|
||||
|
||||
template <class T>
|
||||
weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
|
||||
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
rtc::Configuration config;
|
||||
// InitLogger(LogLevel::Debug);
|
||||
Configuration config;
|
||||
|
||||
// config.iceServers.emplace_back("stun.l.google.com:19302");
|
||||
// config.enableIceTcp = true;
|
||||
|
||||
// Add TURN Server Example
|
||||
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
|
||||
// IceServer::RelayType::TurnTls);
|
||||
// config.iceServers.push_back(turnServer);
|
||||
|
||||
auto pc1 = std::make_shared<PeerConnection>(config);
|
||||
auto pc2 = std::make_shared<PeerConnection>(config);
|
||||
|
||||
pc1->onLocalDescription([wpc2 = make_weak_ptr(pc2)](const Description &sdp) {
|
||||
auto pc2 = wpc2.lock();
|
||||
if (!pc2) return;
|
||||
auto pc2 = wpc2.lock();
|
||||
if (!pc2)
|
||||
return;
|
||||
cout << "Description 1: " << sdp << endl;
|
||||
pc2->setRemoteDescription(sdp);
|
||||
});
|
||||
|
||||
pc1->onLocalCandidate([wpc2 = make_weak_ptr(pc2)](const Candidate &candidate) {
|
||||
auto pc2 = wpc2.lock();
|
||||
if (!pc2) return;
|
||||
auto pc2 = wpc2.lock();
|
||||
if (!pc2)
|
||||
return;
|
||||
cout << "Candidate 1: " << candidate << endl;
|
||||
pc2->addRemoteCandidate(candidate);
|
||||
});
|
||||
@ -56,15 +65,17 @@ int main(int argc, char **argv) {
|
||||
});
|
||||
|
||||
pc2->onLocalDescription([wpc1 = make_weak_ptr(pc1)](const Description &sdp) {
|
||||
auto pc1 = wpc1.lock();
|
||||
if (!pc1) return;
|
||||
auto pc1 = wpc1.lock();
|
||||
if (!pc1)
|
||||
return;
|
||||
cout << "Description 2: " << sdp << endl;
|
||||
pc1->setRemoteDescription(sdp);
|
||||
});
|
||||
|
||||
pc2->onLocalCandidate([wpc1 = make_weak_ptr(pc1)](const Candidate &candidate) {
|
||||
auto pc1 = wpc1.lock();
|
||||
if (!pc1) return;
|
||||
auto pc1 = wpc1.lock();
|
||||
if (!pc1)
|
||||
return;
|
||||
cout << "Candidate 2: " << candidate << endl;
|
||||
pc1->addRemoteCandidate(candidate);
|
||||
});
|
||||
@ -88,8 +99,9 @@ int main(int argc, char **argv) {
|
||||
|
||||
auto dc1 = pc1->createDataChannel("test");
|
||||
dc1->onOpen([wdc1 = make_weak_ptr(dc1)]() {
|
||||
auto dc1 = wdc1.lock();
|
||||
if (!dc1) return;
|
||||
auto dc1 = wdc1.lock();
|
||||
if (!dc1)
|
||||
return;
|
||||
cout << "DataChannel open: " << dc1->label() << endl;
|
||||
dc1->send("Hello from 1");
|
||||
});
|
||||
@ -102,8 +114,8 @@ int main(int argc, char **argv) {
|
||||
this_thread::sleep_for(3s);
|
||||
|
||||
if (dc1->isOpen() && dc2->isOpen()) {
|
||||
dc1->close();
|
||||
dc2->close();
|
||||
pc1->close();
|
||||
pc2->close();
|
||||
|
||||
cout << "Success" << endl;
|
||||
return 0;
|
||||
@ -112,4 +124,3 @@ int main(int argc, char **argv) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
9
test/p2p/README.md
Normal file
9
test/p2p/README.md
Normal file
@ -0,0 +1,9 @@
|
||||
* Execute ```offerer``` app in console
|
||||
* Execute ```answerer``` app in another console
|
||||
* Copy "Local Description" from ```offerer```
|
||||
* Enter 1 to ```answerer```
|
||||
* Paste copied description, press enter
|
||||
* Redo same procedure for ```answerer```
|
||||
* Redo same procedure for candidates
|
||||
* Wait for "DataChannel open" message
|
||||
* Send message from one peer to another
|
143
test/p2p/answerer.cpp
Normal file
143
test/p2p/answerer.cpp
Normal file
@ -0,0 +1,143 @@
|
||||
/**
|
||||
* Copyright (c) 2019 Paul-Louis Ageneau, Murat Dogan
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#include "rtc/rtc.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
|
||||
using namespace rtc;
|
||||
using namespace std;
|
||||
|
||||
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
// InitLogger(LogLevel::Debug);
|
||||
Configuration config;
|
||||
// config.iceServers.emplace_back("stun.l.google.com:19302");
|
||||
// config.enableIceTcp = true;
|
||||
|
||||
// Add TURN Server Example
|
||||
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
|
||||
// IceServer::RelayType::TurnTls);
|
||||
// config.iceServers.push_back(turnServer);
|
||||
|
||||
auto pc = std::make_shared<PeerConnection>(config);
|
||||
|
||||
pc->onLocalDescription([](const Description &sdp) {
|
||||
std::string s(sdp);
|
||||
std::replace(s.begin(), s.end(), '\n', static_cast<char>(94));
|
||||
cout << "Local Description (Paste this to other peer):" << endl << s << endl << endl;
|
||||
});
|
||||
|
||||
pc->onLocalCandidate([](const Candidate &candidate) {
|
||||
cout << "Local Candidate (Paste this to other peer):" << endl << candidate << endl << endl;
|
||||
});
|
||||
|
||||
pc->onStateChange(
|
||||
[](PeerConnection::State state) { cout << "[ State: " << state << " ]" << endl; });
|
||||
pc->onGatheringStateChange([](PeerConnection::GatheringState state) {
|
||||
cout << "[ Gathering State: " << state << " ]" << endl;
|
||||
});
|
||||
|
||||
shared_ptr<DataChannel> dc = nullptr;
|
||||
pc->onDataChannel([&](shared_ptr<DataChannel> _dc) {
|
||||
cout << "[ Got a DataChannel with label: " << _dc->label() << " ]" << endl;
|
||||
dc = _dc;
|
||||
|
||||
dc->onClosed([&]() { cout << "[ DataChannel closed: " << dc->label() << " ]" << endl; });
|
||||
|
||||
dc->onMessage([](const variant<binary, string> &message) {
|
||||
if (holds_alternative<string>(message)) {
|
||||
cout << "[ Received: " << get<string>(message) << " ]" << endl;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
bool exit = false;
|
||||
while (!exit) {
|
||||
cout << endl
|
||||
<< endl
|
||||
<< "*************************************************************************" << endl
|
||||
<< "* 0: Exit /"
|
||||
<< " 1: Enter Description /"
|
||||
<< " 2: Enter Candidate /"
|
||||
<< " 3: Send Message *" << endl
|
||||
<< " [Command]: ";
|
||||
|
||||
int command;
|
||||
std::string sdp, candidate, message;
|
||||
const char *a;
|
||||
std::unique_ptr<Candidate> candidatePtr;
|
||||
std::unique_ptr<Description> descPtr;
|
||||
cin >> command;
|
||||
|
||||
switch (command) {
|
||||
case 0:
|
||||
exit = true;
|
||||
break;
|
||||
|
||||
case 1:
|
||||
// Parse Description
|
||||
cout << "[SDP]: ";
|
||||
sdp = "";
|
||||
while (sdp.length() == 0)
|
||||
getline(cin, sdp);
|
||||
|
||||
std::replace(sdp.begin(), sdp.end(), static_cast<char>(94), '\n');
|
||||
descPtr = std::make_unique<Description>(sdp, Description::Type::Offer,
|
||||
Description::Role::Passive);
|
||||
pc->setRemoteDescription(*descPtr);
|
||||
break;
|
||||
|
||||
case 2:
|
||||
// Parse Candidate
|
||||
cout << "[Candidate]: ";
|
||||
candidate = "";
|
||||
while (candidate.length() == 0)
|
||||
getline(cin, candidate);
|
||||
|
||||
candidatePtr = std::make_unique<Candidate>(candidate);
|
||||
pc->addRemoteCandidate(*candidatePtr);
|
||||
break;
|
||||
|
||||
case 3:
|
||||
// Send Message
|
||||
if (!dc || !dc->isOpen()) {
|
||||
cout << "** Channel is not Open ** ";
|
||||
break;
|
||||
}
|
||||
cout << "[Message]: ";
|
||||
message = "";
|
||||
while (message.length() == 0)
|
||||
getline(cin, message);
|
||||
dc->send(message);
|
||||
break;
|
||||
|
||||
default:
|
||||
cout << "** Invalid Command ** ";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (dc)
|
||||
dc->close();
|
||||
if (pc)
|
||||
pc->close();
|
||||
}
|
140
test/p2p/offerer.cpp
Normal file
140
test/p2p/offerer.cpp
Normal file
@ -0,0 +1,140 @@
|
||||
/**
|
||||
* Copyright (c) 2019 Paul-Louis Ageneau, Murat Dogan
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#include "rtc/rtc.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
|
||||
using namespace rtc;
|
||||
using namespace std;
|
||||
|
||||
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
// InitLogger(LogLevel::Debug);
|
||||
Configuration config;
|
||||
// config.iceServers.emplace_back("stun.l.google.com:19302");
|
||||
// config.enableIceTcp = true;
|
||||
|
||||
// Add TURN Server Example
|
||||
// IceServer turnServer("TURN_SERVER_URL", "PORT_NO", "USERNAME", "PASSWORD",
|
||||
// IceServer::RelayType::TurnTls);
|
||||
// config.iceServers.push_back(turnServer);
|
||||
|
||||
auto pc = std::make_shared<PeerConnection>(config);
|
||||
|
||||
pc->onLocalDescription([](const Description &sdp) {
|
||||
std::string s(sdp);
|
||||
std::replace(s.begin(), s.end(), '\n', static_cast<char>(94));
|
||||
cout << "Local Description (Paste this to other peer):" << endl << s << endl << endl;
|
||||
});
|
||||
|
||||
pc->onLocalCandidate([](const Candidate &candidate) {
|
||||
cout << "Local Candidate (Paste this to other peer):" << endl << candidate << endl << endl;
|
||||
});
|
||||
|
||||
pc->onStateChange(
|
||||
[](PeerConnection::State state) { cout << "[ State: " << state << " ]" << endl; });
|
||||
|
||||
pc->onGatheringStateChange([](PeerConnection::GatheringState state) {
|
||||
cout << "[ Gathering State: " << state << " ]" << endl;
|
||||
});
|
||||
|
||||
auto dc = pc->createDataChannel("test");
|
||||
dc->onOpen([&]() { cout << "[ DataChannel open: " << dc->label() << " ]" << endl; });
|
||||
|
||||
dc->onClosed([&]() { cout << "[ DataChannel closed: " << dc->label() << " ]" << endl; });
|
||||
|
||||
dc->onMessage([](const variant<binary, string> &message) {
|
||||
if (holds_alternative<string>(message)) {
|
||||
cout << "[ Received: " << get<string>(message) << " ]" << endl;
|
||||
}
|
||||
});
|
||||
|
||||
bool exit = false;
|
||||
while (!exit) {
|
||||
cout << endl
|
||||
<< endl
|
||||
<< "*************************************************************************" << endl
|
||||
<< "* 0: Exit /"
|
||||
<< " 1: Enter Description /"
|
||||
<< " 2: Enter Candidate /"
|
||||
<< " 3: Send Message *" << endl
|
||||
<< " [Command]: ";
|
||||
|
||||
int command;
|
||||
std::string sdp, candidate, message;
|
||||
const char *a;
|
||||
std::unique_ptr<Candidate> candidatePtr;
|
||||
std::unique_ptr<Description> descPtr;
|
||||
cin >> command;
|
||||
|
||||
switch (command) {
|
||||
case 0:
|
||||
exit = true;
|
||||
break;
|
||||
|
||||
case 1:
|
||||
// Parse Description
|
||||
cout << "[SDP]: ";
|
||||
sdp = "";
|
||||
while (sdp.length() == 0)
|
||||
getline(cin, sdp);
|
||||
|
||||
std::replace(sdp.begin(), sdp.end(), static_cast<char>(94), '\n');
|
||||
descPtr = std::make_unique<Description>(sdp);
|
||||
pc->setRemoteDescription(*descPtr);
|
||||
break;
|
||||
|
||||
case 2:
|
||||
// Parse Candidate
|
||||
cout << "[Candidate]: ";
|
||||
candidate = "";
|
||||
while (candidate.length() == 0)
|
||||
getline(cin, candidate);
|
||||
|
||||
candidatePtr = std::make_unique<Candidate>(candidate);
|
||||
pc->addRemoteCandidate(*candidatePtr);
|
||||
break;
|
||||
|
||||
case 3:
|
||||
// Send Message
|
||||
if (!dc->isOpen()) {
|
||||
cout << "** Channel is not Open ** ";
|
||||
break;
|
||||
}
|
||||
cout << "[Message]: ";
|
||||
message = "";
|
||||
while (message.length() == 0)
|
||||
getline(cin, message);
|
||||
dc->send(message);
|
||||
break;
|
||||
|
||||
default:
|
||||
cout << "** Invalid Command ** ";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (dc)
|
||||
dc->close();
|
||||
if (pc)
|
||||
pc->close();
|
||||
}
|
Reference in New Issue
Block a user