From a67ca9da9b3b9b81cbfe35228cb5f7231d49dd2f Mon Sep 17 00:00:00 2001 From: Paul-Louis Ageneau Date: Sat, 21 Nov 2020 00:32:10 +0100 Subject: [PATCH] Implemented support for DSCP --- deps/libjuice | 2 +- include/rtc/message.hpp | 3 ++- src/dtlssrtptransport.cpp | 2 ++ src/dtlstransport.cpp | 16 ++++++++++++++-- src/dtlstransport.hpp | 2 ++ src/icetransport.cpp | 15 ++++++++++++--- src/icetransport.hpp | 3 +++ src/sctptransport.cpp | 8 ++++++++ src/sctptransport.hpp | 1 + src/track.cpp | 10 +++++++--- 10 files changed, 52 insertions(+), 10 deletions(-) diff --git a/deps/libjuice b/deps/libjuice index 25e6baa..11c8961 160000 --- a/deps/libjuice +++ b/deps/libjuice @@ -1 +1 @@ -Subproject commit 25e6baaaa2326c811ea87b8742669bc1f1161dc6 +Subproject commit 11c89614cfb44f0a89422ac780931da235e883be diff --git a/include/rtc/message.hpp b/include/rtc/message.hpp index a036aab..4b470fd 100644 --- a/include/rtc/message.hpp +++ b/include/rtc/message.hpp @@ -42,7 +42,8 @@ struct RTC_CPP_EXPORT Message : binary { Message(binary &&data, Type type_ = Binary) : binary(std::move(data)), type(type_) {} Type type; - unsigned int stream = 0; + unsigned int stream = 0; // Stream id (SCTP stream or SSRC) + int dscp = 0; // Differentiated Services Code Point std::shared_ptr reliability; }; diff --git a/src/dtlssrtptransport.cpp b/src/dtlssrtptransport.cpp index c0dfa0c..3621625 100644 --- a/src/dtlssrtptransport.cpp +++ b/src/dtlssrtptransport.cpp @@ -141,6 +141,8 @@ bool DtlsSrtpTransport::sendMedia(message_ptr message) { } message->resize(size); + + // DSCP is set by Track according to the type return outgoing(message); } diff --git a/src/dtlstransport.cpp b/src/dtlstransport.cpp index 01b74b7..adfef65 100644 --- a/src/dtlstransport.cpp +++ b/src/dtlstransport.cpp @@ -53,7 +53,7 @@ DtlsTransport::DtlsTransport(shared_ptr lower, certificate_ptr cer verifier_callback verifierCallback, state_callback stateChangeCallback) : Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate), mVerifierCallback(std::move(verifierCallback)), - mIsClient(lower->role() == Description::Role::Active) { + mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) { PLOG_DEBUG << "Initializing DTLS transport (GnuTLS)"; @@ -122,6 +122,7 @@ bool DtlsTransport::send(message_ptr message) { PLOG_VERBOSE << "Send size=" << message->size(); + mCurrentDscp = message->dscp; ssize_t ret; do { ret = gnutls_record_send(mSession, message->data(), message->size()); @@ -143,6 +144,11 @@ void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); } +bool DtlsTransport::outgoing(message_ptr message) { + message->dscp = mCurrentDscp; + return Transport::outgoing(std::move(message)); +} + void DtlsTransport::postHandshake() { // Dummy } @@ -309,7 +315,7 @@ DtlsTransport::DtlsTransport(shared_ptr lower, shared_ptrrole() == Description::Role::Active) { + mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) { PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)"; try { @@ -405,6 +411,7 @@ bool DtlsTransport::send(message_ptr message) { PLOG_VERBOSE << "Send size=" << message->size(); + mCurrentDscp = message->dscp; int ret = SSL_write(mSsl, message->data(), int(message->size())); return openssl::check(mSsl, ret); } @@ -419,6 +426,11 @@ void DtlsTransport::incoming(message_ptr message) { mIncomingQueue.push(message); } +bool DtlsTransport::outgoing(message_ptr message) { + message->dscp = mCurrentDscp; + return Transport::outgoing(std::move(message)); +} + void DtlsTransport::postHandshake() { // Dummy } diff --git a/src/dtlstransport.hpp b/src/dtlstransport.hpp index d262be1..89a51d4 100644 --- a/src/dtlstransport.hpp +++ b/src/dtlstransport.hpp @@ -53,6 +53,7 @@ public: protected: virtual void incoming(message_ptr message) override; + virtual bool outgoing(message_ptr message) override; virtual void postHandshake(); void runRecvLoop(); @@ -62,6 +63,7 @@ protected: Queue mIncomingQueue; std::thread mRecvThread; + std::atomic mCurrentDscp; #if USE_GNUTLS gnutls_session_t mSession; diff --git a/src/icetransport.cpp b/src/icetransport.cpp index 24bb60c..34d5e5b 100644 --- a/src/icetransport.cpp +++ b/src/icetransport.cpp @@ -222,8 +222,10 @@ bool IceTransport::send(message_ptr message) { } bool IceTransport::outgoing(message_ptr message) { - return juice_send(mAgent.get(), reinterpret_cast(message->data()), - message->size()) >= 0; + // Explicit Congestion Notification takes the least-significant 2 bits of the DS field + int ds = message->dscp << 2; + return juice_send_diffserv(mAgent.get(), reinterpret_cast(message->data()), + message->size(), ds) >= 0; } void IceTransport::changeGatheringState(GatheringState state) { @@ -330,7 +332,7 @@ IceTransport::IceTransport(const Configuration &config, candidate_callback candi mMid("0"), mGatheringState(GatheringState::New), mCandidateCallback(std::move(candidateCallback)), mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)), - mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) { + mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr), mOutgoingDscp(0) { PLOG_DEBUG << "Initializing ICE transport (libnice)"; @@ -617,6 +619,13 @@ bool IceTransport::send(message_ptr message) { } bool IceTransport::outgoing(message_ptr message) { + std::lock_guard lock(mOutgoingMutex); + if (mOutgoingDscp != message->dscp) { + mOutgoingDscp = message->dscp; + // Explicit Congestion Notification takes the least-significant 2 bits of the DS field + int ds = message->dscp << 2; + nice_agent_set_stream_tos(mNiceAgent.get(), mStreamId, ds); // ToS is the legacy name for DS + } return nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(), reinterpret_cast(message->data())) >= 0; } diff --git a/src/icetransport.hpp b/src/icetransport.hpp index fc5fba4..b941418 100644 --- a/src/icetransport.hpp +++ b/src/icetransport.hpp @@ -35,6 +35,7 @@ #include #include #include +#include namespace rtc { @@ -99,6 +100,8 @@ private: std::unique_ptr mMainLoop; std::thread mMainLoopThread; guint mTimeoutId = 0; + std::mutex mOutgoingMutex; + int mOutgoingDscp; static string AddressToString(const NiceAddress &addr); diff --git a/src/sctptransport.cpp b/src/sctptransport.cpp index fbb5c61..af61ea2 100644 --- a/src/sctptransport.cpp +++ b/src/sctptransport.cpp @@ -325,6 +325,13 @@ void SctpTransport::incoming(message_ptr message) { usrsctp_conninput(this, message->data(), message->size(), 0); } +bool SctpTransport::outgoing(message_ptr message) { + // Set recommended medium-priority DSCP value + // See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18 + message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability + return Transport::outgoing(std::move(message)); +} + void SctpTransport::doRecv() { std::lock_guard lock(mRecvMutex); --mPendingRecvCount; @@ -554,6 +561,7 @@ int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t std::unique_lock lock(mWriteMutex); PLOG_VERBOSE << "Handle write, len=" << len; + auto message = make_message(data, data + len); if (!outgoing(make_message(data, data + len))) return -1; diff --git a/src/sctptransport.hpp b/src/sctptransport.hpp index 4c699c3..e041715 100644 --- a/src/sctptransport.hpp +++ b/src/sctptransport.hpp @@ -77,6 +77,7 @@ private: void shutdown(); void close(); void incoming(message_ptr message) override; + bool outgoing(message_ptr message) override; void doRecv(); bool trySendQueue(); diff --git a/src/track.cpp b/src/track.cpp index 0a24523..bea5fb5 100644 --- a/src/track.cpp +++ b/src/track.cpp @@ -107,14 +107,18 @@ bool Track::outgoing(message_ptr message) { if (mIsClosed) throw std::runtime_error("Track is closed"); - if (message->size() > maxMessageSize()) - throw std::runtime_error("Message size exceeds limit"); - #if RTC_ENABLE_MEDIA auto transport = mDtlsSrtpTransport.lock(); if (!transport) throw std::runtime_error("Track transport is not open"); + // Set recommended medium-priority DSCP value + // See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18 + if (mMediaDescription.type() == "audio") + message->dscp = 46; // EF: Expedited Forwarding + else + message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability + return transport->sendMedia(message); #else PLOG_WARNING << "Ignoring track send (not compiled with SRTP support)";