mirror of
https://github.com/mii443/libdatachannel.git
synced 2025-08-23 15:48:03 +00:00
Compare commits
30 Commits
Author | SHA1 | Date | |
---|---|---|---|
125edff298 | |||
0813976a5a | |||
4642504b83 | |||
5b760532c2 | |||
69bcdade50 | |||
bd3df48c0b | |||
faf3158609 | |||
b766be1880 | |||
b3edcfa05c | |||
19e148363c | |||
7f6f178177 | |||
2db14a29a9 | |||
5cbbba2e12 | |||
93aef867d0 | |||
e99ba3c5d8 | |||
65dba2c299 | |||
6ef8f1e1a7 | |||
56dbcaad97 | |||
d748016446 | |||
e543d789a4 | |||
90e59435c0 | |||
785c3b8149 | |||
c37c88543d | |||
011bfbe46f | |||
de2ac6c0c2 | |||
75619babd7 | |||
fe9a34905b | |||
b88f1f5e72 | |||
ab7d7fefe0 | |||
e592fcf217 |
@ -1,6 +1,6 @@
|
|||||||
cmake_minimum_required(VERSION 3.7)
|
cmake_minimum_required(VERSION 3.7)
|
||||||
project(libdatachannel
|
project(libdatachannel
|
||||||
VERSION 0.11.2
|
VERSION 0.11.8
|
||||||
LANGUAGES CXX)
|
LANGUAGES CXX)
|
||||||
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")
|
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")
|
||||||
|
|
||||||
|
2
deps/usrsctp
vendored
2
deps/usrsctp
vendored
Submodule deps/usrsctp updated: 2e754d5822...07f871bda2
@ -70,6 +70,7 @@ struct RTC_CPP_EXPORT Configuration {
|
|||||||
bool enableIceTcp = false;
|
bool enableIceTcp = false;
|
||||||
uint16_t portRangeBegin = 1024;
|
uint16_t portRangeBegin = 1024;
|
||||||
uint16_t portRangeEnd = 65535;
|
uint16_t portRangeEnd = 65535;
|
||||||
|
std::optional<size_t> mtu;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rtc
|
} // namespace rtc
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
|
#include <shared_mutex>
|
||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
@ -79,6 +80,8 @@ protected:
|
|||||||
string mProtocol;
|
string mProtocol;
|
||||||
std::shared_ptr<Reliability> mReliability;
|
std::shared_ptr<Reliability> mReliability;
|
||||||
|
|
||||||
|
mutable std::shared_mutex mMutex;
|
||||||
|
|
||||||
std::atomic<bool> mIsOpen = false;
|
std::atomic<bool> mIsOpen = false;
|
||||||
std::atomic<bool> mIsClosed = false;
|
std::atomic<bool> mIsClosed = false;
|
||||||
|
|
||||||
@ -88,13 +91,13 @@ private:
|
|||||||
friend class PeerConnection;
|
friend class PeerConnection;
|
||||||
};
|
};
|
||||||
|
|
||||||
class RTC_CPP_EXPORT NegociatedDataChannel final : public DataChannel {
|
class RTC_CPP_EXPORT NegotiatedDataChannel final : public DataChannel {
|
||||||
public:
|
public:
|
||||||
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label,
|
NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label,
|
||||||
string protocol, Reliability reliability);
|
string protocol, Reliability reliability);
|
||||||
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport,
|
NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport,
|
||||||
uint16_t stream);
|
uint16_t stream);
|
||||||
~NegociatedDataChannel();
|
~NegotiatedDataChannel();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void open(std::shared_ptr<SctpTransport> transport) override;
|
void open(std::shared_ptr<SctpTransport> transport) override;
|
||||||
|
@ -77,6 +77,8 @@ const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
|
|||||||
|
|
||||||
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
|
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
|
||||||
|
|
||||||
|
const size_t DEFAULT_IPV4_MTU = 1200; // IPv4 safe MTU value recommended by RFC 8261
|
||||||
|
|
||||||
// overloaded helper
|
// overloaded helper
|
||||||
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
|
||||||
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;
|
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;
|
||||||
|
@ -298,13 +298,16 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
uint8_t _length;
|
uint8_t _length;
|
||||||
char _text;
|
char _text[1];
|
||||||
|
|
||||||
public:
|
public:
|
||||||
inline std::string text() const { return std::string(&_text, _length); }
|
inline std::string text() const { return std::string(_text, _length); }
|
||||||
inline void setText(std::string text) {
|
inline void setText(std::string text) {
|
||||||
_length = text.length();
|
if(text.size() > 0xFF)
|
||||||
memcpy(&_text, text.data(), _length);
|
throw std::invalid_argument("text is too long");
|
||||||
|
|
||||||
|
_length = uint8_t(text.size());
|
||||||
|
memcpy(_text, text.data(), text.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
inline uint8_t length() { return _length; }
|
inline uint8_t length() { return _length; }
|
||||||
@ -334,12 +337,12 @@ public:
|
|||||||
return reinterpret_cast<RTCP_SDES_ITEM *>(base);
|
return reinterpret_cast<RTCP_SDES_ITEM *>(base);
|
||||||
}
|
}
|
||||||
|
|
||||||
long safelyCountChunkSize(unsigned int maxChunkSize) {
|
long safelyCountChunkSize(size_t maxChunkSize) {
|
||||||
if (maxChunkSize < RTCP_SDES_CHUNK::size({})) {
|
if (maxChunkSize < RTCP_SDES_CHUNK::size({})) {
|
||||||
// chunk is truncated
|
// chunk is truncated
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
unsigned int size = sizeof(SSRC);
|
size_t size = sizeof(SSRC);
|
||||||
unsigned int i = 0;
|
unsigned int i = 0;
|
||||||
// We can always access first 4 bytes of first item (in case of no items there will be 4
|
// We can always access first 4 bytes of first item (in case of no items there will be 4
|
||||||
// null bytes)
|
// null bytes)
|
||||||
@ -407,7 +410,7 @@ public:
|
|||||||
auto chunk = getChunk(i);
|
auto chunk = getChunk(i);
|
||||||
chunkSize += chunk->getSize();
|
chunkSize += chunk->getSize();
|
||||||
}
|
}
|
||||||
uint16_t length = (sizeof(header) + chunkSize) / 4 - 1;
|
uint16_t length = uint16_t((sizeof(header) + chunkSize) / 4 - 1);
|
||||||
header.prepareHeader(202, chunkCount, length);
|
header.prepareHeader(202, chunkCount, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -629,10 +632,10 @@ struct RTCP_NACK_PART {
|
|||||||
std::vector<uint16_t> getSequenceNumbers() {
|
std::vector<uint16_t> getSequenceNumbers() {
|
||||||
std::vector<uint16_t> result{};
|
std::vector<uint16_t> result{};
|
||||||
result.reserve(17);
|
result.reserve(17);
|
||||||
auto pid = getPID();
|
uint16_t pid = getPID();
|
||||||
result.push_back(pid);
|
result.push_back(pid);
|
||||||
auto bitmask = getBLP();
|
uint16_t bitmask = getBLP();
|
||||||
auto i = pid + 1;
|
uint16_t i = pid + 1;
|
||||||
while (bitmask > 0) {
|
while (bitmask > 0) {
|
||||||
if (bitmask & 0x1) {
|
if (bitmask & 0x1) {
|
||||||
result.push_back(i);
|
result.push_back(i);
|
||||||
@ -673,9 +676,9 @@ public:
|
|||||||
(*fciCount)++;
|
(*fciCount)++;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
// TODO SPEEED!
|
// TODO SPEED!
|
||||||
auto blp = parts[(*fciCount) - 1].getBLP();
|
uint16_t blp = parts[(*fciCount) - 1].getBLP();
|
||||||
auto newBit = 1u << (unsigned int)(missingPacket - (1 + *fciPID));
|
uint16_t newBit = uint16_t(1u << (missingPacket - (1 + *fciPID)));
|
||||||
parts[(*fciCount) - 1].setBLP(blp | newBit);
|
parts[(*fciCount) - 1].setBLP(blp | newBit);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,7 @@ public:
|
|||||||
|
|
||||||
string mid() const;
|
string mid() const;
|
||||||
Description::Media description() const;
|
Description::Media description() const;
|
||||||
|
Description::Direction direction() const;
|
||||||
|
|
||||||
void setDescription(Description::Media description);
|
void setDescription(Description::Media description);
|
||||||
|
|
||||||
@ -75,13 +76,14 @@ private:
|
|||||||
bool outgoing(message_ptr message);
|
bool outgoing(message_ptr message);
|
||||||
|
|
||||||
Description::Media mMediaDescription;
|
Description::Media mMediaDescription;
|
||||||
|
std::shared_ptr<MediaHandler> mRtcpHandler;
|
||||||
|
|
||||||
|
mutable std::shared_mutex mMutex;
|
||||||
|
|
||||||
std::atomic<bool> mIsClosed = false;
|
std::atomic<bool> mIsClosed = false;
|
||||||
|
|
||||||
Queue<message_ptr> mRecvQueue;
|
Queue<message_ptr> mRecvQueue;
|
||||||
|
|
||||||
std::shared_mutex mRtcpHandlerMutex;
|
|
||||||
std::shared_ptr<MediaHandler> mRtcpHandler;
|
|
||||||
|
|
||||||
friend class PeerConnection;
|
friend class PeerConnection;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -87,21 +87,34 @@ DataChannel::~DataChannel() { close(); }
|
|||||||
|
|
||||||
uint16_t DataChannel::stream() const { return mStream; }
|
uint16_t DataChannel::stream() const { return mStream; }
|
||||||
|
|
||||||
uint16_t DataChannel::id() const { return uint16_t(mStream); }
|
uint16_t DataChannel::id() const { return mStream; }
|
||||||
|
|
||||||
string DataChannel::label() const { return mLabel; }
|
string DataChannel::label() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return mLabel;
|
||||||
|
}
|
||||||
|
|
||||||
string DataChannel::protocol() const { return mProtocol; }
|
string DataChannel::protocol() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return mProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
Reliability DataChannel::reliability() const { return *mReliability; }
|
Reliability DataChannel::reliability() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return *mReliability;
|
||||||
|
}
|
||||||
|
|
||||||
void DataChannel::close() {
|
void DataChannel::close() {
|
||||||
mIsClosed = true;
|
std::shared_ptr<SctpTransport> transport;
|
||||||
if (mIsOpen.exchange(false))
|
{
|
||||||
if (auto transport = mSctpTransport.lock())
|
std::shared_lock lock(mMutex);
|
||||||
transport->closeStream(mStream);
|
transport = mSctpTransport.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
mIsClosed = true;
|
||||||
|
if (mIsOpen.exchange(false) && transport)
|
||||||
|
transport->closeStream(mStream);
|
||||||
|
|
||||||
mSctpTransport.reset();
|
|
||||||
resetCallbacks();
|
resetCallbacks();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,7 +123,6 @@ void DataChannel::remoteClose() {
|
|||||||
triggerClosed();
|
triggerClosed();
|
||||||
|
|
||||||
mIsOpen = false;
|
mIsOpen = false;
|
||||||
mSctpTransport.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); }
|
bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); }
|
||||||
@ -167,7 +179,10 @@ size_t DataChannel::maxMessageSize() const {
|
|||||||
size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
|
size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
|
||||||
|
|
||||||
void DataChannel::open(shared_ptr<SctpTransport> transport) {
|
void DataChannel::open(shared_ptr<SctpTransport> transport) {
|
||||||
mSctpTransport = transport;
|
{
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
|
mSctpTransport = transport;
|
||||||
|
}
|
||||||
|
|
||||||
if (!mIsOpen.exchange(true))
|
if (!mIsOpen.exchange(true))
|
||||||
triggerOpen();
|
triggerOpen();
|
||||||
@ -179,19 +194,22 @@ void DataChannel::processOpenMessage(message_ptr) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool DataChannel::outgoing(message_ptr message) {
|
bool DataChannel::outgoing(message_ptr message) {
|
||||||
if (mIsClosed)
|
std::shared_ptr<SctpTransport> transport;
|
||||||
throw std::runtime_error("DataChannel is closed");
|
{
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
transport = mSctpTransport.lock();
|
||||||
|
|
||||||
if (message->size() > maxMessageSize())
|
if (!transport || mIsClosed)
|
||||||
throw std::runtime_error("Message size exceeds limit");
|
throw std::runtime_error("DataChannel is closed");
|
||||||
|
|
||||||
auto transport = mSctpTransport.lock();
|
if (message->size() > maxMessageSize())
|
||||||
if (!transport)
|
throw std::runtime_error("Message size exceeds limit");
|
||||||
throw std::runtime_error("DataChannel transport is not open");
|
|
||||||
|
// Before the ACK has been received on a DataChannel, all messages must be sent ordered
|
||||||
|
message->reliability = mIsOpen ? mReliability : nullptr;
|
||||||
|
message->stream = mStream;
|
||||||
|
}
|
||||||
|
|
||||||
// Before the ACK has been received on a DataChannel, all messages must be sent ordered
|
|
||||||
message->reliability = mIsOpen ? mReliability : nullptr;
|
|
||||||
message->stream = mStream;
|
|
||||||
return transport->send(message);
|
return transport->send(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,20 +253,21 @@ void DataChannel::incoming(message_ptr message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream,
|
NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream,
|
||||||
string label, string protocol, Reliability reliability)
|
string label, string protocol, Reliability reliability)
|
||||||
: DataChannel(pc, stream, std::move(label), std::move(protocol), std::move(reliability)) {}
|
: DataChannel(pc, stream, std::move(label), std::move(protocol), std::move(reliability)) {}
|
||||||
|
|
||||||
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc,
|
NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc,
|
||||||
std::weak_ptr<SctpTransport> transport,
|
std::weak_ptr<SctpTransport> transport,
|
||||||
uint16_t stream)
|
uint16_t stream)
|
||||||
: DataChannel(pc, stream, "", "", {}) {
|
: DataChannel(pc, stream, "", "", {}) {
|
||||||
mSctpTransport = transport;
|
mSctpTransport = transport;
|
||||||
}
|
}
|
||||||
|
|
||||||
NegociatedDataChannel::~NegociatedDataChannel() {}
|
NegotiatedDataChannel::~NegotiatedDataChannel() {}
|
||||||
|
|
||||||
void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) {
|
void NegotiatedDataChannel::open(shared_ptr<SctpTransport> transport) {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
mSctpTransport = transport;
|
mSctpTransport = transport;
|
||||||
|
|
||||||
uint8_t channelType;
|
uint8_t channelType;
|
||||||
@ -287,10 +306,13 @@ void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) {
|
|||||||
std::copy(mLabel.begin(), mLabel.end(), end);
|
std::copy(mLabel.begin(), mLabel.end(), end);
|
||||||
std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size());
|
std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size());
|
||||||
|
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream));
|
transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream));
|
||||||
}
|
}
|
||||||
|
|
||||||
void NegociatedDataChannel::processOpenMessage(message_ptr message) {
|
void NegotiatedDataChannel::processOpenMessage(message_ptr message) {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
auto transport = mSctpTransport.lock();
|
auto transport = mSctpTransport.lock();
|
||||||
if (!transport)
|
if (!transport)
|
||||||
throw std::runtime_error("DataChannel has no transport");
|
throw std::runtime_error("DataChannel has no transport");
|
||||||
@ -326,6 +348,8 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
|
|||||||
mReliability->rexmit = int(0);
|
mReliability->rexmit = int(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
binary buffer(sizeof(AckMessage), byte(0));
|
binary buffer(sizeof(AckMessage), byte(0));
|
||||||
auto &ack = *reinterpret_cast<AckMessage *>(buffer.data());
|
auto &ack = *reinterpret_cast<AckMessage *>(buffer.data());
|
||||||
ack.type = MESSAGE_ACK;
|
ack.type = MESSAGE_ACK;
|
||||||
|
@ -58,10 +58,11 @@ void DtlsSrtpTransport::Cleanup() { srtp_shutdown(); }
|
|||||||
|
|
||||||
DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
|
DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
|
||||||
shared_ptr<Certificate> certificate,
|
shared_ptr<Certificate> certificate,
|
||||||
|
std::optional<size_t> mtu,
|
||||||
verifier_callback verifierCallback,
|
verifier_callback verifierCallback,
|
||||||
message_callback srtpRecvCallback,
|
message_callback srtpRecvCallback,
|
||||||
state_callback stateChangeCallback)
|
state_callback stateChangeCallback)
|
||||||
: DtlsTransport(lower, certificate, std::move(verifierCallback),
|
: DtlsTransport(lower, certificate, mtu, std::move(verifierCallback),
|
||||||
std::move(stateChangeCallback)),
|
std::move(stateChangeCallback)),
|
||||||
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
|
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback
|
||||||
|
|
||||||
|
@ -39,9 +39,9 @@ public:
|
|||||||
static void Init();
|
static void Init();
|
||||||
static void Cleanup();
|
static void Cleanup();
|
||||||
|
|
||||||
DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, std::shared_ptr<Certificate> certificate,
|
DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
|
||||||
verifier_callback verifierCallback, message_callback srtpRecvCallback,
|
std::optional<size_t> mtu, verifier_callback verifierCallback,
|
||||||
state_callback stateChangeCallback);
|
message_callback srtpRecvCallback, state_callback stateChangeCallback);
|
||||||
~DtlsSrtpTransport();
|
~DtlsSrtpTransport();
|
||||||
|
|
||||||
bool sendMedia(message_ptr message);
|
bool sendMedia(message_ptr message);
|
||||||
|
@ -50,8 +50,9 @@ void DtlsTransport::Init() {
|
|||||||
void DtlsTransport::Cleanup() { gnutls_global_deinit(); }
|
void DtlsTransport::Cleanup() { gnutls_global_deinit(); }
|
||||||
|
|
||||||
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate,
|
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate,
|
||||||
verifier_callback verifierCallback, state_callback stateChangeCallback)
|
std::optional<size_t> mtu, verifier_callback verifierCallback,
|
||||||
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
|
state_callback stateChangeCallback)
|
||||||
|
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
|
||||||
mVerifierCallback(std::move(verifierCallback)),
|
mVerifierCallback(std::move(verifierCallback)),
|
||||||
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
|
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
|
||||||
|
|
||||||
@ -156,11 +157,15 @@ void DtlsTransport::postHandshake() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void DtlsTransport::runRecvLoop() {
|
void DtlsTransport::runRecvLoop() {
|
||||||
const size_t maxMtu = 4096;
|
const size_t bufferSize = 4096;
|
||||||
|
|
||||||
// Handshake loop
|
// Handshake loop
|
||||||
try {
|
try {
|
||||||
changeState(State::Connecting);
|
changeState(State::Connecting);
|
||||||
gnutls_dtls_set_mtu(mSession, 1280 - 40 - 8); // min MTU over UDP/IPv6
|
|
||||||
|
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
|
||||||
|
gnutls_dtls_set_mtu(mSession, static_cast<unsigned int>(mtu));
|
||||||
|
PLOG_VERBOSE << "SSL MTU set to " << mtu;
|
||||||
|
|
||||||
int ret;
|
int ret;
|
||||||
do {
|
do {
|
||||||
@ -174,7 +179,7 @@ void DtlsTransport::runRecvLoop() {
|
|||||||
|
|
||||||
// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
|
// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
|
||||||
// See https://tools.ietf.org/html/rfc8261#section-5
|
// See https://tools.ietf.org/html/rfc8261#section-5
|
||||||
gnutls_dtls_set_mtu(mSession, maxMtu + 1);
|
gnutls_dtls_set_mtu(mSession, bufferSize + 1);
|
||||||
|
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
PLOG_ERROR << "DTLS handshake: " << e.what();
|
PLOG_ERROR << "DTLS handshake: " << e.what();
|
||||||
@ -188,7 +193,6 @@ void DtlsTransport::runRecvLoop() {
|
|||||||
postHandshake();
|
postHandshake();
|
||||||
changeState(State::Connected);
|
changeState(State::Connected);
|
||||||
|
|
||||||
const size_t bufferSize = maxMtu;
|
|
||||||
char buffer[bufferSize];
|
char buffer[bufferSize];
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -314,8 +318,9 @@ void DtlsTransport::Cleanup() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certificate> certificate,
|
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certificate> certificate,
|
||||||
verifier_callback verifierCallback, state_callback stateChangeCallback)
|
std::optional<size_t> mtu, verifier_callback verifierCallback,
|
||||||
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
|
state_callback stateChangeCallback)
|
||||||
|
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
|
||||||
mVerifierCallback(std::move(verifierCallback)),
|
mVerifierCallback(std::move(verifierCallback)),
|
||||||
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
|
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
|
||||||
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
|
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
|
||||||
@ -440,16 +445,18 @@ void DtlsTransport::postHandshake() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void DtlsTransport::runRecvLoop() {
|
void DtlsTransport::runRecvLoop() {
|
||||||
const size_t maxMtu = 4096;
|
const size_t bufferSize = 4096;
|
||||||
try {
|
try {
|
||||||
changeState(State::Connecting);
|
changeState(State::Connecting);
|
||||||
SSL_set_mtu(mSsl, 1280 - 40 - 8); // min MTU over UDP/IPv6
|
|
||||||
|
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
|
||||||
|
SSL_set_mtu(mSsl, static_cast<unsigned int>(mtu));
|
||||||
|
PLOG_VERBOSE << "SSL MTU set to " << mtu;
|
||||||
|
|
||||||
// Initiate the handshake
|
// Initiate the handshake
|
||||||
int ret = SSL_do_handshake(mSsl);
|
int ret = SSL_do_handshake(mSsl);
|
||||||
openssl::check(mSsl, ret, "Handshake failed");
|
openssl::check(mSsl, ret, "Handshake failed");
|
||||||
|
|
||||||
const size_t bufferSize = maxMtu;
|
|
||||||
byte buffer[bufferSize];
|
byte buffer[bufferSize];
|
||||||
while (mIncomingQueue.running()) {
|
while (mIncomingQueue.running()) {
|
||||||
// Process pending messages
|
// Process pending messages
|
||||||
@ -466,7 +473,7 @@ void DtlsTransport::runRecvLoop() {
|
|||||||
if (SSL_is_init_finished(mSsl)) {
|
if (SSL_is_init_finished(mSsl)) {
|
||||||
// RFC 8261: DTLS MUST support sending messages larger than the current path
|
// RFC 8261: DTLS MUST support sending messages larger than the current path
|
||||||
// MTU See https://tools.ietf.org/html/rfc8261#section-5
|
// MTU See https://tools.ietf.org/html/rfc8261#section-5
|
||||||
SSL_set_mtu(mSsl, maxMtu + 1);
|
SSL_set_mtu(mSsl, bufferSize + 1);
|
||||||
|
|
||||||
PLOG_INFO << "DTLS handshake finished";
|
PLOG_INFO << "DTLS handshake finished";
|
||||||
postHandshake();
|
postHandshake();
|
||||||
|
@ -44,7 +44,8 @@ public:
|
|||||||
using verifier_callback = std::function<bool(const std::string &fingerprint)>;
|
using verifier_callback = std::function<bool(const std::string &fingerprint)>;
|
||||||
|
|
||||||
DtlsTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
|
DtlsTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
|
||||||
verifier_callback verifierCallback, state_callback stateChangeCallback);
|
std::optional<size_t> mtu, verifier_callback verifierCallback,
|
||||||
|
state_callback stateChangeCallback);
|
||||||
~DtlsTransport();
|
~DtlsTransport();
|
||||||
|
|
||||||
virtual void start() override;
|
virtual void start() override;
|
||||||
@ -57,6 +58,7 @@ protected:
|
|||||||
virtual void postHandshake();
|
virtual void postHandshake();
|
||||||
void runRecvLoop();
|
void runRecvLoop();
|
||||||
|
|
||||||
|
const std::optional<size_t> mMtu;
|
||||||
const certificate_ptr mCertificate;
|
const certificate_ptr mCertificate;
|
||||||
const verifier_callback mVerifierCallback;
|
const verifier_callback mVerifierCallback;
|
||||||
const bool mIsClient;
|
const bool mIsClient;
|
||||||
|
@ -22,8 +22,8 @@
|
|||||||
#include "include.hpp"
|
#include "include.hpp"
|
||||||
#include "logcounter.hpp"
|
#include "logcounter.hpp"
|
||||||
#include "processor.hpp"
|
#include "processor.hpp"
|
||||||
#include "threadpool.hpp"
|
|
||||||
#include "rtp.hpp"
|
#include "rtp.hpp"
|
||||||
|
#include "threadpool.hpp"
|
||||||
|
|
||||||
#include "dtlstransport.hpp"
|
#include "dtlstransport.hpp"
|
||||||
#include "icetransport.hpp"
|
#include "icetransport.hpp"
|
||||||
@ -75,6 +75,17 @@ PeerConnection::PeerConnection(const Configuration &config)
|
|||||||
|
|
||||||
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
||||||
throw std::invalid_argument("Invalid port range");
|
throw std::invalid_argument("Invalid port range");
|
||||||
|
|
||||||
|
if (config.mtu) {
|
||||||
|
if (*config.mtu < 576) // Min MTU for IPv4
|
||||||
|
throw std::invalid_argument("Invalid MTU value");
|
||||||
|
|
||||||
|
if (*config.mtu > 1500) { // Standard Ethernet
|
||||||
|
PLOG_WARNING << "MTU set to " << *config.mtu;
|
||||||
|
} else {
|
||||||
|
PLOG_VERBOSE << "MTU set to " << *config.mtu;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PeerConnection::~PeerConnection() {
|
PeerConnection::~PeerConnection() {
|
||||||
@ -515,7 +526,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|||||||
|
|
||||||
// DTLS-SRTP
|
// DTLS-SRTP
|
||||||
transport = std::make_shared<DtlsSrtpTransport>(
|
transport = std::make_shared<DtlsSrtpTransport>(
|
||||||
lower, certificate, verifierCallback,
|
lower, certificate, mConfig.mtu, verifierCallback,
|
||||||
weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
|
weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
|
||||||
#else
|
#else
|
||||||
PLOG_WARNING << "Ignoring media support (not compiled with media support)";
|
PLOG_WARNING << "Ignoring media support (not compiled with media support)";
|
||||||
@ -524,8 +535,8 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|||||||
|
|
||||||
if (!transport) {
|
if (!transport) {
|
||||||
// DTLS only
|
// DTLS only
|
||||||
transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback,
|
transport = std::make_shared<DtlsTransport>(lower, certificate, mConfig.mtu,
|
||||||
stateChangeCallback);
|
verifierCallback, stateChangeCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::atomic_store(&mDtlsTransport, transport);
|
std::atomic_store(&mDtlsTransport, transport);
|
||||||
@ -557,7 +568,7 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
|
|||||||
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
|
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
|
||||||
auto lower = std::atomic_load(&mDtlsTransport);
|
auto lower = std::atomic_load(&mDtlsTransport);
|
||||||
auto transport = std::make_shared<SctpTransport>(
|
auto transport = std::make_shared<SctpTransport>(
|
||||||
lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
|
lower, sctpPort, mConfig.mtu, weak_bind(&PeerConnection::forwardMessage, this, _1),
|
||||||
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
|
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
|
||||||
[this, weak_this = weak_from_this()](SctpTransport::State state) {
|
[this, weak_this = weak_from_this()](SctpTransport::State state) {
|
||||||
auto shared_this = weak_this.lock();
|
auto shared_this = weak_this.lock();
|
||||||
@ -663,10 +674,12 @@ void PeerConnection::forwardMessage(message_ptr message) {
|
|||||||
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
|
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
|
||||||
stream % 2 == remoteParity) {
|
stream % 2 == remoteParity) {
|
||||||
|
|
||||||
channel = std::make_shared<NegociatedDataChannel>(shared_from_this(), sctpTransport,
|
channel =
|
||||||
stream);
|
std::make_shared<NegotiatedDataChannel>(shared_from_this(), sctpTransport, stream);
|
||||||
channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
|
channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
|
||||||
weak_ptr<DataChannel>{channel}));
|
weak_ptr<DataChannel>{channel}));
|
||||||
|
|
||||||
|
std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
|
||||||
mDataChannels.emplace(stream, channel);
|
mDataChannels.emplace(stream, channel);
|
||||||
} else {
|
} else {
|
||||||
// Invalid, close the DataChannel
|
// Invalid, close the DataChannel
|
||||||
@ -833,7 +846,7 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role rol
|
|||||||
init.negotiated
|
init.negotiated
|
||||||
? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
|
? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
|
||||||
std::move(init.protocol), std::move(init.reliability))
|
std::move(init.protocol), std::move(init.reliability))
|
||||||
: std::make_shared<NegociatedDataChannel>(shared_from_this(), stream, std::move(label),
|
: std::make_shared<NegotiatedDataChannel>(shared_from_this(), stream, std::move(label),
|
||||||
std::move(init.protocol),
|
std::move(init.protocol),
|
||||||
std::move(init.reliability));
|
std::move(init.reliability));
|
||||||
mDataChannels.emplace(std::make_pair(stream, channel));
|
mDataChannels.emplace(std::make_pair(stream, channel));
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sctptransport.hpp"
|
#include "sctptransport.hpp"
|
||||||
|
#include "dtlstransport.hpp"
|
||||||
#include "logcounter.hpp"
|
#include "logcounter.hpp"
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
@ -27,8 +28,7 @@
|
|||||||
|
|
||||||
// The IETF draft says:
|
// The IETF draft says:
|
||||||
// SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in
|
// SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in
|
||||||
// [RFC4821] using probing messages specified in [RFC4820]. The initial Path MTU at the IP layer
|
// [RFC4821] using probing messages specified in [RFC4820].
|
||||||
// SHOULD NOT exceed 1200 bytes for IPv4 and 1280 for IPv6.
|
|
||||||
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5
|
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5
|
||||||
//
|
//
|
||||||
// However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
|
// However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
|
||||||
@ -54,8 +54,6 @@
|
|||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
|
|
||||||
using std::shared_ptr;
|
|
||||||
|
|
||||||
namespace rtc {
|
namespace rtc {
|
||||||
|
|
||||||
static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
|
static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
|
||||||
@ -83,11 +81,12 @@ void SctpTransport::Init() {
|
|||||||
|
|
||||||
usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
|
usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
|
||||||
|
|
||||||
// Change congestion control from the default TCP Reno (RFC 2581) to H-TCP
|
// Use default congestion control (RFC 4960)
|
||||||
usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP);
|
// See https://github.com/paullouisageneau/libdatachannel/issues/354
|
||||||
|
usrsctp_sysctl_set_sctp_default_cc_module(0);
|
||||||
|
|
||||||
// Enable Non-Renegable Selective Acknowledgments (NR-SACKs)
|
// Enable Partial Reliability Extension (RFC 3758)
|
||||||
usrsctp_sysctl_set_sctp_nrsack_enable(1);
|
usrsctp_sysctl_set_sctp_pr_enable(1);
|
||||||
|
|
||||||
// Increase the initial window size to 10 MTUs (RFC 6928)
|
// Increase the initial window size to 10 MTUs (RFC 6928)
|
||||||
usrsctp_sysctl_set_sctp_initial_cwnd(10);
|
usrsctp_sysctl_set_sctp_initial_cwnd(10);
|
||||||
@ -102,9 +101,10 @@ void SctpTransport::Cleanup() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
||||||
message_callback recvCallback, amount_callback bufferedAmountCallback,
|
std::optional<size_t> mtu, message_callback recvCallback,
|
||||||
|
amount_callback bufferedAmountCallback,
|
||||||
state_callback stateChangeCallback)
|
state_callback stateChangeCallback)
|
||||||
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
|
: Transport(lower, std::move(stateChangeCallback)), mPort(port),
|
||||||
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
|
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
|
||||||
onRecv(recvCallback);
|
onRecv(recvCallback);
|
||||||
|
|
||||||
@ -180,13 +180,24 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
|
|||||||
// 1200 bytes.
|
// 1200 bytes.
|
||||||
// See https://tools.ietf.org/html/rfc8261#section-5
|
// See https://tools.ietf.org/html/rfc8261#section-5
|
||||||
#if USE_PMTUD
|
#if USE_PMTUD
|
||||||
// Enable SCTP path MTU discovery
|
if (!mtu.has_value()) {
|
||||||
spp.spp_flags |= SPP_PMTUD_ENABLE;
|
|
||||||
#else
|
#else
|
||||||
// Fall back to a safe MTU value.
|
if (false) {
|
||||||
spp.spp_flags |= SPP_PMTUD_DISABLE;
|
|
||||||
spp.spp_pathmtu = 1200;
|
|
||||||
#endif
|
#endif
|
||||||
|
// Enable SCTP path MTU discovery
|
||||||
|
spp.spp_flags |= SPP_PMTUD_ENABLE;
|
||||||
|
PLOG_VERBOSE << "Path MTU discovery enabled";
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Fall back to a safe MTU value.
|
||||||
|
spp.spp_flags |= SPP_PMTUD_DISABLE;
|
||||||
|
// The MTU value provided specifies the space available for chunks in the
|
||||||
|
// packet, so we also subtract the SCTP header size.
|
||||||
|
size_t pmtu = mtu.value_or(DEFAULT_IPV4_MTU + 20) - 12 - 37 - 8 - 40; // SCTP/DTLS/UDP/IPv6
|
||||||
|
spp.spp_pathmtu = uint32_t(pmtu);
|
||||||
|
PLOG_VERBOSE << "Path MTU discovery disabled, SCTP MTU set to " << pmtu;
|
||||||
|
}
|
||||||
|
|
||||||
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
|
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
|
||||||
throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
|
throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
|
||||||
std::to_string(errno));
|
std::to_string(errno));
|
||||||
@ -249,7 +260,7 @@ bool SctpTransport::stop() {
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
mSendQueue.stop();
|
mSendQueue.stop();
|
||||||
safeFlush();
|
flush();
|
||||||
shutdown();
|
shutdown();
|
||||||
onRecv(nullptr);
|
onRecv(nullptr);
|
||||||
return true;
|
return true;
|
||||||
@ -323,13 +334,20 @@ bool SctpTransport::send(message_ptr message) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SctpTransport::closeStream(unsigned int stream) {
|
bool SctpTransport::flush() {
|
||||||
send(make_message(0, Message::Reset, uint16_t(stream)));
|
try {
|
||||||
|
std::lock_guard lock(mSendMutex);
|
||||||
|
trySendQueue();
|
||||||
|
return true;
|
||||||
|
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
PLOG_WARNING << "SCTP flush: " << e.what();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SctpTransport::flush() {
|
void SctpTransport::closeStream(unsigned int stream) {
|
||||||
std::lock_guard lock(mSendMutex);
|
send(make_message(0, Message::Reset, uint16_t(stream)));
|
||||||
trySendQueue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SctpTransport::incoming(message_ptr message) {
|
void SctpTransport::incoming(message_ptr message) {
|
||||||
@ -349,6 +367,10 @@ void SctpTransport::incoming(message_ptr message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
PLOG_VERBOSE << "Incoming size=" << message->size();
|
PLOG_VERBOSE << "Incoming size=" << message->size();
|
||||||
|
|
||||||
|
// TODO: There seems to be a possible data race between usrsctp_sendv() and usrsctp_conninput()
|
||||||
|
// As a mitigation, lock the send mutex before calling usrsctp_conninput()
|
||||||
|
std::lock_guard lock(mSendMutex);
|
||||||
usrsctp_conninput(this, message->data(), message->size(), 0);
|
usrsctp_conninput(this, message->data(), message->size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,6 +435,16 @@ void SctpTransport::doRecv() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SctpTransport::doFlush() {
|
||||||
|
std::lock_guard lock(mSendMutex);
|
||||||
|
--mPendingFlushCount;
|
||||||
|
try {
|
||||||
|
trySendQueue();
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
PLOG_WARNING << e.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool SctpTransport::trySendQueue() {
|
bool SctpTransport::trySendQueue() {
|
||||||
// Requires mSendMutex to be locked
|
// Requires mSendMutex to be locked
|
||||||
while (auto next = mSendQueue.peek()) {
|
while (auto next = mSendQueue.peek()) {
|
||||||
@ -519,13 +551,16 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
|
|||||||
else
|
else
|
||||||
it->second = amount;
|
it->second = amount;
|
||||||
|
|
||||||
mSendMutex.unlock();
|
// Synchronously call the buffered amount callback
|
||||||
|
triggerBufferedAmount(streamId, amount);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
|
||||||
try {
|
try {
|
||||||
mBufferedAmountCallback(streamId, amount);
|
mBufferedAmountCallback(streamId, amount);
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
|
PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
|
||||||
}
|
}
|
||||||
mSendMutex.lock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SctpTransport::sendReset(uint16_t streamId) {
|
void SctpTransport::sendReset(uint16_t streamId) {
|
||||||
@ -555,17 +590,6 @@ void SctpTransport::sendReset(uint16_t streamId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SctpTransport::safeFlush() {
|
|
||||||
try {
|
|
||||||
flush();
|
|
||||||
return true;
|
|
||||||
|
|
||||||
} catch (const std::exception &e) {
|
|
||||||
PLOG_WARNING << "SCTP flush: " << e.what();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void SctpTransport::handleUpcall() {
|
void SctpTransport::handleUpcall() {
|
||||||
if (!mSock)
|
if (!mSock)
|
||||||
return;
|
return;
|
||||||
@ -579,8 +603,10 @@ void SctpTransport::handleUpcall() {
|
|||||||
mProcessor.enqueue(&SctpTransport::doRecv, this);
|
mProcessor.enqueue(&SctpTransport::doRecv, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events & SCTP_EVENT_WRITE)
|
if (events & SCTP_EVENT_WRITE && mPendingFlushCount == 0) {
|
||||||
mProcessor.enqueue(&SctpTransport::safeFlush, this);
|
++mPendingFlushCount;
|
||||||
|
mProcessor.enqueue(&SctpTransport::doFlush, this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
|
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
|
||||||
@ -695,7 +721,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
|
|||||||
PLOG_VERBOSE << "SCTP dry event";
|
PLOG_VERBOSE << "SCTP dry event";
|
||||||
// It should not be necessary since the send callback should have been called already,
|
// It should not be necessary since the send callback should have been called already,
|
||||||
// but to be sure, let's try to send now.
|
// but to be sure, let's try to send now.
|
||||||
safeFlush();
|
flush();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,15 +43,16 @@ public:
|
|||||||
|
|
||||||
using amount_callback = std::function<void(uint16_t streamId, size_t amount)>;
|
using amount_callback = std::function<void(uint16_t streamId, size_t amount)>;
|
||||||
|
|
||||||
SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, message_callback recvCallback,
|
SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, std::optional<size_t> mtu,
|
||||||
amount_callback bufferedAmountCallback, state_callback stateChangeCallback);
|
message_callback recvCallback, amount_callback bufferedAmountCallback,
|
||||||
|
state_callback stateChangeCallback);
|
||||||
~SctpTransport();
|
~SctpTransport();
|
||||||
|
|
||||||
void start() override;
|
void start() override;
|
||||||
bool stop() override;
|
bool stop() override;
|
||||||
bool send(message_ptr message) override; // false if buffered
|
bool send(message_ptr message) override; // false if buffered
|
||||||
|
bool flush();
|
||||||
void closeStream(unsigned int stream);
|
void closeStream(unsigned int stream);
|
||||||
void flush();
|
|
||||||
|
|
||||||
// Stats
|
// Stats
|
||||||
void clearStats();
|
void clearStats();
|
||||||
@ -79,11 +80,12 @@ private:
|
|||||||
bool outgoing(message_ptr message) override;
|
bool outgoing(message_ptr message) override;
|
||||||
|
|
||||||
void doRecv();
|
void doRecv();
|
||||||
|
void doFlush();
|
||||||
bool trySendQueue();
|
bool trySendQueue();
|
||||||
bool trySendMessage(message_ptr message);
|
bool trySendMessage(message_ptr message);
|
||||||
void updateBufferedAmount(uint16_t streamId, long delta);
|
void updateBufferedAmount(uint16_t streamId, long delta);
|
||||||
|
void triggerBufferedAmount(uint16_t streamId, size_t amount);
|
||||||
void sendReset(uint16_t streamId);
|
void sendReset(uint16_t streamId);
|
||||||
bool safeFlush();
|
|
||||||
|
|
||||||
void handleUpcall();
|
void handleUpcall();
|
||||||
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
|
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
|
||||||
@ -95,8 +97,10 @@ private:
|
|||||||
struct socket *mSock;
|
struct socket *mSock;
|
||||||
|
|
||||||
Processor mProcessor;
|
Processor mProcessor;
|
||||||
std::atomic<int> mPendingRecvCount;
|
std::atomic<int> mPendingRecvCount = 0;
|
||||||
std::mutex mRecvMutex, mSendMutex;
|
std::atomic<int> mPendingFlushCount = 0;
|
||||||
|
std::mutex mRecvMutex;
|
||||||
|
std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
|
||||||
Queue<message_ptr> mSendQueue;
|
Queue<message_ptr> mSendQueue;
|
||||||
std::map<uint16_t, size_t> mBufferedAmount;
|
std::map<uint16_t, size_t> mBufferedAmount;
|
||||||
amount_callback mBufferedAmountCallback;
|
amount_callback mBufferedAmountCallback;
|
||||||
|
@ -43,8 +43,7 @@ int ThreadPool::count() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::spawn(int count) {
|
void ThreadPool::spawn(int count) {
|
||||||
std::scoped_lock lock(mMutex, mWorkersMutex);
|
std::unique_lock lock(mWorkersMutex);
|
||||||
mJoining = false;
|
|
||||||
while (count-- > 0)
|
while (count-- > 0)
|
||||||
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
|
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
|
||||||
}
|
}
|
||||||
@ -52,7 +51,7 @@ void ThreadPool::spawn(int count) {
|
|||||||
void ThreadPool::join() {
|
void ThreadPool::join() {
|
||||||
{
|
{
|
||||||
std::unique_lock lock(mMutex);
|
std::unique_lock lock(mMutex);
|
||||||
mWaitingCondition.wait(lock, [&]() { return mWaitingWorkers == int(mWorkers.size()); });
|
mWaitingCondition.wait(lock, [&]() { return mBusyWorkers == 0; });
|
||||||
mJoining = true;
|
mJoining = true;
|
||||||
mTasksCondition.notify_all();
|
mTasksCondition.notify_all();
|
||||||
}
|
}
|
||||||
@ -62,9 +61,13 @@ void ThreadPool::join() {
|
|||||||
w.join();
|
w.join();
|
||||||
|
|
||||||
mWorkers.clear();
|
mWorkers.clear();
|
||||||
|
|
||||||
|
mJoining = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::run() {
|
void ThreadPool::run() {
|
||||||
|
++mBusyWorkers;
|
||||||
|
scope_guard([&]() { --mBusyWorkers; });
|
||||||
while (runOne()) {
|
while (runOne()) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,24 +83,23 @@ bool ThreadPool::runOne() {
|
|||||||
std::function<void()> ThreadPool::dequeue() {
|
std::function<void()> ThreadPool::dequeue() {
|
||||||
std::unique_lock lock(mMutex);
|
std::unique_lock lock(mMutex);
|
||||||
while (!mJoining) {
|
while (!mJoining) {
|
||||||
|
std::optional<clock::time_point> time;
|
||||||
if (!mTasks.empty()) {
|
if (!mTasks.empty()) {
|
||||||
if (mTasks.top().time <= clock::now()) {
|
time = mTasks.top().time;
|
||||||
|
if (*time <= clock::now()) {
|
||||||
auto func = std::move(mTasks.top().func);
|
auto func = std::move(mTasks.top().func);
|
||||||
mTasks.pop();
|
mTasks.pop();
|
||||||
return func;
|
return func;
|
||||||
}
|
}
|
||||||
|
|
||||||
++mWaitingWorkers;
|
|
||||||
mWaitingCondition.notify_all();
|
|
||||||
mTasksCondition.wait_until(lock, mTasks.top().time);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
++mWaitingWorkers;
|
|
||||||
mWaitingCondition.notify_all();
|
|
||||||
mTasksCondition.wait(lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
--mWaitingWorkers;
|
--mBusyWorkers;
|
||||||
|
scope_guard([&]() { ++mBusyWorkers; });
|
||||||
|
mWaitingCondition.notify_all();
|
||||||
|
if(time)
|
||||||
|
mTasksCondition.wait_until(lock, *time);
|
||||||
|
else
|
||||||
|
mTasksCondition.wait(lock);
|
||||||
}
|
}
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
@ -72,8 +72,8 @@ protected:
|
|||||||
std::function<void()> dequeue(); // returns null function if joining
|
std::function<void()> dequeue(); // returns null function if joining
|
||||||
|
|
||||||
std::vector<std::thread> mWorkers;
|
std::vector<std::thread> mWorkers;
|
||||||
int mWaitingWorkers = 0;
|
int mBusyWorkers = 0;
|
||||||
bool mJoining = false;
|
std::atomic<bool> mJoining = false;
|
||||||
|
|
||||||
struct Task {
|
struct Task {
|
||||||
clock::time_point time;
|
clock::time_point time;
|
||||||
|
@ -35,11 +35,23 @@ using std::weak_ptr;
|
|||||||
Track::Track(Description::Media description)
|
Track::Track(Description::Media description)
|
||||||
: mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
|
: mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
|
||||||
|
|
||||||
string Track::mid() const { return mMediaDescription.mid(); }
|
string Track::mid() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return mMediaDescription.mid();
|
||||||
|
}
|
||||||
|
|
||||||
Description::Media Track::description() const { return mMediaDescription; }
|
Description::Media Track::description() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return mMediaDescription;
|
||||||
|
}
|
||||||
|
|
||||||
|
Description::Direction Track::direction() const {
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
|
return mMediaDescription.direction();
|
||||||
|
}
|
||||||
|
|
||||||
void Track::setDescription(Description::Media description) {
|
void Track::setDescription(Description::Media description) {
|
||||||
|
std::unique_lock lock(mMutex);
|
||||||
if (description.mid() != mMediaDescription.mid())
|
if (description.mid() != mMediaDescription.mid())
|
||||||
throw std::logic_error("Media description mid does not match track mid");
|
throw std::logic_error("Media description mid does not match track mid");
|
||||||
|
|
||||||
@ -48,17 +60,17 @@ void Track::setDescription(Description::Media description) {
|
|||||||
|
|
||||||
void Track::close() {
|
void Track::close() {
|
||||||
mIsClosed = true;
|
mIsClosed = true;
|
||||||
resetCallbacks();
|
|
||||||
setRtcpHandler(nullptr);
|
setRtcpHandler(nullptr);
|
||||||
|
resetCallbacks();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Track::send(message_variant data) {
|
bool Track::send(message_variant data) {
|
||||||
if (mIsClosed)
|
if (mIsClosed)
|
||||||
throw std::runtime_error("Track is closed");
|
throw std::runtime_error("Track is closed");
|
||||||
|
|
||||||
auto direction = mMediaDescription.direction();
|
auto dir = direction();
|
||||||
if ((direction == Description::Direction::RecvOnly ||
|
if ((dir == Description::Direction::RecvOnly || dir == Description::Direction::Inactive)) {
|
||||||
direction == Description::Direction::Inactive)) {
|
|
||||||
COUNTER_MEDIA_BAD_DIRECTION++;
|
COUNTER_MEDIA_BAD_DIRECTION++;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -92,6 +104,7 @@ std::optional<message_variant> Track::peek() {
|
|||||||
|
|
||||||
bool Track::isOpen(void) const {
|
bool Track::isOpen(void) const {
|
||||||
#if RTC_ENABLE_MEDIA
|
#if RTC_ENABLE_MEDIA
|
||||||
|
std::shared_lock lock(mMutex);
|
||||||
return !mIsClosed && mDtlsSrtpTransport.lock();
|
return !mIsClosed && mDtlsSrtpTransport.lock();
|
||||||
#else
|
#else
|
||||||
return !mIsClosed;
|
return !mIsClosed;
|
||||||
@ -101,14 +114,18 @@ bool Track::isOpen(void) const {
|
|||||||
bool Track::isClosed(void) const { return mIsClosed; }
|
bool Track::isClosed(void) const { return mIsClosed; }
|
||||||
|
|
||||||
size_t Track::maxMessageSize() const {
|
size_t Track::maxMessageSize() const {
|
||||||
return 65535 - 12 - 4; // SRTP/UDP
|
return DEFAULT_IPV4_MTU - 12 - 8 - 20; // SRTP/UDP/IPv4
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t Track::availableAmount() const { return mRecvQueue.amount(); }
|
size_t Track::availableAmount() const { return mRecvQueue.amount(); }
|
||||||
|
|
||||||
#if RTC_ENABLE_MEDIA
|
#if RTC_ENABLE_MEDIA
|
||||||
void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
|
void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
|
||||||
mDtlsSrtpTransport = transport;
|
{
|
||||||
|
std::lock_guard lock(mMutex);
|
||||||
|
mDtlsSrtpTransport = transport;
|
||||||
|
}
|
||||||
|
|
||||||
triggerOpen();
|
triggerOpen();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -117,9 +134,8 @@ void Track::incoming(message_ptr message) {
|
|||||||
if (!message)
|
if (!message)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto direction = mMediaDescription.direction();
|
auto dir = direction();
|
||||||
if ((direction == Description::Direction::SendOnly ||
|
if ((dir == Description::Direction::SendOnly || dir == Description::Direction::Inactive) &&
|
||||||
direction == Description::Direction::Inactive) &&
|
|
||||||
message->type != Message::Control) {
|
message->type != Message::Control) {
|
||||||
COUNTER_MEDIA_BAD_DIRECTION++;
|
COUNTER_MEDIA_BAD_DIRECTION++;
|
||||||
return;
|
return;
|
||||||
@ -143,16 +159,20 @@ void Track::incoming(message_ptr message) {
|
|||||||
|
|
||||||
bool Track::outgoing([[maybe_unused]] message_ptr message) {
|
bool Track::outgoing([[maybe_unused]] message_ptr message) {
|
||||||
#if RTC_ENABLE_MEDIA
|
#if RTC_ENABLE_MEDIA
|
||||||
auto transport = mDtlsSrtpTransport.lock();
|
std::shared_ptr<DtlsSrtpTransport> transport;
|
||||||
if (!transport)
|
{
|
||||||
throw std::runtime_error("Track transport is not open");
|
std::shared_lock lock(mMutex);
|
||||||
|
transport = mDtlsSrtpTransport.lock();
|
||||||
|
if (!transport)
|
||||||
|
throw std::runtime_error("Track is closed");
|
||||||
|
|
||||||
// Set recommended medium-priority DSCP value
|
// Set recommended medium-priority DSCP value
|
||||||
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
|
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
|
||||||
if (mMediaDescription.type() == "audio")
|
if (mMediaDescription.type() == "audio")
|
||||||
message->dscp = 46; // EF: Expedited Forwarding
|
message->dscp = 46; // EF: Expedited Forwarding
|
||||||
else
|
else
|
||||||
message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
|
message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
|
||||||
|
}
|
||||||
|
|
||||||
return transport->sendMedia(message);
|
return transport->sendMedia(message);
|
||||||
#else
|
#else
|
||||||
@ -162,24 +182,23 @@ bool Track::outgoing([[maybe_unused]] message_ptr message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) {
|
void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) {
|
||||||
std::unique_lock lock(mRtcpHandlerMutex);
|
{
|
||||||
mRtcpHandler = std::move(handler);
|
std::unique_lock lock(mMutex);
|
||||||
if (mRtcpHandler) {
|
mRtcpHandler = handler;
|
||||||
auto copy = mRtcpHandler;
|
|
||||||
lock.unlock();
|
|
||||||
copy->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handler->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Track::requestKeyframe() {
|
bool Track::requestKeyframe() {
|
||||||
if (auto handler = getRtcpHandler()) {
|
if (auto handler = getRtcpHandler())
|
||||||
return handler->requestKeyframe();
|
return handler->requestKeyframe();
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<MediaHandler> Track::getRtcpHandler() {
|
std::shared_ptr<MediaHandler> Track::getRtcpHandler() {
|
||||||
std::shared_lock lock(mRtcpHandlerMutex);
|
std::shared_lock lock(mMutex);
|
||||||
return mRtcpHandler;
|
return mRtcpHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,11 +40,13 @@ size_t benchmark(milliseconds duration) {
|
|||||||
|
|
||||||
Configuration config1;
|
Configuration config1;
|
||||||
// config1.iceServers.emplace_back("stun:stun.l.google.com:19302");
|
// config1.iceServers.emplace_back("stun:stun.l.google.com:19302");
|
||||||
|
// config1.mtu = 1500;
|
||||||
|
|
||||||
auto pc1 = std::make_shared<PeerConnection>(config1);
|
auto pc1 = std::make_shared<PeerConnection>(config1);
|
||||||
|
|
||||||
Configuration config2;
|
Configuration config2;
|
||||||
// config2.iceServers.emplace_back("stun:stun.l.google.com:19302");
|
// config2.iceServers.emplace_back("stun:stun.l.google.com:19302");
|
||||||
|
// config2.mtu = 1500;
|
||||||
|
|
||||||
auto pc2 = std::make_shared<PeerConnection>(config2);
|
auto pc2 = std::make_shared<PeerConnection>(config2);
|
||||||
|
|
||||||
@ -125,8 +127,12 @@ size_t benchmark(milliseconds duration) {
|
|||||||
openTime = steady_clock::now();
|
openTime = steady_clock::now();
|
||||||
|
|
||||||
cout << "DataChannel open, sending data..." << endl;
|
cout << "DataChannel open, sending data..." << endl;
|
||||||
while (dc1->bufferedAmount() == 0) {
|
try {
|
||||||
dc1->send(messageData);
|
while (dc1->bufferedAmount() == 0) {
|
||||||
|
dc1->send(messageData);
|
||||||
|
}
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
std::cout << "Send failed: " << e.what() << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
// When sent data is buffered in the DataChannel,
|
// When sent data is buffered in the DataChannel,
|
||||||
@ -139,8 +145,12 @@ size_t benchmark(milliseconds duration) {
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
// Continue sending
|
// Continue sending
|
||||||
while (dc1->bufferedAmount() == 0) {
|
try {
|
||||||
dc1->send(messageData);
|
while (dc1->isOpen() && dc1->bufferedAmount() == 0) {
|
||||||
|
dc1->send(messageData);
|
||||||
|
}
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
std::cout << "Send failed: " << e.what() << std::endl;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -36,6 +36,8 @@ void test_connectivity() {
|
|||||||
// STUN server example (not necessary to connect locally)
|
// STUN server example (not necessary to connect locally)
|
||||||
// Please do not use outside of libdatachannel tests
|
// Please do not use outside of libdatachannel tests
|
||||||
config1.iceServers.emplace_back("stun:stun.ageneau.net:3478");
|
config1.iceServers.emplace_back("stun:stun.ageneau.net:3478");
|
||||||
|
// Custom MTU example
|
||||||
|
config1.mtu = 1500;
|
||||||
|
|
||||||
auto pc1 = std::make_shared<PeerConnection>(config1);
|
auto pc1 = std::make_shared<PeerConnection>(config1);
|
||||||
|
|
||||||
@ -43,6 +45,8 @@ void test_connectivity() {
|
|||||||
// STUN server example (not necessary to connect locally)
|
// STUN server example (not necessary to connect locally)
|
||||||
// Please do not use outside of libdatachannel tests
|
// Please do not use outside of libdatachannel tests
|
||||||
config2.iceServers.emplace_back("stun:stun.ageneau.net:3478");
|
config2.iceServers.emplace_back("stun:stun.ageneau.net:3478");
|
||||||
|
// Custom MTU example
|
||||||
|
config2.mtu = 1500;
|
||||||
// Port range example
|
// Port range example
|
||||||
config2.portRangeBegin = 5000;
|
config2.portRangeBegin = 5000;
|
||||||
config2.portRangeEnd = 6000;
|
config2.portRangeEnd = 6000;
|
||||||
@ -221,7 +225,7 @@ void test_connectivity() {
|
|||||||
auto negotiated2 = pc2->createDataChannel("negoctated", init);
|
auto negotiated2 = pc2->createDataChannel("negoctated", init);
|
||||||
|
|
||||||
if (!negotiated1->isOpen() || !negotiated2->isOpen())
|
if (!negotiated1->isOpen() || !negotiated2->isOpen())
|
||||||
throw runtime_error("Negociated DataChannel is not open");
|
throw runtime_error("Negotiated DataChannel is not open");
|
||||||
|
|
||||||
std::atomic<bool> received = false;
|
std::atomic<bool> received = false;
|
||||||
negotiated2->onMessage([&received](const variant<binary, string> &message) {
|
negotiated2->onMessage([&received](const variant<binary, string> &message) {
|
||||||
@ -239,7 +243,7 @@ void test_connectivity() {
|
|||||||
this_thread::sleep_for(1s);
|
this_thread::sleep_for(1s);
|
||||||
|
|
||||||
if (!received)
|
if (!received)
|
||||||
throw runtime_error("Negociated DataChannel failed");
|
throw runtime_error("Negotiated DataChannel failed");
|
||||||
|
|
||||||
// Delay close of peer 2 to check closing works properly
|
// Delay close of peer 2 to check closing works properly
|
||||||
pc1->close();
|
pc1->close();
|
||||||
|
@ -232,7 +232,7 @@ void test_turn_connectivity() {
|
|||||||
auto negotiated2 = pc2->createDataChannel("negoctated", init);
|
auto negotiated2 = pc2->createDataChannel("negoctated", init);
|
||||||
|
|
||||||
if (!negotiated1->isOpen() || !negotiated2->isOpen())
|
if (!negotiated1->isOpen() || !negotiated2->isOpen())
|
||||||
throw runtime_error("Negociated DataChannel is not open");
|
throw runtime_error("Negotiated DataChannel is not open");
|
||||||
|
|
||||||
std::atomic<bool> received = false;
|
std::atomic<bool> received = false;
|
||||||
negotiated2->onMessage([&received](const variant<binary, string> &message) {
|
negotiated2->onMessage([&received](const variant<binary, string> &message) {
|
||||||
@ -250,7 +250,7 @@ void test_turn_connectivity() {
|
|||||||
this_thread::sleep_for(1s);
|
this_thread::sleep_for(1s);
|
||||||
|
|
||||||
if (!received)
|
if (!received)
|
||||||
throw runtime_error("Negociated DataChannel failed");
|
throw runtime_error("Negotiated DataChannel failed");
|
||||||
|
|
||||||
// Delay close of peer 2 to check closing works properly
|
// Delay close of peer 2 to check closing works properly
|
||||||
pc1->close();
|
pc1->close();
|
||||||
|
Reference in New Issue
Block a user