Compare commits

..

22 Commits

Author SHA1 Message Date
b3edcfa05c Bumped version to 0.11.7 2021-03-04 12:08:43 +01:00
19e148363c Merge pull request #353 from paullouisageneau/fix-buffered-amount-callback
Fix buffered amount callback synchronization
2021-03-04 12:05:11 +01:00
7f6f178177 Fixed buffered amount callback synchronization 2021-03-03 19:27:54 +01:00
2db14a29a9 Bumped version to 0.11.6 2021-03-01 12:33:38 +01:00
5cbbba2e12 Fixed Track::maxMessageSize() 2021-03-01 12:33:38 +01:00
93aef867d0 Fixed track outgoing no media support issue 2021-03-01 12:25:34 +01:00
e99ba3c5d8 Bumped version to 0.11.5 2021-02-28 18:11:12 +01:00
65dba2c299 Merge pull request #346 from paullouisageneau/fix-mtu
Fix path MTU
2021-02-27 11:39:22 +01:00
6ef8f1e1a7 Added optional MTU setting in configuration 2021-02-27 11:17:49 +01:00
56dbcaad97 Fixed path MTU 2021-02-26 14:12:12 +01:00
d748016446 Merge pull request #344 from paullouisageneau/fix-datachannel-data-race
Fix possible data race in DataChannel
2021-02-25 19:25:40 +01:00
e543d789a4 Refactored Track to follow DataChannel 2021-02-23 22:53:04 +01:00
90e59435c0 Added synchronization to DataChannel 2021-02-23 22:52:56 +01:00
785c3b8149 Renamed "Negociated" to "Negotiated" 2021-02-23 18:34:23 +01:00
c37c88543d Bumped version to 0.11.4 2021-02-22 21:04:31 +01:00
011bfbe46f Merge pull request #342 from paullouisageneau/fix-usrsctp-data-race
Update usrsctp and mitigate possible data race
2021-02-22 19:28:06 +01:00
de2ac6c0c2 Mitigation for data race 2021-02-22 19:05:45 +01:00
75619babd7 Updated usrsctp to v0.9.5.0 2021-02-22 19:00:35 +01:00
fe9a34905b Fixed missing data channels mutex lock 2021-02-22 09:49:03 +01:00
b88f1f5e72 Bumped version to 0.11.3 2021-02-21 20:49:30 +01:00
ab7d7fefe0 Prevent lock order inversion 2021-02-21 20:46:04 +01:00
e592fcf217 Fixed compilation warnings 2021-02-21 15:46:03 +01:00
21 changed files with 229 additions and 125 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
VERSION 0.11.2 VERSION 0.11.7
LANGUAGES CXX) LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library") set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

2
deps/usrsctp vendored

View File

@ -70,6 +70,7 @@ struct RTC_CPP_EXPORT Configuration {
bool enableIceTcp = false; bool enableIceTcp = false;
uint16_t portRangeBegin = 1024; uint16_t portRangeBegin = 1024;
uint16_t portRangeEnd = 65535; uint16_t portRangeEnd = 65535;
std::optional<size_t> mtu;
}; };
} // namespace rtc } // namespace rtc

View File

@ -30,6 +30,7 @@
#include <functional> #include <functional>
#include <type_traits> #include <type_traits>
#include <variant> #include <variant>
#include <shared_mutex>
namespace rtc { namespace rtc {
@ -79,6 +80,8 @@ protected:
string mProtocol; string mProtocol;
std::shared_ptr<Reliability> mReliability; std::shared_ptr<Reliability> mReliability;
mutable std::shared_mutex mMutex;
std::atomic<bool> mIsOpen = false; std::atomic<bool> mIsOpen = false;
std::atomic<bool> mIsClosed = false; std::atomic<bool> mIsClosed = false;
@ -88,13 +91,13 @@ private:
friend class PeerConnection; friend class PeerConnection;
}; };
class RTC_CPP_EXPORT NegociatedDataChannel final : public DataChannel { class RTC_CPP_EXPORT NegotiatedDataChannel final : public DataChannel {
public: public:
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label, NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label,
string protocol, Reliability reliability); string protocol, Reliability reliability);
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport, NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport,
uint16_t stream); uint16_t stream);
~NegociatedDataChannel(); ~NegotiatedDataChannel();
private: private:
void open(std::shared_ptr<SctpTransport> transport) override; void open(std::shared_ptr<SctpTransport> transport) override;

View File

@ -77,6 +77,8 @@ const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
const size_t DEFAULT_IPV4_MTU = 1200; // IPv4 safe MTU value recommended by RFC 8261
// overloaded helper // overloaded helper
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; }; template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>; template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

View File

