Implemented support for DSCP

This commit is contained in:
Paul-Louis Ageneau
2020-11-21 00:32:10 +01:00
parent 613ebf2564
commit a67ca9da9b
10 changed files with 52 additions and 10 deletions

2
deps/libjuice vendored

View File

@ -42,7 +42,8 @@ struct RTC_CPP_EXPORT Message : binary {
Message(binary &&data, Type type_ = Binary) : binary(std::move(data)), type(type_) {} Message(binary &&data, Type type_ = Binary) : binary(std::move(data)), type(type_) {}
Type type; Type type;
unsigned int stream = 0; unsigned int stream = 0; // Stream id (SCTP stream or SSRC)
int dscp = 0; // Differentiated Services Code Point
std::shared_ptr<Reliability> reliability; std::shared_ptr<Reliability> reliability;
}; };

View File

@ -141,6 +141,8 @@ bool DtlsSrtpTransport::sendMedia(message_ptr message) {
} }
message->resize(size); message->resize(size);
// DSCP is set by Track according to the type
return outgoing(message); return outgoing(message);
} }

View File

@ -53,7 +53,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
verifier_callback verifierCallback, state_callback stateChangeCallback) verifier_callback verifierCallback, state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate), : Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)), mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active) { mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
PLOG_DEBUG << "Initializing DTLS transport (GnuTLS)"; PLOG_DEBUG << "Initializing DTLS transport (GnuTLS)";
@ -122,6 +122,7 @@ bool DtlsTransport::send(message_ptr message) {
PLOG_VERBOSE << "Send size=" << message->size(); PLOG_VERBOSE << "Send size=" << message->size();
mCurrentDscp = message->dscp;
ssize_t ret; ssize_t ret;
do { do {
ret = gnutls_record_send(mSession, message->data(), message->size()); ret = gnutls_record_send(mSession, message->data(), message->size());
@ -143,6 +144,11 @@ void DtlsTransport::incoming(message_ptr message) {
mIncomingQueue.push(message); mIncomingQueue.push(message);
} }
bool DtlsTransport::outgoing(message_ptr message) {
message->dscp = mCurrentDscp;
return Transport::outgoing(std::move(message));
}
void DtlsTransport::postHandshake() { void DtlsTransport::postHandshake() {
// Dummy // Dummy
} }
@ -309,7 +315,7 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certific
verifier_callback verifierCallback, state_callback stateChangeCallback) verifier_callback verifierCallback, state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate), : Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)), mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active) { mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)"; PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
try { try {
@ -405,6 +411,7 @@ bool DtlsTransport::send(message_ptr message) {
PLOG_VERBOSE << "Send size=" << message->size(); PLOG_VERBOSE << "Send size=" << message->size();
mCurrentDscp = message->dscp;
int ret = SSL_write(mSsl, message->data(), int(message->size())); int ret = SSL_write(mSsl, message->data(), int(message->size()));
return openssl::check(mSsl, ret); return openssl::check(mSsl, ret);
} }
@ -419,6 +426,11 @@ void DtlsTransport::incoming(message_ptr message) {
mIncomingQueue.push(message); mIncomingQueue.push(message);
} }
bool DtlsTransport::outgoing(message_ptr message) {
message->dscp = mCurrentDscp;
return Transport::outgoing(std::move(message));
}
void DtlsTransport::postHandshake() { void DtlsTransport::postHandshake() {
// Dummy // Dummy
} }

View File

@ -53,6 +53,7 @@ public:
protected: protected:
virtual void incoming(message_ptr message) override; virtual void incoming(message_ptr message) override;
virtual bool outgoing(message_ptr message) override;
virtual void postHandshake(); virtual void postHandshake();
void runRecvLoop(); void runRecvLoop();
@ -62,6 +63,7 @@ protected:
Queue<message_ptr> mIncomingQueue; Queue<message_ptr> mIncomingQueue;
std::thread mRecvThread; std::thread mRecvThread;
std::atomic<int> mCurrentDscp;
#if USE_GNUTLS #if USE_GNUTLS
gnutls_session_t mSession; gnutls_session_t mSession;

View File

