Compare commits

...

35 Commits

Author SHA1 Message Date
3f67a10720 Bumped version to 0.8.0 2020-08-10 14:17:22 +02:00
c0ea85025b Added hostname to web example server 2020-08-10 14:14:14 +02:00
b44f90b87f Added cmake flag for warnings as errors 2020-08-10 14:04:45 +02:00
498e7246a0 Fixed compilation on MSVC 2020-08-10 11:36:41 +02:00
8b1a67760c Fixed compilation with libnice 2020-08-10 11:18:20 +02:00
381d0ead1a Fixed compilation on MacOS 2020-08-10 11:06:15 +02:00
d1f77ebb10 Set warnings as errors 2020-08-10 11:01:22 +02:00
6223967bca Merge pull request #138 from paullouisageneau/reliability-c-api
Add Data Channel reliability to C API
2020-08-10 10:24:17 +02:00
e2c42ff73b Fixed unordered flag and renamed reliability type for consistency 2020-08-10 10:13:03 +02:00
980ee303c8 Fixed conversion for maxRetransmits 2020-08-08 22:10:36 +02:00
2967444678 Added Data Channel reliability to C API 2020-08-08 22:09:13 +02:00
35cad4e916 Added link to murat-dogan/node-datachannel following #122 2020-07-31 15:33:06 +02:00
ccc4d61fd3 Added Android explicitly following #44 2020-07-31 15:33:06 +02:00
aac9b101d8 Merge pull request #135 from murat-dogan/master
add NO_TESTS option
2020-07-28 13:36:13 +02:00
8d9eeda6e5 Update CMakeLists.txt 2020-07-28 13:46:55 +03:00
5eaee49e1e add NO_TESTS option 2020-07-28 10:59:55 +03:00
68fd331a9c Fixed setLocalDescription() so it doesn't require a fingerprint 2020-07-26 18:46:03 +02:00
0bb246785b Updated libjuice to v0.4.5 2020-07-26 12:55:07 +02:00
0fbdde73e7 Bumped version to 0.7.2 2020-07-25 15:08:08 +02:00
98ea6102b5 Added some verbose logging 2020-07-25 15:05:31 +02:00
ff702139e4 Merge pull request #133 from paullouisageneau/usrsctp-mitigation-fix
Extend mitigation for usrsctp send after unregistering
2020-07-25 12:53:07 +00:00
0593566ba6 Free data when instance is invalid on recv callback 2020-07-24 22:12:57 +02:00
b325100a7a Enforce usrsctp instance pointer check on every callback 2020-07-24 18:38:50 +02:00
e675ada081 Bumped version to 0.7.1 2020-07-23 22:32:18 +02:00
2f8d06db81 Fixed compilation on MSVC 2020-07-23 20:11:31 +02:00
e76d933de2 Safer callback reset strategy for PeerConnection 2020-07-23 19:52:07 +02:00
7090f2344b Merge pull request #132 from paullouisageneau/prevent-user-deadlock
Close data channels  asynchronously for safety
2020-07-23 10:23:30 +00:00
bd06ccbc83 Close data channels on processor to prevent deadlock on user re-calling close() 2020-07-23 11:47:20 +02:00
e2a2040d94 Merge pull request #130 from paullouisageneau/prevent-copy-sctp
Prevent data copy in SCTP transport
2020-07-22 16:13:41 +00:00
b6ffa13b72 Prevent data copy in SCTP transport 2020-07-22 18:02:00 +02:00
726b4c4c33 Catch exceptions in transport callbacks for safety 2020-07-22 17:15:49 +02:00
9b4c96ee18 Updated libjuice 2020-07-22 16:08:45 +02:00
faa03ce100 Fixed typo breaking bytes received for deprecated PPID_BINARY_PARTIAL 2020-07-22 16:07:54 +02:00
fa931aba64 Cosmetic fix for external resources links 2020-07-20 14:15:34 +02:00
1fb0d8923b Added link to datachannel-wasm 2020-07-20 14:14:15 +02:00
21 changed files with 307 additions and 140 deletions

View File

@ -16,7 +16,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -DUSE_GNUTLS=1 run: cmake -B build -DUSE_GNUTLS=1 -DWARNINGS_AS_ERRORS=1
- name: make - name: make
run: (cd build; make -j2) run: (cd build; make -j2)
- name: test - name: test
@ -30,7 +30,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -DUSE_GNUTLS=1 run: cmake -B build -DUSE_GNUTLS=1 -DWARNINGS_AS_ERRORS=1
- name: make - name: make
run: (cd build; make -j2) run: (cd build; make -j2)
- name: test - name: test

View File

@ -16,7 +16,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -DUSE_GNUTLS=1 -DUSE_NICE=1 run: cmake -B build -DUSE_GNUTLS=1 -DUSE_NICE=1 -DWARNINGS_AS_ERRORS=1
- name: make - name: make
run: (cd build; make -j2) run: (cd build; make -j2)
- name: test - name: test

View File