@ -298,13 +298,16 @@ public:
private: private:
uint8_t _length; uint8_t _length;
char _text; char _text[1];
public: public:
inline std::string text() const { return std::string(&_text, _length); } inline std::string text() const { return std::string(_text, _length); }
inline void setText(std::string text) { inline void setText(std::string text) {
_length = text.length(); if(text.size() > 0xFF)
memcpy(&_text, text.data(), _length); throw std::invalid_argument("text is too long");
_length = uint8_t(text.size());
memcpy(_text, text.data(), text.size());
} }
inline uint8_t length() { return _length; } inline uint8_t length() { return _length; }
@ -334,12 +337,12 @@ public:
return reinterpret_cast<RTCP_SDES_ITEM *>(base); return reinterpret_cast<RTCP_SDES_ITEM *>(base);
} }
long safelyCountChunkSize(unsigned int maxChunkSize) { long safelyCountChunkSize(size_t maxChunkSize) {
if (maxChunkSize < RTCP_SDES_CHUNK::size({})) { if (maxChunkSize < RTCP_SDES_CHUNK::size({})) {
// chunk is truncated // chunk is truncated
return -1; return -1;
} else { } else {
unsigned int size = sizeof(SSRC); size_t size = sizeof(SSRC);
unsigned int i = 0; unsigned int i = 0;
// We can always access first 4 bytes of first item (in case of no items there will be 4 // We can always access first 4 bytes of first item (in case of no items there will be 4
// null bytes) // null bytes)
@ -407,7 +410,7 @@ public:
auto chunk = getChunk(i); auto chunk = getChunk(i);
chunkSize += chunk->getSize(); chunkSize += chunk->getSize();
} }
uint16_t length = (sizeof(header) + chunkSize) / 4 - 1; uint16_t length = uint16_t((sizeof(header) + chunkSize) / 4 - 1);
header.prepareHeader(202, chunkCount, length); header.prepareHeader(202, chunkCount, length);
} }
@ -629,10 +632,10 @@ struct RTCP_NACK_PART {
std::vector<uint16_t> getSequenceNumbers() { std::vector<uint16_t> getSequenceNumbers() {
std::vector<uint16_t> result{}; std::vector<uint16_t> result{};
result.reserve(17); result.reserve(17);
auto pid = getPID(); uint16_t pid = getPID();
result.push_back(pid); result.push_back(pid);
auto bitmask = getBLP(); uint16_t bitmask = getBLP();
auto i = pid + 1; uint16_t i = pid + 1;
while (bitmask > 0) { while (bitmask > 0) {
if (bitmask & 0x1) { if (bitmask & 0x1) {
result.push_back(i); result.push_back(i);
@ -673,9 +676,9 @@ public:
(*fciCount)++; (*fciCount)++;
return true; return true;
} else { } else {
// TODO SPEEED! // TODO SPEED!
auto blp = parts[(*fciCount) - 1].getBLP(); uint16_t blp = parts[(*fciCount) - 1].getBLP();
auto newBit = 1u << (unsigned int)(missingPacket - (1 + *fciPID)); uint16_t newBit = uint16_t(1u << (missingPacket - (1 + *fciPID)));
parts[(*fciCount) - 1].setBLP(blp | newBit); parts[(*fciCount) - 1].setBLP(blp | newBit);
return false; return false;
} }

View File

@ -43,6 +43,7 @@ public:
string mid() const; string mid() const;
Description::Media description() const; Description::Media description() const;
Description::Direction direction() const;
void setDescription(Description::Media description); void setDescription(Description::Media description);
@ -75,13 +76,14 @@ private:
bool outgoing(message_ptr message); bool outgoing(message_ptr message);
Description::Media mMediaDescription; Description::Media mMediaDescription;
std::shared_ptr<MediaHandler> mRtcpHandler;
mutable std::shared_mutex mMutex;
std::atomic<bool> mIsClosed = false; std::atomic<bool> mIsClosed = false;
Queue<message_ptr> mRecvQueue; Queue<message_ptr> mRecvQueue;
std::shared_mutex mRtcpHandlerMutex;
std::shared_ptr<MediaHandler> mRtcpHandler;
friend class PeerConnection; friend class PeerConnection;
}; };

View File

@ -87,21 +87,34 @@ DataChannel::~DataChannel() { close(); }
uint16_t DataChannel::stream() const { return mStream; } uint16_t DataChannel::stream() const { return mStream; }
uint16_t DataChannel::id() const { return uint16_t(mStream); } uint16_t DataChannel::id() const { return mStream; }
string DataChannel::label() const { return mLabel; } string DataChannel::label() const {
std::shared_lock lock(mMutex);
return mLabel;
}
string DataChannel::protocol() const { return mProtocol; } string DataChannel::protocol() const {
std::shared_lock lock(mMutex);
return mProtocol;
}
Reliability DataChannel::reliability() const { return *mReliability; } Reliability DataChannel::reliability() const {
std::shared_lock lock(mMutex);
return *mReliability;
}
void DataChannel::close() { void DataChannel::close() {
mIsClosed = true; std::shared_ptr<SctpTransport> transport;
if (mIsOpen.exchange(false)) {
if (auto transport = mSctpTransport.lock()) std::shared_lock lock(mMutex);
transport->closeStream(mStream); transport = mSctpTransport.lock();
}
mIsClosed = true;
if (mIsOpen.exchange(false) && transport)
transport->closeStream(mStream);
mSctpTransport.reset();
resetCallbacks(); resetCallbacks();
} }
@ -110,7 +123,6 @@ void DataChannel::remoteClose() {
triggerClosed(); triggerClosed();
mIsOpen = false; mIsOpen = false;
mSctpTransport.reset();
} }
bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); } bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); }
@ -167,7 +179,10 @@ size_t DataChannel::maxMessageSize() const {
size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); } size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
void DataChannel::open(shared_ptr<SctpTransport> transport) { void DataChannel::open(shared_ptr<SctpTransport> transport) {
mSctpTransport = transport; {
std::unique_lock lock(mMutex);
mSctpTransport = transport;
}
if (!mIsOpen.exchange(true)) if (!mIsOpen.exchange(true))
triggerOpen(); triggerOpen();
@ -179,19 +194,22 @@ void DataChannel::processOpenMessage(message_ptr) {
} }
bool DataChannel::outgoing(message_ptr message) { bool DataChannel::outgoing(message_ptr message) {
if (mIsClosed) std::shared_ptr<SctpTransport> transport;
throw std::runtime_error("DataChannel is closed"); {
std::shared_lock lock(mMutex);
transport = mSctpTransport.lock();
if (message->size() > maxMessageSize()) if (!transport || mIsClosed)
throw std::runtime_error("Message size exceeds limit"); throw std::runtime_error("DataChannel is closed");
auto transport = mSctpTransport.lock(); if (message->size() > maxMessageSize())
if (!transport) throw std::runtime_error("Message size exceeds limit");
throw std::runtime_error("DataChannel transport is not open");
// Before the ACK has been received on a DataChannel, all messages must be sent ordered
message->reliability = mIsOpen ? mReliability : nullptr;
message->stream = mStream;
}
// Before the ACK has been received on a DataChannel, all messages must be sent ordered
message->reliability = mIsOpen ? mReliability : nullptr;
message->stream = mStream;
return transport->send(message); return transport->send(message);
} }
@ -235,20 +253,21 @@ void DataChannel::incoming(message_ptr message) {
} }
} }
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream,
string label, string protocol, Reliability reliability) string label, string protocol, Reliability reliability)
: DataChannel(pc, stream, std::move(label), std::move(protocol), std::move(reliability)) {} : DataChannel(pc, stream, std::move(label), std::move(protocol), std::move(reliability)) {}
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc,
std::weak_ptr<SctpTransport> transport, std::weak_ptr<SctpTransport> transport,
uint16_t stream) uint16_t stream)
: DataChannel(pc, stream, "", "", {}) { : DataChannel(pc, stream, "", "", {}) {
mSctpTransport = transport; mSctpTransport = transport;
} }
NegociatedDataChannel::~NegociatedDataChannel() {} NegotiatedDataChannel::~NegotiatedDataChannel() {}
void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) { void NegotiatedDataChannel::open(shared_ptr<SctpTransport> transport) {
std::unique_lock lock(mMutex);
mSctpTransport = transport; mSctpTransport = transport;
uint8_t channelType; uint8_t channelType;
@ -287,10 +306,13 @@ void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) {
std::copy(mLabel.begin(), mLabel.end(), end); std::copy(mLabel.begin(), mLabel.end(), end);
std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size()); std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size());
lock.unlock();
transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream)); transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream));
} }
void NegociatedDataChannel::processOpenMessage(message_ptr message) { void NegotiatedDataChannel::processOpenMessage(message_ptr message) {
std::unique_lock lock(mMutex);
auto transport = mSctpTransport.lock(); auto transport = mSctpTransport.lock();
if (!transport) if (!transport)
throw std::runtime_error("DataChannel has no transport"); throw std::runtime_error("DataChannel has no transport");
@ -326,6 +348,8 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
mReliability->rexmit = int(0); mReliability->rexmit = int(0);
} }
lock.unlock();
binary buffer(sizeof(AckMessage), byte(0)); binary buffer(sizeof(AckMessage), byte(0));
auto &ack = *reinterpret_cast<AckMessage *>(buffer.data()); auto &ack = *reinterpret_cast<AckMessage *>(buffer.data());
ack.type = MESSAGE_ACK; ack.type = MESSAGE_ACK;