@ -222,8 +222,10 @@ bool IceTransport::send(message_ptr message) {
} }
bool IceTransport::outgoing(message_ptr message) { bool IceTransport::outgoing(message_ptr message) {
return juice_send(mAgent.get(), reinterpret_cast<const char *>(message->data()), // Explicit Congestion Notification takes the least-significant 2 bits of the DS field
message->size()) >= 0; int ds = message->dscp << 2;
return juice_send_diffserv(mAgent.get(), reinterpret_cast<const char *>(message->data()),
message->size(), ds) >= 0;
} }
void IceTransport::changeGatheringState(GatheringState state) { void IceTransport::changeGatheringState(GatheringState state) {
@ -330,7 +332,7 @@ IceTransport::IceTransport(const Configuration &config, candidate_callback candi
mMid("0"), mGatheringState(GatheringState::New), mMid("0"), mGatheringState(GatheringState::New),
mCandidateCallback(std::move(candidateCallback)), mCandidateCallback(std::move(candidateCallback)),
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)), mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) { mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr), mOutgoingDscp(0) {
PLOG_DEBUG << "Initializing ICE transport (libnice)"; PLOG_DEBUG << "Initializing ICE transport (libnice)";
@ -617,6 +619,13 @@ bool IceTransport::send(message_ptr message) {
} }
bool IceTransport::outgoing(message_ptr message) { bool IceTransport::outgoing(message_ptr message) {
std::lock_guard lock(mOutgoingMutex);
if (mOutgoingDscp != message->dscp) {
mOutgoingDscp = message->dscp;
// Explicit Congestion Notification takes the least-significant 2 bits of the DS field
int ds = message->dscp << 2;
nice_agent_set_stream_tos(mNiceAgent.get(), mStreamId, ds); // ToS is the legacy name for DS
}
return nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(), return nice_agent_send(mNiceAgent.get(), mStreamId, 1, message->size(),
reinterpret_cast<const char *>(message->data())) >= 0; reinterpret_cast<const char *>(message->data())) >= 0;
} }

View File

@ -35,6 +35,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <mutex>
namespace rtc { namespace rtc {
@ -99,6 +100,8 @@ private:
std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop; std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop;
std::thread mMainLoopThread; std::thread mMainLoopThread;
guint mTimeoutId = 0; guint mTimeoutId = 0;
std::mutex mOutgoingMutex;
int mOutgoingDscp;
static string AddressToString(const NiceAddress &addr); static string AddressToString(const NiceAddress &addr);

View File

@ -325,6 +325,13 @@ void SctpTransport::incoming(message_ptr message) {
usrsctp_conninput(this, message->data(), message->size(), 0); usrsctp_conninput(this, message->data(), message->size(), 0);
} }
bool SctpTransport::outgoing(message_ptr message) {
// Set recommended medium-priority DSCP value
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
message->dscp = 10; // AF11: Assured Forwarding class 1, low drop probability
return Transport::outgoing(std::move(message));
}
void SctpTransport::doRecv() { void SctpTransport::doRecv() {
std::lock_guard lock(mRecvMutex); std::lock_guard lock(mRecvMutex);
--mPendingRecvCount; --mPendingRecvCount;
@ -554,6 +561,7 @@ int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t
std::unique_lock lock(mWriteMutex); std::unique_lock lock(mWriteMutex);
PLOG_VERBOSE << "Handle write, len=" << len; PLOG_VERBOSE << "Handle write, len=" << len;
auto message = make_message(data, data + len);
if (!outgoing(make_message(data, data + len))) if (!outgoing(make_message(data, data + len)))
return -1; return -1;

View File

@ -77,6 +77,7 @@ private:
void shutdown(); void shutdown();
void close(); void close();
void incoming(message_ptr message) override; void incoming(message_ptr message) override;
bool outgoing(message_ptr message) override;
void doRecv(); void doRecv();
bool trySendQueue(); bool trySendQueue();

View File

@ -107,14 +107,18 @@ bool Track::outgoing(message_ptr message) {
if (mIsClosed) if (mIsClosed)
throw std::runtime_error("Track is closed"); throw std::runtime_error("Track is closed");
if (message->size() > maxMessageSize())
throw std::runtime_error("Message size exceeds limit");
#if RTC_ENABLE_MEDIA #if RTC_ENABLE_MEDIA
auto transport = mDtlsSrtpTransport.lock(); auto transport = mDtlsSrtpTransport.lock();
if (!transport) if (!transport)
throw std::runtime_error("Track transport is not open"); throw std::runtime_error("Track transport is not open");
// Set recommended medium-priority DSCP value
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
if (mMediaDescription.type() == "audio")
message->dscp = 46; // EF: Expedited Forwarding
else
message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
return transport->sendMedia(message); return transport->sendMedia(message);
#else #else
PLOG_WARNING << "Ignoring track send (not compiled with SRTP support)"; PLOG_WARNING << "Ignoring track send (not compiled with SRTP support)";