@ -16,7 +16,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -DUSE_GNUTLS=0 run: cmake -B build -DUSE_GNUTLS=0 -DWARNINGS_AS_ERRORS=1
- name: make - name: make
run: (cd build; make -j2) run: (cd build; make -j2)
- name: test - name: test
@ -30,7 +30,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -DUSE_GNUTLS=0 run: cmake -B build -DUSE_GNUTLS=0 -WARNINGS_AS_ERRORS=1
env: env:
OPENSSL_ROOT_DIR: /usr/local/opt/openssl OPENSSL_ROOT_DIR: /usr/local/opt/openssl
OPENSSL_LIBRARIES: /usr/local/opt/openssl/lib OPENSSL_LIBRARIES: /usr/local/opt/openssl/lib
@ -48,7 +48,7 @@ jobs:
- name: submodules - name: submodules
run: git submodule update --init --recursive run: git submodule update --init --recursive
- name: cmake - name: cmake
run: cmake -B build -G "NMake Makefiles" -DUSE_GNUTLS=0 run: cmake -B build -G "NMake Makefiles" -DUSE_GNUTLS=0 -WARNINGS_AS_ERRORS=1
- name: nmake - name: nmake
run: | run: |
cd build cd build

View File

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
DESCRIPTION "WebRTC Data Channels Library" DESCRIPTION "WebRTC Data Channels Library"
VERSION 0.7.0 VERSION 0.8.0
LANGUAGES CXX) LANGUAGES CXX)
# Options # Options
@ -10,6 +10,8 @@ option(USE_NICE "Use libnice instead of libjuice" OFF)
option(USE_SRTP "Enable SRTP for media support" OFF) option(USE_SRTP "Enable SRTP for media support" OFF)
option(NO_WEBSOCKET "Disable WebSocket support" OFF) option(NO_WEBSOCKET "Disable WebSocket support" OFF)
option(NO_EXAMPLES "Disable examples" OFF) option(NO_EXAMPLES "Disable examples" OFF)
option(NO_TESTS "Disable tests build" OFF)
option(WARNINGS_AS_ERRORS "Treat warnings as errors" OFF)
if(USE_NICE) if(USE_NICE)
option(USE_JUICE "Use libjuice" OFF) option(USE_JUICE "Use libjuice" OFF)
@ -213,7 +215,23 @@ add_library(LibDataChannel::LibDataChannelStatic ALIAS datachannel-static)
install(TARGETS datachannel LIBRARY DESTINATION lib) install(TARGETS datachannel LIBRARY DESTINATION lib)
install(FILES ${LIBDATACHANNEL_HEADERS} DESTINATION include/rtc) install(FILES ${LIBDATACHANNEL_HEADERS} DESTINATION include/rtc)
if(NOT MSVC)
target_compile_options(datachannel PRIVATE -Wall -Wextra)
target_compile_options(datachannel-static PRIVATE -Wall -Wextra)
endif()
if(WARNINGS_AS_ERRORS)
if(MSVC)
target_compile_options(datachannel PRIVATE /WX)
target_compile_options(datachannel-static PRIVATE /WX)
else()
target_compile_options(datachannel PRIVATE -Werror)
target_compile_options(datachannel-static PRIVATE -Werror)
endif()
endif()
# Tests # Tests
if(NOT NO_TESTS)
add_executable(datachannel-tests ${TESTS_SOURCES}) add_executable(datachannel-tests ${TESTS_SOURCES})
set_target_properties(datachannel-tests PROPERTIES set_target_properties(datachannel-tests PROPERTIES
VERSION ${PROJECT_VERSION} VERSION ${PROJECT_VERSION}
@ -239,6 +257,7 @@ if(WIN32)
else() else()
target_link_libraries(datachannel-benchmark datachannel) target_link_libraries(datachannel-benchmark datachannel)
endif() endif()
endif()
# Examples # Examples
if(NOT NO_EXAMPLES) if(NOT NO_EXAMPLES)

View File

@ -1,6 +1,6 @@
# libdatachannel - C/C++ WebRTC Data Channels # libdatachannel - C/C++ WebRTC Data Channels
libdatachannel is a standalone implementation of WebRTC Data Channels and WebSockets in C++17 with C bindings for POSIX platforms (including Linux and Apple macOS) and Microsoft Windows. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. The interface consists of simplified versions of the JavaScript WebRTC and WebSocket APIs present in browsers, in order to ease the design of cross-environment applications. libdatachannel is a standalone implementation of WebRTC Data Channels and WebSockets in C++17 with C bindings for POSIX platforms (including GNU/Linux, Android, and Apple macOS) and Microsoft Windows. It enables direct connectivity between native applications and web browsers without the pain of importing the entire WebRTC stack. The interface consists of simplified versions of the JavaScript WebRTC and WebSocket APIs present in browsers, in order to ease the design of cross-environment applications.
It can be compiled with multiple backends: It can be compiled with multiple backends:
- The security layer can be provided through [OpenSSL](https://www.openssl.org/) or [GnuTLS](https://www.gnutls.org/). - The security layer can be provided through [OpenSSL](https://www.openssl.org/) or [GnuTLS](https://www.gnutls.org/).
- The connectivity for WebRTC can be provided through my ad-hoc ICE library [libjuice](https://github.com/paullouisageneau/libjuice) as submodule or through [libnice](https://github.com/libnice/libnice). - The connectivity for WebRTC can be provided through my ad-hoc ICE library [libjuice](https://github.com/paullouisageneau/libjuice) as submodule or through [libnice](https://github.com/libnice/libnice).
@ -197,5 +197,7 @@ ws->open("wss://my.websocket/service");
``` ```
## External resources ## External resources
- Rust wrappers for libdatachannel: [lerouxrgd/datachannel-rs](https://github.com/lerouxrgd/datachannel-rs) - Rust wrapper for libdatachannel: [datachannel-rs](https://github.com/lerouxrgd/datachannel-rs)
- Node.js wrapper for libdatachannel: [node-datachannel](https://github.com/murat-dogan/node-datachannel)
- WebAssembly wrapper compatible with libdatachannel: [datachannel-wasm](https://github.com/paullouisageneau/datachannel-wasm)

2
deps/libjuice vendored

View File

@ -98,5 +98,10 @@ wsServer.on('request', (req) => {
clients[id] = conn; clients[id] = conn;
}); });
httpServer.listen(8000); const hostname = '127.0.0.1';
const port = 8000;
httpServer.listen(port, hostname, () => {
console.log(`Server listening on ${hostname}:${port}`);
});

View File

@ -84,9 +84,24 @@ template <typename F, typename T, typename... Args> auto weak_bind(F &&f, T *t,
template <typename... P> class synchronized_callback { template <typename... P> class synchronized_callback {
public: public:
synchronized_callback() = default; synchronized_callback() = default;
synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); }
synchronized_callback(const synchronized_callback &cb) { *this = cb; }
synchronized_callback(std::function<void(P...)> func) { *this = std::move(func); } synchronized_callback(std::function<void(P...)> func) { *this = std::move(func); }
~synchronized_callback() { *this = nullptr; } ~synchronized_callback() { *this = nullptr; }
synchronized_callback &operator=(synchronized_callback &&cb) {
std::scoped_lock lock(mutex, cb.mutex);
callback = std::move(cb.callback);
cb.callback = nullptr;
return *this;
}
synchronized_callback &operator=(const synchronized_callback &cb) {
std::scoped_lock lock(mutex, cb.mutex);
callback = cb.callback;
return *this;
}
synchronized_callback &operator=(std::function<void(P...)> func) { synchronized_callback &operator=(std::function<void(P...)> func) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
callback = std::move(func); callback = std::move(func);

View File

@ -37,6 +37,8 @@ struct Message : binary {
Message(Iterator begin_, Iterator end_, Type type_ = Binary) Message(Iterator begin_, Iterator end_, Type type_ = Binary)
: binary(begin_, end_), type(type_) {} : binary(begin_, end_), type(type_) {}
Message(binary &&data, Type type_ = Binary) : binary(std::move(data)), type(type_) {}
Type type; Type type;
unsigned int stream = 0; unsigned int stream = 0;
std::shared_ptr<Reliability> reliability; std::shared_ptr<Reliability> reliability;
@ -68,6 +70,15 @@ inline message_ptr make_message(size_t size, Message::Type type = Message::Binar
return message; return message;
} }
inline message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
unsigned int stream = 0,
std::shared_ptr<Reliability> reliability = nullptr) {
auto message = std::make_shared<Message>(std::move(data), type);
message->stream = stream;
message->reliability = reliability;
return message;
}
} // namespace rtc } // namespace rtc
#endif #endif

View File

@ -27,13 +27,9 @@
namespace rtc { namespace rtc {
struct Reliability { struct Reliability {
enum Type : uint8_t { enum class Type { Reliable = 0, Rexmit, Timed };
TYPE_RELIABLE = 0x00,
TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
};
Type type = TYPE_RELIABLE; Type type = Type::Reliable;
bool unordered = false; bool unordered = false;
std::variant<int, std::chrono::milliseconds> rexmit = 0; std::variant<int, std::chrono::milliseconds> rexmit = 0;
}; };

View File

@ -78,6 +78,13 @@ typedef struct {
uint16_t portRangeEnd; uint16_t portRangeEnd;
} rtcConfiguration; } rtcConfiguration;
typedef struct {
bool unordered;
bool unreliable;
unsigned int maxPacketLifeTime; // ignored if reliable
unsigned int maxRetransmits; // ignored if reliable
} rtcReliability;
typedef void (*rtcLogCallbackFunc)(rtcLogLevel level, const char *message); typedef void (*rtcLogCallbackFunc)(rtcLogLevel level, const char *message);
typedef void (*rtcDataChannelCallbackFunc)(int dc, void *ptr); typedef void (*rtcDataChannelCallbackFunc)(int dc, void *ptr);
typedef void (*rtcDescriptionCallbackFunc)(const char *sdp, const char *type, void *ptr); typedef void (*rtcDescriptionCallbackFunc)(const char *sdp, const char *type, void *ptr);
@ -115,9 +122,13 @@ RTC_EXPORT int rtcGetRemoteAddress(int pc, char *buffer, int size);
// DataChannel // DataChannel
RTC_EXPORT int rtcCreateDataChannel(int pc, const char *label); // returns dc id RTC_EXPORT int rtcCreateDataChannel(int pc, const char *label); // returns dc id
RTC_EXPORT int rtcCreateDataChannelExt(int pc, const char *label, const char *protocol,
const rtcReliability *reliability); // returns dc id
RTC_EXPORT int rtcDeleteDataChannel(int dc); RTC_EXPORT int rtcDeleteDataChannel(int dc);
RTC_EXPORT int rtcGetDataChannelLabel(int dc, char *buffer, int size); RTC_EXPORT int rtcGetDataChannelLabel(int dc, char *buffer, int size);
RTC_EXPORT int rtcGetDataChannelProtocol(int dc, char *buffer, int size);
RTC_EXPORT int rtcGetDataChannelReliability(int dc, rtcReliability *reliability);
// WebSocket // WebSocket
#if RTC_ENABLE_WEBSOCKET #if RTC_ENABLE_WEBSOCKET

View File

@ -31,6 +31,7 @@ namespace rtc {
using std::shared_ptr; using std::shared_ptr;
using std::weak_ptr; using std::weak_ptr;
using std::chrono::milliseconds;
// Messages for the DataChannel establishment protocol // Messages for the DataChannel establishment protocol
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09 // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09
@ -43,6 +44,12 @@ enum MessageType : uint8_t {
MESSAGE_CLOSE = 0x04 MESSAGE_CLOSE = 0x04
}; };
enum ChannelType : uint8_t {
CHANNEL_RELIABLE = 0x00,
CHANNEL_PARTIAL_RELIABLE_REXMIT = 0x01,
CHANNEL_PARTIAL_RELIABLE_TIMED = 0x02
};
#pragma pack(push, 1) #pragma pack(push, 1)
struct OpenMessage { struct OpenMessage {
uint8_t type = MESSAGE_OPEN; uint8_t type = MESSAGE_OPEN;
@ -168,22 +175,33 @@ size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
void DataChannel::open(shared_ptr<SctpTransport> transport) { void DataChannel::open(shared_ptr<SctpTransport> transport) {
mSctpTransport = transport; mSctpTransport = transport;
uint8_t channelType = static_cast<uint8_t>(mReliability->type); uint8_t channelType;
if (mReliability->unordered) uint32_t reliabilityParameter;
channelType &= 0x80; switch (mReliability->type) {
case Reliability::Type::Rexmit:
using std::chrono::milliseconds; channelType = CHANNEL_PARTIAL_RELIABLE_REXMIT;
uint32_t reliabilityParameter = 0;
if (mReliability->type == Reliability::TYPE_PARTIAL_RELIABLE_REXMIT)
reliabilityParameter = uint32_t(std::get<int>(mReliability->rexmit)); reliabilityParameter = uint32_t(std::get<int>(mReliability->rexmit));
else if (mReliability->type == Reliability::TYPE_PARTIAL_RELIABLE_TIMED) break;
case Reliability::Type::Timed:
channelType = CHANNEL_PARTIAL_RELIABLE_TIMED;
reliabilityParameter = uint32_t(std::get<milliseconds>(mReliability->rexmit).count()); reliabilityParameter = uint32_t(std::get<milliseconds>(mReliability->rexmit).count());
break;
default:
channelType = CHANNEL_RELIABLE;
reliabilityParameter = 0;
break;
}
if (mReliability->unordered)
channelType |= 0x80;
const size_t len = sizeof(OpenMessage) + mLabel.size() + mProtocol.size(); const size_t len = sizeof(OpenMessage) + mLabel.size() + mProtocol.size();
binary buffer(len, byte(0)); binary buffer(len, byte(0));
auto &open = *reinterpret_cast<OpenMessage *>(buffer.data()); auto &open = *reinterpret_cast<OpenMessage *>(buffer.data());
open.type = MESSAGE_OPEN; open.type = MESSAGE_OPEN;
open.channelType = mReliability->type; open.channelType = channelType;
open.priority = htons(0); open.priority = htons(0);
open.reliabilityParameter = htonl(reliabilityParameter); open.reliabilityParameter = htonl(reliabilityParameter);
open.labelLength = htons(uint16_t(mLabel.size())); open.labelLength = htons(uint16_t(mLabel.size()));
@ -272,19 +290,18 @@ void DataChannel::processOpenMessage(message_ptr message) {
mLabel.assign(end, open.labelLength); mLabel.assign(end, open.labelLength);
mProtocol.assign(end + open.labelLength, open.protocolLength); mProtocol.assign(end + open.labelLength, open.protocolLength);
using std::chrono::milliseconds;
mReliability->unordered = (open.reliabilityParameter & 0x80) != 0; mReliability->unordered = (open.reliabilityParameter & 0x80) != 0;
switch (open.channelType & 0x7F) { switch (open.channelType & 0x7F) {
case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT: case CHANNEL_PARTIAL_RELIABLE_REXMIT:
mReliability->type = Reliability::TYPE_PARTIAL_RELIABLE_REXMIT; mReliability->type = Reliability::Type::Rexmit;
mReliability->rexmit = int(open.reliabilityParameter); mReliability->rexmit = int(open.reliabilityParameter);
break; break;
case Reliability::TYPE_PARTIAL_RELIABLE_TIMED: case CHANNEL_PARTIAL_RELIABLE_TIMED:
mReliability->type = Reliability::TYPE_PARTIAL_RELIABLE_TIMED; mReliability->type = Reliability::Type::Timed;
mReliability->rexmit = milliseconds(open.reliabilityParameter); mReliability->rexmit = milliseconds(open.reliabilityParameter);
break; break;
default: default:
mReliability->type = Reliability::TYPE_RELIABLE; mReliability->type = Reliability::Type::Reliable;
mReliability->rexmit = int(0); mReliability->rexmit = int(0);
} }

View File

@ -622,7 +622,8 @@ void IceTransport::CandidateCallback(NiceAgent *agent, NiceCandidate *candidate,
g_free(cand); g_free(cand);
} }
void IceTransport::GatheringDoneCallback(NiceAgent *agent, guint streamId, gpointer userData) { void IceTransport::GatheringDoneCallback(NiceAgent * /*agent*/, guint /*streamId*/,
gpointer userData) {
auto iceTransport = static_cast<rtc::IceTransport *>(userData); auto iceTransport = static_cast<rtc::IceTransport *>(userData);
try { try {
iceTransport->processGatheringDone(); iceTransport->processGatheringDone();
@ -631,8 +632,8 @@ void IceTransport::GatheringDoneCallback(NiceAgent *agent, guint streamId, gpoin
} }
} }
void IceTransport::StateChangeCallback(NiceAgent *agent, guint streamId, guint componentId, void IceTransport::StateChangeCallback(NiceAgent * /*agent*/, guint /*streamId*/,
guint state, gpointer userData) { guint /*componentId*/, guint state, gpointer userData) {
auto iceTransport = static_cast<rtc::IceTransport *>(userData); auto iceTransport = static_cast<rtc::IceTransport *>(userData);
try { try {
iceTransport->processStateChange(state); iceTransport->processStateChange(state);
@ -641,8 +642,8 @@ void IceTransport::StateChangeCallback(NiceAgent *agent, guint streamId, guint c
} }
} }
void IceTransport::RecvCallback(NiceAgent *agent, guint streamId, guint componentId, guint len, void IceTransport::RecvCallback(NiceAgent * /*agent*/, guint /*streamId*/, guint /*componentId*/,
gchar *buf, gpointer userData) { guint len, gchar *buf, gpointer userData) {
auto iceTransport = static_cast<rtc::IceTransport *>(userData); auto iceTransport = static_cast<rtc::IceTransport *>(userData);
try { try {
PLOG_VERBOSE << "Incoming size=" << len; PLOG_VERBOSE << "Incoming size=" << len;
@ -663,8 +664,8 @@ gboolean IceTransport::TimeoutCallback(gpointer userData) {
return FALSE; return FALSE;
} }
void IceTransport::LogCallback(const gchar *logDomain, GLogLevelFlags logLevel, void IceTransport::LogCallback(const gchar * /*logDomain*/, GLogLevelFlags logLevel,
const gchar *message, gpointer userData) { const gchar *message, gpointer /*userData*/) {
plog::Severity severity; plog::Severity severity;
unsigned int flags = logLevel & G_LOG_LEVEL_MASK; unsigned int flags = logLevel & G_LOG_LEVEL_MASK;
if (flags & G_LOG_LEVEL_ERROR) if (flags & G_LOG_LEVEL_ERROR)
@ -708,7 +709,7 @@ bool IceTransport::getSelectedCandidatePair(CandidateInfo *localInfo, CandidateI
return true; return true;
} }
const CandidateType IceTransport::NiceTypeToCandidateType(NiceCandidateType type) { CandidateType IceTransport::NiceTypeToCandidateType(NiceCandidateType type) {
switch (type) { switch (type) {
case NiceCandidateType::NICE_CANDIDATE_TYPE_PEER_REFLEXIVE: case NiceCandidateType::NICE_CANDIDATE_TYPE_PEER_REFLEXIVE:
return CandidateType::PeerReflexive; return CandidateType::PeerReflexive;
@ -721,7 +722,7 @@ const CandidateType IceTransport::NiceTypeToCandidateType(NiceCandidateType type
} }
} }
const CandidateTransportType CandidateTransportType
IceTransport::NiceTransportTypeToCandidateTransportType(NiceCandidateTransport type) { IceTransport::NiceTransportTypeToCandidateTransportType(NiceCandidateTransport type) {
switch (type) { switch (type) {
case NiceCandidateTransport::NICE_CANDIDATE_TRANSPORT_TCP_ACTIVE: case NiceCandidateTransport::NICE_CANDIDATE_TRANSPORT_TCP_ACTIVE:

View File

@ -113,8 +113,9 @@ private:
static gboolean TimeoutCallback(gpointer userData); static gboolean TimeoutCallback(gpointer userData);
static void LogCallback(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, static void LogCallback(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message,
gpointer user_data); gpointer user_data);
static const CandidateType NiceTypeToCandidateType(NiceCandidateType type); static CandidateType NiceTypeToCandidateType(NiceCandidateType type);
static const CandidateTransportType NiceTransportTypeToCandidateTransportType(NiceCandidateTransport type); static CandidateTransportType
NiceTransportTypeToCandidateTransportType(NiceCandidateTransport type);
#endif #endif
}; };

View File

@ -51,13 +51,17 @@ PeerConnection::PeerConnection(const Configuration &config)
} }
PeerConnection::~PeerConnection() { PeerConnection::~PeerConnection() {
close();
PLOG_VERBOSE << "Destroying PeerConnection"; PLOG_VERBOSE << "Destroying PeerConnection";
close();
mProcessor->join();
} }
void PeerConnection::close() { void PeerConnection::close() {
PLOG_VERBOSE << "Closing PeerConnection"; PLOG_VERBOSE << "Closing PeerConnection";
closeDataChannels();
// Close data channels asynchronously
mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
closeTransports(); closeTransports();
} }
@ -78,6 +82,8 @@ std::optional<Description> PeerConnection::remoteDescription() const {
} }
void PeerConnection::setLocalDescription(std::optional<Description> description) { void PeerConnection::setLocalDescription(std::optional<Description> description) {
PLOG_VERBOSE << "Setting local description";
if (auto iceTransport = std::atomic_load(&mIceTransport)) { if (auto iceTransport = std::atomic_load(&mIceTransport)) {
throw std::logic_error("Local description is already set"); throw std::logic_error("Local description is already set");
} else { } else {
@ -94,6 +100,8 @@ void PeerConnection::setLocalDescription(std::optional<Description> description)
} }
void PeerConnection::setRemoteDescription(Description description) { void PeerConnection::setRemoteDescription(Description description) {
PLOG_VERBOSE << "Setting remote description: " << string(description);
description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer); description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
auto type = description.type(); auto type = description.type();
auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
@ -139,6 +147,7 @@ void PeerConnection::setRemoteDescription(Description description) {
} }
void PeerConnection::addRemoteCandidate(Candidate candidate) { void PeerConnection::addRemoteCandidate(Candidate candidate) {
PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
auto iceTransport = std::atomic_load(&mIceTransport); auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport) if (!mRemoteDescription || !iceTransport)
@ -239,7 +248,7 @@ void PeerConnection::onMedia(std::function<void(const binary &packet)> callback)
mMediaCallback = callback; mMediaCallback = callback;
} }
void PeerConnection::outgoingMedia(message_ptr message) { void PeerConnection::outgoingMedia([[maybe_unused]] message_ptr message) {
if (!hasMedia()) if (!hasMedia())
throw std::runtime_error("PeerConnection has no media support"); throw std::runtime_error("PeerConnection has no media support");
@ -439,13 +448,15 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
void PeerConnection::closeTransports() { void PeerConnection::closeTransports() {
PLOG_VERBOSE << "Closing transports"; PLOG_VERBOSE << "Closing transports";
// Change state to sink state Closed to block init methods // Change state to sink state Closed
changeState(State::Closed); changeState(State::Closed);
// Reset callbacks now that state is changed // Reset callbacks now that state is changed
resetCallbacks(); resetCallbacks();
// Pass the references to a thread, allowing to terminate a transport from its own thread // Initiate transport stop on the processor after closing the data channels
mProcessor->enqueue([this]() {
// Pass the pointers to a thread
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr)); auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr)); auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr)); auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
@ -461,6 +472,7 @@ void PeerConnection::closeTransports() {
dtls.reset(); dtls.reset();
ice.reset(); ice.reset();
}); });
});
} }
void PeerConnection::endLocalCandidates() { void PeerConnection::endLocalCandidates() {
@ -648,7 +660,12 @@ bool PeerConnection::changeState(State state) {
} while (!mState.compare_exchange_weak(current, state)); } while (!mState.compare_exchange_weak(current, state));
if (state == State::Closed)
// This is the last state change, so we may steal the callback
mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
else
mProcessor->enqueue([this, state]() { mStateChangeCallback(state); }); mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
return true; return true;
} }

View File

@ -29,6 +29,7 @@
#include "plog/Formatters/FuncMessageFormatter.h" #include "plog/Formatters/FuncMessageFormatter.h"
#include <chrono>
#include <exception> #include <exception>
#include <mutex> #include <mutex>
#include <type_traits> #include <type_traits>
@ -43,6 +44,7 @@
using namespace rtc; using namespace rtc;
using std::shared_ptr; using std::shared_ptr;
using std::string; using std::string;
using std::chrono::milliseconds;
namespace { namespace {
@ -241,9 +243,30 @@ int rtcDeletePeerConnection(int pc) {
} }
int rtcCreateDataChannel(int pc, const char *label) { int rtcCreateDataChannel(int pc, const char *label) {
return rtcCreateDataChannelExt(pc, label, nullptr, nullptr);
}
int rtcCreateDataChannelExt(int pc, const char *label, const char *protocol,
const rtcReliability *reliability) {
return WRAP({ return WRAP({
Reliability r = {};
if (reliability) {
r.unordered = reliability->unordered;
if (reliability->unreliable) {
if (reliability->maxPacketLifeTime > 0) {
r.type = Reliability::Type::Timed;
r.rexmit = milliseconds(reliability->maxPacketLifeTime);
} else {
r.type = Reliability::Type::Rexmit;
r.rexmit = int(reliability->maxRetransmits);
}
} else {
r.type = Reliability::Type::Reliable;
}
}
auto peerConnection = getPeerConnection(pc); auto peerConnection = getPeerConnection(pc);
int dc = emplaceDataChannel(peerConnection->createDataChannel(string(label))); int dc = emplaceDataChannel(peerConnection->createDataChannel(
string(label ? label : ""), string(protocol ? protocol : ""), r));
if (auto ptr = getUserPointer(pc)) if (auto ptr = getUserPointer(pc))
rtcSetUserPointer(dc, *ptr); rtcSetUserPointer(dc, *ptr);
return dc; return dc;
@ -447,6 +470,48 @@ int rtcGetDataChannelLabel(int dc, char *buffer, int size) {
}); });
} }
int rtcGetDataChannelProtocol(int dc, char *buffer, int size) {
return WRAP({
auto dataChannel = getDataChannel(dc);
if (!buffer)
throw std::invalid_argument("Unexpected null pointer");
if (size <= 0)
return 0;
string protocol = dataChannel->protocol();
const char *data = protocol.data();
size = std::min(size - 1, int(protocol.size()));
std::copy(data, data + size, buffer);
buffer[size] = '\0';
return int(size + 1);
});
}
int rtcGetDataChannelReliability(int dc, rtcReliability *reliability) {
return WRAP({
auto dataChannel = getDataChannel(dc);
if (!reliability)
throw std::invalid_argument("Unexpected null pointer");
Reliability r = dataChannel->reliability();
std::memset(reliability, 0, sizeof(*reliability));
reliability->unordered = r.unordered;
if (r.type == Reliability::Type::Timed) {
reliability->unreliable = true;
reliability->maxPacketLifeTime = unsigned(std::get<milliseconds>(r.rexmit).count());
} else if (r.type == Reliability::Type::Rexmit) {
reliability->unreliable = true;
reliability->maxRetransmits = unsigned(std::get<int>(r.rexmit));
} else {
reliability->unreliable = false;
}
return 0;
});
}
int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) { int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
return WRAP({ return WRAP({
auto channel = getChannel(id); auto channel = getChannel(id);

View File

@ -372,12 +372,12 @@ bool SctpTransport::trySendMessage(message_ptr message) {
spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
switch (reliability.type) { switch (reliability.type) {
case Reliability::TYPE_PARTIAL_RELIABLE_REXMIT: case Reliability::Type::Rexmit:
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit)); spa.sendv_prinfo.pr_value = uint32_t(std::get<int>(reliability.rexmit));
break; break;
case Reliability::TYPE_PARTIAL_RELIABLE_TIMED: case Reliability::Type::Timed:
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count()); spa.sendv_prinfo.pr_value = uint32_t(std::get<milliseconds>(reliability.rexmit).count());
@ -451,7 +451,7 @@ void SctpTransport::sendReset(uint16_t streamId) {
mWrittenCondition.wait_for(lock, 1000ms, mWrittenCondition.wait_for(lock, 1000ms,
[&]() { return mWritten || state() != State::Connected; }); [&]() { return mWritten || state() != State::Connected; });
} else if (errno == EINVAL) { } else if (errno == EINVAL) {
PLOG_VERBOSE << "SCTP stream " << streamId << " already reset"; PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
} else { } else {
PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno; PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
} }
@ -479,32 +479,22 @@ int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*a
// therefore partial notifications and messages need to be handled separately. // therefore partial notifications and messages need to be handled separately.
if (flags & MSG_NOTIFICATION) { if (flags & MSG_NOTIFICATION) {
// SCTP event notification // SCTP event notification
mPartialNotification.insert(mPartialNotification.end(), data, data + len);
if (flags & MSG_EOR) { if (flags & MSG_EOR) {
if (!mPartialNotification.empty()) {
mPartialNotification.insert(mPartialNotification.end(), data, data + len);
data = mPartialNotification.data();
len = mPartialNotification.size();
}
// Notification is complete, process it // Notification is complete, process it
processNotification(reinterpret_cast<const union sctp_notification *>(data), len); processNotification(
reinterpret_cast<const union sctp_notification *>(mPartialNotification.data()),
mPartialNotification.size());
mPartialNotification.clear(); mPartialNotification.clear();
} else {
mPartialNotification.insert(mPartialNotification.end(), data, data + len);
} }
} else { } else {
// SCTP message // SCTP message
mPartialMessage.insert(mPartialMessage.end(), data, data + len);
if (flags & MSG_EOR) { if (flags & MSG_EOR) {
if (!mPartialMessage.empty()) {
mPartialMessage.insert(mPartialMessage.end(), data, data + len);
data = mPartialMessage.data();
len = mPartialMessage.size();
}
// Message is complete, process it // Message is complete, process it
processData(data, len, info.rcv_sid, PayloadId(htonl(info.rcv_ppid))); processData(std::move(mPartialMessage), info.rcv_sid,
PayloadId(htonl(info.rcv_ppid)));
mPartialMessage.clear(); mPartialMessage.clear();
} else {
mPartialMessage.insert(mPartialMessage.end(), data, data + len);
} }
} }
@ -539,62 +529,56 @@ int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t
return 0; // success return 0; // success
} }
void SctpTransport::processData(const byte *data, size_t len, uint16_t sid, PayloadId ppid) { void SctpTransport::processData(binary &&data, uint16_t sid, PayloadId ppid) {
PLOG_VERBOSE << "Process data, len=" << len; PLOG_VERBOSE << "Process data, size=" << data.size();
// The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated. // The usage of the PPIDs "WebRTC String Partial" and "WebRTC Binary Partial" is deprecated.
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6 // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.6
// We handle them at reception for compatibility reasons but should never send them. // We handle them at reception for compatibility reasons but should never send them.
switch (ppid) { switch (ppid) {
case PPID_CONTROL: case PPID_CONTROL:
recv(make_message(data, data + len, Message::Control, sid)); recv(make_message(std::move(data), Message::Control, sid));
break; break;
case PPID_STRING_PARTIAL: // deprecated case PPID_STRING_PARTIAL: // deprecated
mPartialStringData.insert(mPartialStringData.end(), data, data + len); mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
break; break;
case PPID_STRING: case PPID_STRING:
if (mPartialStringData.empty()) { if (mPartialStringData.empty()) {
mBytesReceived += len; mBytesReceived += data.size();
recv(make_message(data, data + len, Message::String, sid)); recv(make_message(std::move(data), Message::String, sid));
} else { } else {
mPartialStringData.insert(mPartialStringData.end(), data, data + len); mPartialStringData.insert(mPartialStringData.end(), data.begin(), data.end());
mBytesReceived += mPartialStringData.size(); mBytesReceived += mPartialStringData.size();
recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String, recv(make_message(std::move(mPartialStringData), Message::String, sid));
sid));
mPartialStringData.clear(); mPartialStringData.clear();
} }
break; break;
case PPID_STRING_EMPTY: case PPID_STRING_EMPTY:
// This only accounts for when the partial data is empty recv(make_message(std::move(mPartialStringData), Message::String, sid));
recv(make_message(mPartialStringData.begin(), mPartialStringData.end(), Message::String,
sid));
mPartialStringData.clear(); mPartialStringData.clear();
break; break;
case PPID_BINARY_PARTIAL: // deprecated case PPID_BINARY_PARTIAL: // deprecated
mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len); mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
break; break;
case PPID_BINARY: case PPID_BINARY:
if (mPartialBinaryData.empty()) { if (mPartialBinaryData.empty()) {
mBytesReceived += len; mBytesReceived += data.size();
recv(make_message(data, data + len, Message::Binary, sid)); recv(make_message(std::move(data), Message::Binary, sid));
} else { } else {
mPartialBinaryData.insert(mPartialBinaryData.end(), data, data + len); mPartialBinaryData.insert(mPartialBinaryData.end(), data.begin(), data.end());
mBytesReceived += mPartialStringData.size(); mBytesReceived += mPartialBinaryData.size();
recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary, recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
sid));
mPartialBinaryData.clear(); mPartialBinaryData.clear();
} }
break; break;
case PPID_BINARY_EMPTY: case PPID_BINARY_EMPTY:
// This only accounts for when the partial data is empty recv(make_message(std::move(mPartialBinaryData), Message::Binary, sid));
recv(make_message(mPartialBinaryData.begin(), mPartialBinaryData.end(), Message::Binary,
sid));
mPartialBinaryData.clear(); mPartialBinaryData.clear();
break; break;
@ -692,8 +676,16 @@ std::optional<milliseconds> SctpTransport::rtt() {
int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data, int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data,
size_t len, struct sctp_rcvinfo recv_info, int flags, void *ptr) { size_t len, struct sctp_rcvinfo recv_info, int flags, void *ptr) {
int ret = static_cast<SctpTransport *>(ptr)->handleRecv( auto *transport = static_cast<SctpTransport *>(ptr);
sock, addr, static_cast<const byte *>(data), len, recv_info, flags);
std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end()) {
free(data);
return -1;
}
int ret =
transport->handleRecv(sock, addr, static_cast<const byte *>(data), len, recv_info, flags);
free(data); free(data);
return ret; return ret;
} }
@ -708,8 +700,6 @@ int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
void *ptr = sconn->sconn_addr; void *ptr = sconn->sconn_addr;
auto *transport = static_cast<SctpTransport *>(ptr); auto *transport = static_cast<SctpTransport *>(ptr);
// Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
// https://github.com/sctplab/usrsctp/issues/405
std::shared_lock lock(InstancesMutex); std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end()) if (Instances.find(transport) == Instances.end())
return -1; return -1;
@ -718,8 +708,15 @@ int SctpTransport::SendCallback(struct socket *sock, uint32_t sb_free) {
} }
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) { int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {
return static_cast<SctpTransport *>(ptr)->handleWrite(static_cast<byte *>(data), len, tos, auto *transport = static_cast<SctpTransport *>(ptr);
set_df);
// Workaround for sctplab/usrsctp#405: Send callback is invoked on already closed socket
// https://github.com/sctplab/usrsctp/issues/405
std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end())
return -1;
return transport->handleWrite(static_cast<byte *>(data), len, tos, set_df);
} }
} // namespace rtc } // namespace rtc

View File

@ -86,7 +86,7 @@ private:
int handleSend(size_t free); int handleSend(size_t free);
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df); int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
void processData(const byte *data, size_t len, uint16_t streamId, PayloadId ppid); void processData(binary &&data, uint16_t streamId, PayloadId ppid);
void processNotification(const union sctp_notification *notify, size_t len); void processNotification(const union sctp_notification *notify, size_t len);
const uint16_t mPort; const uint16_t mPort;

View File

@ -54,7 +54,7 @@ SelectInterrupter::~SelectInterrupter() {
#endif #endif
} }
int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) { int SelectInterrupter::prepare(fd_set &readfds, [[maybe_unused]] fd_set &writefds) {
std::lock_guard lock(mMutex); std::lock_guard lock(mMutex);
#ifdef _WIN32 #ifdef _WIN32
if (mDummySock == INVALID_SOCKET) if (mDummySock == INVALID_SOCKET)

View File

@ -65,10 +65,20 @@ public:
virtual bool send(message_ptr message) { return outgoing(message); } virtual bool send(message_ptr message) { return outgoing(message); }
protected: protected:
void recv(message_ptr message) { mRecvCallback(message); } void recv(message_ptr message) {
try {
mRecvCallback(message);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void changeState(State state) { void changeState(State state) {
try {
if (mState.exchange(state) != state) if (mState.exchange(state) != state)
mStateChangeCallback(state); mStateChangeCallback(state);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
} }
virtual void incoming(message_ptr message) { recv(message); } virtual void incoming(message_ptr message) { recv(message); }

View File

@ -317,7 +317,7 @@ void WebSocket::closeTransports() {
// Reset callbacks now that state is changed // Reset callbacks now that state is changed
resetCallbacks(); resetCallbacks();
// Pass the references to a thread, allowing to terminate a transport from its own thread // Pass the pointers to a thread, allowing to terminate a transport from its own thread
auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr)); auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr)); auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr)); auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));