View File

@ -58,10 +58,11 @@ void DtlsSrtpTransport::Cleanup() { srtp_shutdown(); }
DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
shared_ptr<Certificate> certificate, shared_ptr<Certificate> certificate,
std::optional<size_t> mtu,
verifier_callback verifierCallback, verifier_callback verifierCallback,
message_callback srtpRecvCallback, message_callback srtpRecvCallback,
state_callback stateChangeCallback) state_callback stateChangeCallback)
: DtlsTransport(lower, certificate, std::move(verifierCallback), : DtlsTransport(lower, certificate, mtu, std::move(verifierCallback),
std::move(stateChangeCallback)), std::move(stateChangeCallback)),
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback

View File

@ -39,9 +39,9 @@ public:
static void Init(); static void Init();
static void Cleanup(); static void Cleanup();
DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, std::shared_ptr<Certificate> certificate, DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, message_callback srtpRecvCallback, std::optional<size_t> mtu, verifier_callback verifierCallback,
state_callback stateChangeCallback); message_callback srtpRecvCallback, state_callback stateChangeCallback);
~DtlsSrtpTransport(); ~DtlsSrtpTransport();
bool sendMedia(message_ptr message); bool sendMedia(message_ptr message);

View File

@ -50,8 +50,9 @@ void DtlsTransport::Init() {
void DtlsTransport::Cleanup() { gnutls_global_deinit(); } void DtlsTransport::Cleanup() { gnutls_global_deinit(); }
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate, DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback) std::optional<size_t> mtu, verifier_callback verifierCallback,
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate), state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)), mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) { mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
@ -156,11 +157,15 @@ void DtlsTransport::postHandshake() {
} }
void DtlsTransport::runRecvLoop() { void DtlsTransport::runRecvLoop() {
const size_t maxMtu = 4096; const size_t bufferSize = 4096;
// Handshake loop // Handshake loop
try { try {
changeState(State::Connecting); changeState(State::Connecting);
gnutls_dtls_set_mtu(mSession, 1280 - 40 - 8); // min MTU over UDP/IPv6
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
gnutls_dtls_set_mtu(mSession, static_cast<unsigned int>(mtu));
PLOG_VERBOSE << "SSL MTU set to " << mtu;
int ret; int ret;
do { do {
@ -174,7 +179,7 @@ void DtlsTransport::runRecvLoop() {
// RFC 8261: DTLS MUST support sending messages larger than the current path MTU // RFC 8261: DTLS MUST support sending messages larger than the current path MTU
// See https://tools.ietf.org/html/rfc8261#section-5 // See https://tools.ietf.org/html/rfc8261#section-5
gnutls_dtls_set_mtu(mSession, maxMtu + 1); gnutls_dtls_set_mtu(mSession, bufferSize + 1);
} catch (const std::exception &e) { } catch (const std::exception &e) {
PLOG_ERROR << "DTLS handshake: " << e.what(); PLOG_ERROR << "DTLS handshake: " << e.what();
@ -188,7 +193,6 @@ void DtlsTransport::runRecvLoop() {
postHandshake(); postHandshake();
changeState(State::Connected); changeState(State::Connected);
const size_t bufferSize = maxMtu;
char buffer[bufferSize]; char buffer[bufferSize];
while (true) { while (true) {
@ -314,8 +318,9 @@ void DtlsTransport::Cleanup() {
} }
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certificate> certificate, DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certificate> certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback) std::optional<size_t> mtu, verifier_callback verifierCallback,
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate), state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)), mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) { mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)"; PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
@ -440,16 +445,18 @@ void DtlsTransport::postHandshake() {
} }
void DtlsTransport::runRecvLoop() { void DtlsTransport::runRecvLoop() {
const size_t maxMtu = 4096; const size_t bufferSize = 4096;
try { try {
changeState(State::Connecting); changeState(State::Connecting);
SSL_set_mtu(mSsl, 1280 - 40 - 8); // min MTU over UDP/IPv6
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
SSL_set_mtu(mSsl, static_cast<unsigned int>(mtu));
PLOG_VERBOSE << "SSL MTU set to " << mtu;
// Initiate the handshake // Initiate the handshake
int ret = SSL_do_handshake(mSsl); int ret = SSL_do_handshake(mSsl);
openssl::check(mSsl, ret, "Handshake failed"); openssl::check(mSsl, ret, "Handshake failed");
const size_t bufferSize = maxMtu;
byte buffer[bufferSize]; byte buffer[bufferSize];
while (mIncomingQueue.running()) { while (mIncomingQueue.running()) {
// Process pending messages // Process pending messages
@ -466,7 +473,7 @@ void DtlsTransport::runRecvLoop() {
if (SSL_is_init_finished(mSsl)) { if (SSL_is_init_finished(mSsl)) {
// RFC 8261: DTLS MUST support sending messages larger than the current path // RFC 8261: DTLS MUST support sending messages larger than the current path
// MTU See https://tools.ietf.org/html/rfc8261#section-5 // MTU See https://tools.ietf.org/html/rfc8261#section-5
SSL_set_mtu(mSsl, maxMtu + 1); SSL_set_mtu(mSsl, bufferSize + 1);
PLOG_INFO << "DTLS handshake finished"; PLOG_INFO << "DTLS handshake finished";
postHandshake(); postHandshake();

View File

@ -44,7 +44,8 @@ public:
using verifier_callback = std::function<bool(const std::string &fingerprint)>; using verifier_callback = std::function<bool(const std::string &fingerprint)>;
DtlsTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate, DtlsTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback); std::optional<size_t> mtu, verifier_callback verifierCallback,
state_callback stateChangeCallback);
~DtlsTransport(); ~DtlsTransport();
virtual void start() override; virtual void start() override;
@ -57,6 +58,7 @@ protected:
virtual void postHandshake(); virtual void postHandshake();
void runRecvLoop(); void runRecvLoop();
const std::optional<size_t> mMtu;
const certificate_ptr mCertificate; const certificate_ptr mCertificate;
const verifier_callback mVerifierCallback; const verifier_callback mVerifierCallback;
const bool mIsClient; const bool mIsClient;

View File

@ -22,8 +22,8 @@
#include "include.hpp" #include "include.hpp"
#include "logcounter.hpp" #include "logcounter.hpp"
#include "processor.hpp" #include "processor.hpp"
#include "threadpool.hpp"
#include "rtp.hpp" #include "rtp.hpp"
#include "threadpool.hpp"
#include "dtlstransport.hpp" #include "dtlstransport.hpp"
#include "icetransport.hpp" #include "icetransport.hpp"
@ -75,6 +75,17 @@ PeerConnection::PeerConnection(const Configuration &config)
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd) if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
throw std::invalid_argument("Invalid port range"); throw std::invalid_argument("Invalid port range");
if (config.mtu) {
if (*config.mtu < 576) // Min MTU for IPv4
throw std::invalid_argument("Invalid MTU value");
if (*config.mtu > 1500) { // Standard Ethernet
PLOG_WARNING << "MTU set to " << *config.mtu;
} else {
PLOG_VERBOSE << "MTU set to " << *config.mtu;
}
}
} }
PeerConnection::~PeerConnection() { PeerConnection::~PeerConnection() {
@ -515,7 +526,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
// DTLS-SRTP // DTLS-SRTP
transport = std::make_shared<DtlsSrtpTransport>( transport = std::make_shared<DtlsSrtpTransport>(
lower, certificate, verifierCallback, lower, certificate, mConfig.mtu, verifierCallback,
weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback); weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
#else #else
PLOG_WARNING << "Ignoring media support (not compiled with media support)"; PLOG_WARNING << "Ignoring media support (not compiled with media support)";
@ -524,8 +535,8 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
if (!transport) { if (!transport) {
// DTLS only // DTLS only
transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback, transport = std::make_shared<DtlsTransport>(lower, certificate, mConfig.mtu,
stateChangeCallback); verifierCallback, stateChangeCallback);
} }
std::atomic_store(&mDtlsTransport, transport); std::atomic_store(&mDtlsTransport, transport);
@ -557,7 +568,7 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT); uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
auto lower = std::atomic_load(&mDtlsTransport); auto lower = std::atomic_load(&mDtlsTransport);
auto transport = std::make_shared<SctpTransport>( auto transport = std::make_shared<SctpTransport>(
lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1), lower, sctpPort, mConfig.mtu, weak_bind(&PeerConnection::forwardMessage, this, _1),
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2), weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this, weak_this = weak_from_this()](SctpTransport::State state) { [this, weak_this = weak_from_this()](SctpTransport::State state) {
auto shared_this = weak_this.lock(); auto shared_this = weak_this.lock();
@ -663,10 +674,12 @@ void PeerConnection::forwardMessage(message_ptr message) {
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage && if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
stream % 2 == remoteParity) { stream % 2 == remoteParity) {
channel = std::make_shared<NegociatedDataChannel>(shared_from_this(), sctpTransport, channel =
stream); std::make_shared<NegotiatedDataChannel>(shared_from_this(), sctpTransport, stream);
channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this, channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
weak_ptr<DataChannel>{channel})); weak_ptr<DataChannel>{channel}));
std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
mDataChannels.emplace(stream, channel); mDataChannels.emplace(stream, channel);
} else { } else {
// Invalid, close the DataChannel // Invalid, close the DataChannel
@ -833,7 +846,7 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role rol
init.negotiated init.negotiated
? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label), ? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
std::move(init.protocol), std::move(init.reliability)) std::move(init.protocol), std::move(init.reliability))
: std::make_shared<NegociatedDataChannel>(shared_from_this(), stream, std::move(label), : std::make_shared<NegotiatedDataChannel>(shared_from_this(), stream, std::move(label),
std::move(init.protocol), std::move(init.protocol),
std::move(init.reliability)); std::move(init.reliability));
mDataChannels.emplace(std::make_pair(stream, channel)); mDataChannels.emplace(std::make_pair(stream, channel));

View File

@ -17,6 +17,7 @@
*/ */
#include "sctptransport.hpp" #include "sctptransport.hpp"
#include "dtlstransport.hpp"
#include "logcounter.hpp" #include "logcounter.hpp"
#include <chrono> #include <chrono>
@ -27,8 +28,7 @@
// The IETF draft says: // The IETF draft says:
// SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in // SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in
// [RFC4821] using probing messages specified in [RFC4820]. The initial Path MTU at the IP layer // [RFC4821] using probing messages specified in [RFC4820].
// SHOULD NOT exceed 1200 bytes for IPv4 and 1280 for IPv6.
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5 // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5
// //
// However, usrsctp does not implement Path MTU discovery, so we need to disable it for now. // However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
@ -54,8 +54,6 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
using namespace std::chrono; using namespace std::chrono;
using std::shared_ptr;
namespace rtc { namespace rtc {
static LogCounter COUNTER_UNKNOWN_PPID(plog::warning, static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
@ -102,7 +100,8 @@ void SctpTransport::Cleanup() {
} }
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
message_callback recvCallback, amount_callback bufferedAmountCallback, std::optional<size_t> mtu, message_callback recvCallback,
amount_callback bufferedAmountCallback,
state_callback stateChangeCallback) state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0), : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) { mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
@ -180,13 +179,24 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
// 1200 bytes. // 1200 bytes.
// See https://tools.ietf.org/html/rfc8261#section-5 // See https://tools.ietf.org/html/rfc8261#section-5
#if USE_PMTUD #if USE_PMTUD
// Enable SCTP path MTU discovery if (!mtu.has_value()) {
spp.spp_flags |= SPP_PMTUD_ENABLE;
#else #else
// Fall back to a safe MTU value. if (false) {
spp.spp_flags |= SPP_PMTUD_DISABLE;
spp.spp_pathmtu = 1200;
#endif #endif
// Enable SCTP path MTU discovery
spp.spp_flags |= SPP_PMTUD_ENABLE;
PLOG_VERBOSE << "Path MTU discovery enabled";
} else {
// Fall back to a safe MTU value.
spp.spp_flags |= SPP_PMTUD_DISABLE;
// The MTU value provided specifies the space available for chunks in the
// packet, so we also subtract the SCTP header size.
size_t pmtu = mtu.value_or(DEFAULT_IPV4_MTU + 20) - 12 - 37 - 8 - 40; // SCTP/DTLS/UDP/IPv6
spp.spp_pathmtu = uint32_t(pmtu);
PLOG_VERBOSE << "Path MTU discovery disabled, SCTP MTU set to " << pmtu;
}
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp))) if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" + throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
std::to_string(errno)); std::to_string(errno));
@ -349,6 +359,10 @@ void SctpTransport::incoming(message_ptr message) {
} }
PLOG_VERBOSE << "Incoming size=" << message->size(); PLOG_VERBOSE << "Incoming size=" << message->size();
// TODO: There seems to be a possible data race between usrsctp_sendv() and usrsctp_conninput()
// As a mitigation, lock the send mutex before calling usrsctp_conninput()
std::lock_guard lock(mSendMutex);
usrsctp_conninput(this, message->data(), message->size(), 0); usrsctp_conninput(this, message->data(), message->size(), 0);
} }
@ -519,13 +533,16 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
else else
it->second = amount; it->second = amount;
mSendMutex.unlock(); // Synchronously call the buffered amount callback
triggerBufferedAmount(streamId, amount);
}
void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
try { try {
mBufferedAmountCallback(streamId, amount); mBufferedAmountCallback(streamId, amount);
} catch (const std::exception &e) { } catch (const std::exception &e) {
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what(); PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
} }
mSendMutex.lock();
} }
void SctpTransport::sendReset(uint16_t streamId) { void SctpTransport::sendReset(uint16_t streamId) {

View File

@ -43,8 +43,9 @@ public:
using amount_callback = std::function<void(uint16_t streamId, size_t amount)>; using amount_callback = std::function<void(uint16_t streamId, size_t amount)>;
SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, message_callback recvCallback, SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, std::optional<size_t> mtu,
amount_callback bufferedAmountCallback, state_callback stateChangeCallback); message_callback recvCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback);
~SctpTransport(); ~SctpTransport();
void start() override; void start() override;
@ -82,6 +83,7 @@ private:
bool trySendQueue(); bool trySendQueue();
bool trySendMessage(message_ptr message); bool trySendMessage(message_ptr message);
void updateBufferedAmount(uint16_t streamId, long delta); void updateBufferedAmount(uint16_t streamId, long delta);
void triggerBufferedAmount(uint16_t streamId, size_t amount);
void sendReset(uint16_t streamId); void sendReset(uint16_t streamId);
bool safeFlush(); bool safeFlush();
@ -96,7 +98,8 @@ private:
Processor mProcessor; Processor mProcessor;
std::atomic<int> mPendingRecvCount; std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex; std::mutex mRecvMutex;
std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount; std::map<uint16_t, size_t> mBufferedAmount;
amount_callback mBufferedAmountCallback; amount_callback mBufferedAmountCallback;

View File

@ -43,8 +43,7 @@ int ThreadPool::count() const {
} }
void ThreadPool::spawn(int count) { void ThreadPool::spawn(int count) {
std::scoped_lock lock(mMutex, mWorkersMutex); std::unique_lock lock(mWorkersMutex);
mJoining = false;
while (count-- > 0) while (count-- > 0)
mWorkers.emplace_back(std::bind(&ThreadPool::run, this)); mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
} }
@ -62,6 +61,8 @@ void ThreadPool::join() {
w.join(); w.join();
mWorkers.clear(); mWorkers.clear();
mJoining = false;
} }
void ThreadPool::run() { void ThreadPool::run() {

View File

@ -73,7 +73,7 @@ protected:
std::vector<std::thread> mWorkers; std::vector<std::thread> mWorkers;
int mWaitingWorkers = 0; int mWaitingWorkers = 0;
bool mJoining = false; std::atomic<bool> mJoining = false;
struct Task { struct Task {
clock::time_point time; clock::time_point time;

View File

@ -35,11 +35,23 @@ using std::weak_ptr;
Track::Track(Description::Media description) Track::Track(Description::Media description)
: mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {} : mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
string Track::mid() const { return mMediaDescription.mid(); } string Track::mid() const {
std::shared_lock lock(mMutex);
return mMediaDescription.mid();
}
Description::Media Track::description() const { return mMediaDescription; } Description::Media Track::description() const {
std::shared_lock lock(mMutex);
return mMediaDescription;
}
Description::Direction Track::direction() const {
std::shared_lock lock(mMutex);
return mMediaDescription.direction();
}
void Track::setDescription(Description::Media description) { void Track::setDescription(Description::Media description) {
std::unique_lock lock(mMutex);
if (description.mid() != mMediaDescription.mid()) if (description.mid() != mMediaDescription.mid())
throw std::logic_error("Media description mid does not match track mid"); throw std::logic_error("Media description mid does not match track mid");
@ -48,17 +60,17 @@ void Track::setDescription(Description::Media description) {
void Track::close() { void Track::close() {
mIsClosed = true; mIsClosed = true;
resetCallbacks();
setRtcpHandler(nullptr); setRtcpHandler(nullptr);
resetCallbacks();
} }
bool Track::send(message_variant data) { bool Track::send(message_variant data) {
if (mIsClosed) if (mIsClosed)
throw std::runtime_error("Track is closed"); throw std::runtime_error("Track is closed");
auto direction = mMediaDescription.direction(); auto dir = direction();
if ((direction == Description::Direction::RecvOnly || if ((dir == Description::Direction::RecvOnly || dir == Description::Direction::Inactive)) {
direction == Description::Direction::Inactive)) {
COUNTER_MEDIA_BAD_DIRECTION++; COUNTER_MEDIA_BAD_DIRECTION++;
return false; return false;
} }
@ -92,6 +104,7 @@ std::optional<message_variant> Track::peek() {
bool Track::isOpen(void) const { bool Track::isOpen(void) const {
#if RTC_ENABLE_MEDIA #if RTC_ENABLE_MEDIA
std::shared_lock lock(mMutex);
return !mIsClosed && mDtlsSrtpTransport.lock(); return !mIsClosed && mDtlsSrtpTransport.lock();
#else #else
return !mIsClosed; return !mIsClosed;
@ -101,14 +114,18 @@ bool Track::isOpen(void) const {
bool Track::isClosed(void) const { return mIsClosed; } bool Track::isClosed(void) const { return mIsClosed; }
size_t Track::maxMessageSize() const { size_t Track::maxMessageSize() const {
return 65535 - 12 - 4; // SRTP/UDP return DEFAULT_IPV4_MTU - 12 - 8 - 20; // SRTP/UDP/IPv4
} }
size_t Track::availableAmount() const { return mRecvQueue.amount(); } size_t Track::availableAmount() const { return mRecvQueue.amount(); }
#if RTC_ENABLE_MEDIA #if RTC_ENABLE_MEDIA
void Track::open(shared_ptr<DtlsSrtpTransport> transport) { void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
mDtlsSrtpTransport = transport; {
std::lock_guard lock(mMutex);
mDtlsSrtpTransport = transport;
}
triggerOpen(); triggerOpen();
} }
#endif #endif
@ -117,9 +134,8 @@ void Track::incoming(message_ptr message) {
if (!message) if (!message)
return; return;
auto direction = mMediaDescription.direction(); auto dir = direction();
if ((direction == Description::Direction::SendOnly || if ((dir == Description::Direction::SendOnly || dir == Description::Direction::Inactive) &&
direction == Description::Direction::Inactive) &&
message->type != Message::Control) { message->type != Message::Control) {
COUNTER_MEDIA_BAD_DIRECTION++; COUNTER_MEDIA_BAD_DIRECTION++;
return; return;
@ -143,16 +159,20 @@ void Track::incoming(message_ptr message) {
bool Track::outgoing([[maybe_unused]] message_ptr message) { bool Track::outgoing([[maybe_unused]] message_ptr message) {
#if RTC_ENABLE_MEDIA #if RTC_ENABLE_MEDIA
auto transport = mDtlsSrtpTransport.lock(); std::shared_ptr<DtlsSrtpTransport> transport;
if (!transport) {
throw std::runtime_error("Track transport is not open"); std::shared_lock lock(mMutex);
transport = mDtlsSrtpTransport.lock();
if (!transport)
throw std::runtime_error("Track is closed");
// Set recommended medium-priority DSCP value // Set recommended medium-priority DSCP value
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18 // See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
if (mMediaDescription.type() == "audio") if (mMediaDescription.type() == "audio")
message->dscp = 46; // EF: Expedited Forwarding message->dscp = 46; // EF: Expedited Forwarding
else else
message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
}
return transport->sendMedia(message); return transport->sendMedia(message);
#else #else
@ -162,24 +182,23 @@ bool Track::outgoing([[maybe_unused]] message_ptr message) {
} }
void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) { void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) {
std::unique_lock lock(mRtcpHandlerMutex); {
mRtcpHandler = std::move(handler); std::unique_lock lock(mMutex);
if (mRtcpHandler) { mRtcpHandler = handler;
auto copy = mRtcpHandler;
lock.unlock();
copy->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
} }
handler->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
} }
bool Track::requestKeyframe() { bool Track::requestKeyframe() {
if (auto handler = getRtcpHandler()) { if (auto handler = getRtcpHandler())
return handler->requestKeyframe(); return handler->requestKeyframe();
}
return false; return false;
} }
std::shared_ptr<MediaHandler> Track::getRtcpHandler() { std::shared_ptr<MediaHandler> Track::getRtcpHandler() {
std::shared_lock lock(mRtcpHandlerMutex); std::shared_lock lock(mMutex);
return mRtcpHandler; return mRtcpHandler;
} }

View File

@ -40,11 +40,13 @@ size_t benchmark(milliseconds duration) {
Configuration config1; Configuration config1;
// config1.iceServers.emplace_back("stun:stun.l.google.com:19302"); // config1.iceServers.emplace_back("stun:stun.l.google.com:19302");
// config1.mtu = 1500;
auto pc1 = std::make_shared<PeerConnection>(config1); auto pc1 = std::make_shared<PeerConnection>(config1);
Configuration config2; Configuration config2;
// config2.iceServers.emplace_back("stun:stun.l.google.com:19302"); // config2.iceServers.emplace_back("stun:stun.l.google.com:19302");
// config2.mtu = 1500;
auto pc2 = std::make_shared<PeerConnection>(config2); auto pc2 = std::make_shared<PeerConnection>(config2);

View File

@ -36,6 +36,8 @@ void test_connectivity() {
// STUN server example (not necessary to connect locally) // STUN server example (not necessary to connect locally)
// Please do not use outside of libdatachannel tests // Please do not use outside of libdatachannel tests
config1.iceServers.emplace_back("stun:stun.ageneau.net:3478"); config1.iceServers.emplace_back("stun:stun.ageneau.net:3478");
// Custom MTU example
config1.mtu = 1500;
auto pc1 = std::make_shared<PeerConnection>(config1); auto pc1 = std::make_shared<PeerConnection>(config1);
@ -43,6 +45,8 @@ void test_connectivity() {
// STUN server example (not necessary to connect locally) // STUN server example (not necessary to connect locally)
// Please do not use outside of libdatachannel tests // Please do not use outside of libdatachannel tests
config2.iceServers.emplace_back("stun:stun.ageneau.net:3478"); config2.iceServers.emplace_back("stun:stun.ageneau.net:3478");
// Custom MTU example
config2.mtu = 1500;
// Port range example // Port range example
config2.portRangeBegin = 5000; config2.portRangeBegin = 5000;
config2.portRangeEnd = 6000; config2.portRangeEnd = 6000;
@ -221,7 +225,7 @@ void test_connectivity() {
auto negotiated2 = pc2->createDataChannel("negoctated", init); auto negotiated2 = pc2->createDataChannel("negoctated", init);
if (!negotiated1->isOpen() || !negotiated2->isOpen()) if (!negotiated1->isOpen() || !negotiated2->isOpen())
throw runtime_error("Negociated DataChannel is not open"); throw runtime_error("Negotiated DataChannel is not open");
std::atomic<bool> received = false; std::atomic<bool> received = false;
negotiated2->onMessage([&received](const variant<binary, string> &message) { negotiated2->onMessage([&received](const variant<binary, string> &message) {
@ -239,7 +243,7 @@ void test_connectivity() {
this_thread::sleep_for(1s); this_thread::sleep_for(1s);
if (!received) if (!received)
throw runtime_error("Negociated DataChannel failed"); throw runtime_error("Negotiated DataChannel failed");
// Delay close of peer 2 to check closing works properly // Delay close of peer 2 to check closing works properly
pc1->close(); pc1->close();

View File

@ -232,7 +232,7 @@ void test_turn_connectivity() {
auto negotiated2 = pc2->createDataChannel("negoctated", init); auto negotiated2 = pc2->createDataChannel("negoctated", init);
if (!negotiated1->isOpen() || !negotiated2->isOpen()) if (!negotiated1->isOpen() || !negotiated2->isOpen())
throw runtime_error("Negociated DataChannel is not open"); throw runtime_error("Negotiated DataChannel is not open");
std::atomic<bool> received = false; std::atomic<bool> received = false;
negotiated2->onMessage([&received](const variant<binary, string> &message) { negotiated2->onMessage([&received](const variant<binary, string> &message) {
@ -250,7 +250,7 @@ void test_turn_connectivity() {
this_thread::sleep_for(1s); this_thread::sleep_for(1s);
if (!received) if (!received)
throw runtime_error("Negociated DataChannel failed"); throw runtime_error("Negotiated DataChannel failed");
// Delay close of peer 2 to check closing works properly // Delay close of peer 2 to check closing works properly
pc1->close(); pc1->close();