Compare commits

...

31 Commits

Author SHA1 Message Date
ee3bc9694b Bumped version to 0.10.2 2020-11-18 23:39:06 +01:00
0c0ba77de5 Merge pull request #265 from paullouisageneau/optimize-sctp-recv
Schedule SCTP recv task only if necessary
2020-11-18 23:38:49 +01:00
8729e0d2aa Scheduled SCTP recv task only if there is no pending task 2020-11-18 23:26:47 +01:00
90eb610bfe Merge pull request #263 from paullouisageneau/release-rdesc-lock
Prevent holding multiple locks
2020-11-17 23:02:15 +01:00
08ddfa1276 Release remote description lock before passing candidate to transport 2020-11-17 22:41:42 +01:00
87df64a002 Merge pull request #262 from paullouisageneau/fix-duplicate-candidates
Prevent duplicate candidates
2020-11-17 20:40:15 +01:00
5af414d0df Cosmetic fixes in Description 2020-11-17 20:23:59 +01:00
2443c72350 Refactored trimming with util functions 2020-11-17 20:10:47 +01:00
f033e4ab8f Prevent whitspaces at the end of candidates as they confuse libnice 2020-11-17 19:57:51 +01:00
1a6dcdce6f Reordered Candidate getters 2020-11-17 19:28:13 +01:00
100039eba8 Enforce candidates uniqueness in description 2020-11-17 19:23:29 +01:00
e2005c789a Refactored candidate storage and split parsing and resolution 2020-11-17 19:21:48 +01:00
819566b4c1 Merge pull request #261 from paullouisageneau/fix-remote-unordered-flag
Fix remote unordered flag
2020-11-17 00:15:35 +01:00
82caab8906 Added tests for remote protocol and reliability in C API 2020-11-16 23:59:59 +01:00
802516b2db Fixed remote DataChannel unordered flag 2020-11-16 23:59:59 +01:00
0fcafad9c7 Bumped version to 0.10.1 2020-11-16 01:00:49 +01:00
aab876d346 Removed SCTP transport receiving flag 2020-11-16 01:00:49 +01:00
11ec8f7247 Made the logging of an RTP packet be verbose 2020-11-16 01:00:49 +01:00
1597c9ae6f Exposed a function to log an RTP packet 2020-11-16 01:00:49 +01:00
b093c4c3d5 Merge pull request #255 from paullouisageneau/fix-renegociation-role
Fix roles during renegociation
2020-11-16 00:02:35 +01:00
447624322c Removed useless role from ICE transport constructor 2020-11-15 23:50:41 +01:00
422713cbdc Added safety check on remote description role 2020-11-15 23:41:52 +01:00
d3d4187021 Fixed roles when receiving actpass during a renegociation 2020-11-15 23:40:16 +01:00
f2dd46e589 Merge pull request #254 from paullouisageneau/usrsctp-no-callback
Change usrsctp callbacks to upcall
2020-11-15 23:37:28 +01:00
5b5debf260 Use upcall and recv instead of message callback for usrsctp 2020-11-15 23:23:40 +01:00
86c3f914fb Merge pull request #252 from paullouisageneau/update-usrsctp
Update usrsctp and enhance threading
2020-11-15 18:31:09 +01:00
6a1fff13c1 Fixed SCTP processor limit 2020-11-15 18:23:00 +01:00
91a854aa5b Refactored processor enqueueing 2020-11-15 18:15:45 +01:00
1181fdc599 Introduced SCTP transport processor 2020-11-15 16:09:29 +01:00
fe3d92cebf Removed usrsctp-static from CMakeLists 2020-11-15 14:44:22 +01:00
c06d77bd8e Updated usrsctp 2020-11-15 14:38:50 +01:00
18 changed files with 420 additions and 277 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
VERSION 0.10.0 VERSION 0.10.2
LANGUAGES CXX) LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library") set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")
@ -115,17 +115,15 @@ set(CMAKE_POLICY_DEFAULT_CMP0048 NEW)
add_subdirectory(deps/plog) add_subdirectory(deps/plog)
option(sctp_build_programs 0) option(sctp_build_programs 0)
option(sctp_build_shared_lib 0)
add_subdirectory(deps/usrsctp EXCLUDE_FROM_ALL) add_subdirectory(deps/usrsctp EXCLUDE_FROM_ALL)
if (MSYS OR MINGW) if (MSYS OR MINGW)
target_compile_definitions(usrsctp PUBLIC -DSCTP_STDINT_INCLUDE=<stdint.h>) target_compile_definitions(usrsctp PUBLIC -DSCTP_STDINT_INCLUDE=<stdint.h>)
target_compile_definitions(usrsctp-static PUBLIC -DSCTP_STDINT_INCLUDE=<stdint.h>)
endif() endif()
if (CMAKE_CXX_COMPILER_ID MATCHES "GNU") if (CMAKE_CXX_COMPILER_ID MATCHES "GNU")
target_compile_options(usrsctp PRIVATE -Wno-error=format-truncation) target_compile_options(usrsctp PRIVATE -Wno-error=format-truncation)
target_compile_options(usrsctp-static PRIVATE -Wno-error=format-truncation)
endif() endif()
add_library(Usrsctp::Usrsctp ALIAS usrsctp) add_library(Usrsctp::Usrsctp ALIAS usrsctp)
add_library(Usrsctp::UsrsctpStatic ALIAS usrsctp-static)
if (NO_WEBSOCKET) if (NO_WEBSOCKET)
add_library(datachannel SHARED add_library(datachannel SHARED
@ -156,13 +154,13 @@ target_include_directories(datachannel PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/includ
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc) target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src) target_include_directories(datachannel PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel PUBLIC Threads::Threads plog::plog) target_link_libraries(datachannel PUBLIC Threads::Threads plog::plog)
target_link_libraries(datachannel PRIVATE Usrsctp::UsrsctpStatic) target_link_libraries(datachannel PRIVATE Usrsctp::Usrsctp)
target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) target_include_directories(datachannel-static PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc) target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include/rtc)
target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src) target_include_directories(datachannel-static PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(datachannel-static PUBLIC Threads::Threads plog::plog) target_link_libraries(datachannel-static PUBLIC Threads::Threads plog::plog)
target_link_libraries(datachannel-static PRIVATE Usrsctp::UsrsctpStatic) target_link_libraries(datachannel-static PRIVATE Usrsctp::Usrsctp)
if(WIN32) if(WIN32)
target_link_libraries(datachannel PRIVATE ws2_32) # winsock2 target_link_libraries(datachannel PRIVATE ws2_32) # winsock2

2
deps/usrsctp vendored

View File

@ -40,29 +40,38 @@ public:
enum class ResolveMode { Simple, Lookup }; enum class ResolveMode { Simple, Lookup };
bool resolve(ResolveMode mode = ResolveMode::Simple); bool resolve(ResolveMode mode = ResolveMode::Simple);
Type type() const;
TransportType transportType() const;
uint32_t priority() const;
string candidate() const; string candidate() const;
string mid() const; string mid() const;
operator string() const; operator string() const;
bool operator==(const Candidate &other) const;
bool operator!=(const Candidate &other) const;
bool isResolved() const; bool isResolved() const;
Family family() const; Family family() const;
Type type() const;
TransportType transportType() const;
std::optional<string> address() const; std::optional<string> address() const;
std::optional<uint16_t> port() const; std::optional<uint16_t> port() const;
std::optional<uint32_t> priority() const;
private: private:
string mCandidate; void parse(string candidate);
string mFoundation;
uint32_t mComponent, mPriority;
string mTypeString, mTransportString;
Type mType;
TransportType mTransportType;
string mNode, mService;
string mTail;
std::optional<string> mMid; std::optional<string> mMid;
// Extracted on resolution // Extracted on resolution
Family mFamily; Family mFamily;
Type mType;
TransportType mTransportType;
string mAddress; string mAddress;
uint16_t mPort; uint16_t mPort;
uint32_t mPriority;
}; };
} // namespace rtc } // namespace rtc

View File

@ -53,6 +53,7 @@ public:
void hintType(Type type); void hintType(Type type);
void setFingerprint(string fingerprint); void setFingerprint(string fingerprint);
bool hasCandidate(const Candidate &candidate) const;
void addCandidate(Candidate candidate); void addCandidate(Candidate candidate);
void addCandidates(std::vector<Candidate> candidates); void addCandidates(std::vector<Candidate> candidates);
void endCandidates(); void endCandidates();

View File

@ -102,12 +102,12 @@ private:
std::function<void()> function; std::function<void()> function;
}; };
template <typename... P> class synchronized_callback { template <typename... Args> class synchronized_callback {
public: public:
synchronized_callback() = default; synchronized_callback() = default;
synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); } synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); }
synchronized_callback(const synchronized_callback &cb) { *this = cb; } synchronized_callback(const synchronized_callback &cb) { *this = cb; }
synchronized_callback(std::function<void(P...)> func) { *this = std::move(func); } synchronized_callback(std::function<void(Args...)> func) { *this = std::move(func); }
~synchronized_callback() { *this = nullptr; } ~synchronized_callback() { *this = nullptr; }
synchronized_callback &operator=(synchronized_callback &&cb) { synchronized_callback &operator=(synchronized_callback &&cb) {
@ -123,16 +123,16 @@ public:
return *this; return *this;
} }
synchronized_callback &operator=(std::function<void(P...)> func) { synchronized_callback &operator=(std::function<void(Args...)> func) {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
callback = std::move(func); callback = std::move(func);
return *this; return *this;
} }
void operator()(P... args) const { void operator()(Args... args) const {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (callback) if (callback)
callback(args...); callback(std::move(args)...);
} }
operator bool() const { operator bool() const {
@ -140,8 +140,12 @@ public:
return callback ? true : false; return callback ? true : false;
} }
std::function<void(Args...)> wrap() const {
return [this](Args... args) { (*this)(std::move(args)...); };
}
private: private:
std::function<void(P...)> callback; std::function<void(Args...)> callback;
mutable std::recursive_mutex mutex; mutable std::recursive_mutex mutex;
}; };
} // namespace rtc } // namespace rtc

View File

@ -129,7 +129,7 @@ public:
void onTrack(std::function<void(std::shared_ptr<Track> track)> callback); void onTrack(std::function<void(std::shared_ptr<Track> track)> callback);
private: private:
std::shared_ptr<IceTransport> initIceTransport(Description::Role role); std::shared_ptr<IceTransport> initIceTransport();
std::shared_ptr<DtlsTransport> initDtlsTransport(); std::shared_ptr<DtlsTransport> initDtlsTransport();
std::shared_ptr<SctpTransport> initSctpTransport(); std::shared_ptr<SctpTransport> initSctpTransport();
void closeTransports(); void closeTransports();

View File

@ -57,6 +57,7 @@ public:
inline uint8_t version() const { return _first >> 6; } inline uint8_t version() const { return _first >> 6; }
inline bool padding() const { return (_first >> 5) & 0x01; } inline bool padding() const { return (_first >> 5) & 0x01; }
inline bool extension() const { return (_first >> 4) & 0x01; }
inline uint8_t csrcCount() const { return _first & 0x0F; } inline uint8_t csrcCount() const { return _first & 0x0F; }
inline uint8_t marker() const { return _payloadType & 0b10000000; } inline uint8_t marker() const { return _payloadType & 0b10000000; }
inline uint8_t payloadType() const { return _payloadType & 0b01111111; } inline uint8_t payloadType() const { return _payloadType & 0b01111111; }
@ -77,6 +78,17 @@ public:
inline void setSsrc(uint32_t ssrc) { _ssrc = htonl(ssrc); } inline void setSsrc(uint32_t ssrc) { _ssrc = htonl(ssrc); }
void setTimestamp(uint32_t i) { _timestamp = htonl(i); } void setTimestamp(uint32_t i) { _timestamp = htonl(i); }
void log() {
PLOG_VERBOSE << "RTP V: " << (int) version()
<< " P: " << (padding() ? "P" : " ")
<< " X: " << (extension() ? "X" : " ")
<< " CC: " << (int) csrcCount()
<< " M: " << (marker() ? "M" : " ")
<< " PT: " << (int) payloadType()
<< " SEQNO: " << seqNumber()
<< " TS: " << timestamp();
}
}; };
struct RTCP_ReportBlock { struct RTCP_ReportBlock {

View File

@ -20,6 +20,7 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cctype>
#include <sstream> #include <sstream>
#include <unordered_map> #include <unordered_map>
@ -39,39 +40,44 @@ using std::string;
namespace { namespace {
inline bool hasprefix(const string &str, const string &prefix) { inline bool match_prefix(const string &str, const string &prefix) {
return str.size() >= prefix.size() && return str.size() >= prefix.size() &&
std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end(); std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end();
} }
inline void trim_begin(string &str) {
str.erase(str.begin(),
std::find_if(str.begin(), str.end(), [](char c) { return !std::isspace(c); }));
}
inline void trim_end(string &str) {
str.erase(
std::find_if(str.rbegin(), str.rend(), [](char c) { return !std::isspace(c); }).base(),
str.end());
}
} // namespace } // namespace
namespace rtc { namespace rtc {
Candidate::Candidate() Candidate::Candidate()
: mFamily(Family::Unresolved), mType(Type::Unknown), mTransportType(TransportType::Unknown), : mFoundation("none"), mComponent(0), mPriority(0), mTypeString("unknown"),
mPort(0), mPriority(0) {} mTransportString("unknown"), mType(Type::Unknown), mTransportType(TransportType::Unknown),
mNode("0.0.0.0"), mService("9"), mFamily(Family::Unresolved), mPort(0) {}
Candidate::Candidate(string candidate) : Candidate() { Candidate::Candidate(string candidate) : Candidate() {
const std::array prefixes{"a=", "candidate:"}; if (!candidate.empty())
for (const string &prefix : prefixes) parse(std::move(candidate));
if (hasprefix(candidate, prefix))
candidate.erase(0, prefix.size());
mCandidate = std::move(candidate);
} }
Candidate::Candidate(string candidate, string mid) : Candidate(std::move(candidate)) { Candidate::Candidate(string candidate, string mid) : Candidate() {
if (!candidate.empty())
parse(std::move(candidate));
if (!mid.empty()) if (!mid.empty())
mMid.emplace(std::move(mid)); mMid.emplace(std::move(mid));
} }
void Candidate::hintMid(string mid) { void Candidate::parse(string candidate) {
if (!mMid)
mMid.emplace(std::move(mid));
}
bool Candidate::resolve(ResolveMode mode) {
using TypeMap_t = std::unordered_map<string, Type>; using TypeMap_t = std::unordered_map<string, Type>;
using TcpTypeMap_t = std::unordered_map<string, TransportType>; using TcpTypeMap_t = std::unordered_map<string, TransportType>;
@ -84,24 +90,23 @@ bool Candidate::resolve(ResolveMode mode) {
{"passive", TransportType::TcpPassive}, {"passive", TransportType::TcpPassive},
{"so", TransportType::TcpSo}}; {"so", TransportType::TcpSo}};
if (mFamily != Family::Unresolved) const std::array prefixes{"a=", "candidate:"};
return true; for (const string &prefix : prefixes)
if (match_prefix(candidate, prefix))
candidate.erase(0, prefix.size());
if (mCandidate.empty()) PLOG_VERBOSE << "Parsing candidate: " << candidate;
throw std::logic_error("Candidate is empty");
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mCandidate;
// See RFC 8445 for format // See RFC 8445 for format
std::istringstream iss(mCandidate); std::istringstream iss(candidate);
int component{0}, priority{0}; string transport, typ_, type;
string foundation, transport, node, service, typ_, type; if (!(iss >> mFoundation >> mComponent >> mTransportString >> mPriority &&
if (iss >> foundation >> component >> transport >> priority && iss >> mNode >> mService >> typ_ >> mTypeString && typ_ == "typ"))
iss >> node >> service >> typ_ >> type && typ_ == "typ") { throw std::invalid_argument("Invalid candidate format");
string left; std::getline(iss, mTail);
std::getline(iss, left); trim_begin(mTail);
trim_end(mTail);
if (auto it = TypeMap.find(type); it != TypeMap.end()) if (auto it = TypeMap.find(type); it != TypeMap.end())
mType = it->second; mType = it->second;
@ -111,7 +116,8 @@ bool Candidate::resolve(ResolveMode mode) {
if (transport == "UDP" || transport == "udp") { if (transport == "UDP" || transport == "udp") {
mTransportType = TransportType::Udp; mTransportType = TransportType::Udp;
} else if (transport == "TCP" || transport == "tcp") { } else if (transport == "TCP" || transport == "tcp") {
std::istringstream iss(left); // Peek tail to find TCP type
std::istringstream iss(mTail);
string tcptype_, tcptype; string tcptype_, tcptype;
if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") { if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") {
if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end()) if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end())
@ -125,8 +131,19 @@ bool Candidate::resolve(ResolveMode mode) {
} else { } else {
mTransportType = TransportType::Unknown; mTransportType = TransportType::Unknown;
} }
}
// Try to resolve the node void Candidate::hintMid(string mid) {
if (!mMid)
mMid.emplace(std::move(mid));
}
bool Candidate::resolve(ResolveMode mode) {
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mNode << ' '
<< mService;
// Try to resolve the node and service
struct addrinfo hints = {}; struct addrinfo hints = {};
hints.ai_family = AF_UNSPEC; hints.ai_family = AF_UNSPEC;
hints.ai_flags = AI_ADDRCONFIG; hints.ai_flags = AI_ADDRCONFIG;
@ -142,10 +159,9 @@ bool Candidate::resolve(ResolveMode mode) {
hints.ai_flags |= AI_NUMERICHOST; hints.ai_flags |= AI_NUMERICHOST;
struct addrinfo *result = nullptr; struct addrinfo *result = nullptr;
if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) == 0) { if (getaddrinfo(mNode.c_str(), mService.c_str(), &hints, &result) == 0) {
for (auto p = result; p; p = p->ai_next) { for (auto p = result; p; p = p->ai_next) {
if (p->ai_family == AF_INET || p->ai_family == AF_INET6) { if (p->ai_family == AF_INET || p->ai_family == AF_INET6) {
// Rewrite the candidate
char nodebuffer[MAX_NUMERICNODE_LEN]; char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN]; char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer, if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer,
@ -155,15 +171,7 @@ bool Candidate::resolve(ResolveMode mode) {
mAddress = nodebuffer; mAddress = nodebuffer;
mPort = uint16_t(std::stoul(servbuffer)); mPort = uint16_t(std::stoul(servbuffer));
mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4; mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4;
PLOG_VERBOSE << "Resolved candidate: " << mAddress << ' ' << mPort;
const char sp{' '};
std::ostringstream oss;
oss << foundation << sp << component << sp << transport << sp << priority;
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
oss << left;
mCandidate = oss.str();
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
break; break;
} }
} }
@ -171,12 +179,33 @@ bool Candidate::resolve(ResolveMode mode) {
freeaddrinfo(result); freeaddrinfo(result);
} }
}
return mFamily != Family::Unresolved; return mFamily != Family::Unresolved;
} }
string Candidate::candidate() const { return "candidate:" + mCandidate; } Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
uint32_t Candidate::priority() const { return mPriority; }
string Candidate::candidate() const {
const char sp{' '};
std::ostringstream oss;
oss << "candidate:";
oss << mFoundation << sp << mComponent << sp << mTransportString << sp << mPriority << sp;
if (isResolved())
oss << mAddress << sp << mPort;
else
oss << mNode << sp << mService;
oss << sp << "typ" << sp << mTypeString;
if (!mTail.empty())
oss << sp << mTail;
return oss.str();
}
string Candidate::mid() const { return mMid.value_or("0"); } string Candidate::mid() const { return mMid.value_or("0"); }
@ -186,14 +215,18 @@ Candidate::operator string() const {
return line.str(); return line.str();
} }
bool Candidate::operator==(const Candidate &other) const {
return mFoundation == other.mFoundation;
}
bool Candidate::operator!=(const Candidate &other) const {
return mFoundation != other.mFoundation;
}
bool Candidate::isResolved() const { return mFamily != Family::Unresolved; } bool Candidate::isResolved() const { return mFamily != Family::Unresolved; }
Candidate::Family Candidate::family() const { return mFamily; } Candidate::Family Candidate::family() const { return mFamily; }
Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
std::optional<string> Candidate::address() const { std::optional<string> Candidate::address() const {
return isResolved() ? std::make_optional(mAddress) : nullopt; return isResolved() ? std::make_optional(mAddress) : nullopt;
} }
@ -202,10 +235,6 @@ std::optional<uint16_t> Candidate::port() const {
return isResolved() ? std::make_optional(mPort) : nullopt; return isResolved() ? std::make_optional(mPort) : nullopt;
} }
std::optional<uint32_t> Candidate::priority() const {
return isResolved() ? std::make_optional(mPriority) : nullopt;
}
} // namespace rtc } // namespace rtc
std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) { std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) {
@ -217,11 +246,11 @@ std::ostream &operator<<(std::ostream &out, const rtc::Candidate::Type &type) {
case rtc::Candidate::Type::Host: case rtc::Candidate::Type::Host:
return out << "host"; return out << "host";
case rtc::Candidate::Type::PeerReflexive: case rtc::Candidate::Type::PeerReflexive:
return out << "peer_reflexive"; return out << "prflx";
case rtc::Candidate::Type::ServerReflexive: case rtc::Candidate::Type::ServerReflexive:
return out << "server_reflexive"; return out << "srflx";
case rtc::Candidate::Type::Relayed: case rtc::Candidate::Type::Relayed:
return out << "relayed"; return out << "relay";
default: default:
return out << "unknown"; return out << "unknown";
} }

View File

@ -306,7 +306,7 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
mLabel.assign(end, open.labelLength); mLabel.assign(end, open.labelLength);
mProtocol.assign(end + open.labelLength, open.protocolLength); mProtocol.assign(end + open.labelLength, open.protocolLength);
mReliability->unordered = (open.reliabilityParameter & 0x80) != 0; mReliability->unordered = (open.channelType & 0x80) != 0;
switch (open.channelType & 0x7F) { switch (open.channelType & 0x7F) {
case CHANNEL_PARTIAL_RELIABLE_REXMIT: case CHANNEL_PARTIAL_RELIABLE_REXMIT:
mReliability->type = Reliability::Type::Rexmit; mReliability->type = Reliability::Type::Rexmit;

View File

@ -106,7 +106,7 @@ Description::Description(const string &sdp, Type type, Role role)
mFingerprint->begin(), mFingerprint->begin(),
[](char c) { return char(std::toupper(c)); }); [](char c) { return char(std::toupper(c)); });
} else { } else {
PLOG_WARNING << "Unknown SDP fingerprint type: " << value; PLOG_WARNING << "Unknown SDP fingerprint format: " << value;
} }
} else if (key == "ice-ufrag") { } else if (key == "ice-ufrag") {
mIceUfrag = value; mIceUfrag = value;
@ -171,16 +171,27 @@ void Description::setFingerprint(string fingerprint) {
mFingerprint.emplace(std::move(fingerprint)); mFingerprint.emplace(std::move(fingerprint));
} }
bool Description::hasCandidate(const Candidate &candidate) const {
for (const Candidate &other : mCandidates)
if (candidate == other)
return true;
return false;
}
void Description::addCandidate(Candidate candidate) { void Description::addCandidate(Candidate candidate) {
candidate.hintMid(bundleMid()); candidate.hintMid(bundleMid());
for (const Candidate &other : mCandidates)
if (candidate == other)
return;
mCandidates.emplace_back(std::move(candidate)); mCandidates.emplace_back(std::move(candidate));
} }
void Description::addCandidates(std::vector<Candidate> candidates) { void Description::addCandidates(std::vector<Candidate> candidates) {
for (Candidate candidate : candidates) { for (Candidate candidate : candidates)
candidate.hintMid(bundleMid()); addCandidate(std::move(candidate));
mCandidates.emplace_back(std::move(candidate));
}
} }
void Description::endCandidates() { mEnded = true; } void Description::endCandidates() { mEnded = true; }

View File

@ -46,11 +46,12 @@ using std::chrono::system_clock;
namespace rtc { namespace rtc {
IceTransport::IceTransport(const Configuration &config, Description::Role role, IceTransport::IceTransport(const Configuration &config, candidate_callback candidateCallback,
candidate_callback candidateCallback, state_callback stateChangeCallback, state_callback stateChangeCallback,
gathering_state_callback gatheringStateChangeCallback) gathering_state_callback gatheringStateChangeCallback)
: Transport(nullptr, std::move(stateChangeCallback)), mRole(role), mMid("0"), : Transport(nullptr, std::move(stateChangeCallback)), mRole(Description::Role::ActPass),
mGatheringState(GatheringState::New), mCandidateCallback(std::move(candidateCallback)), mMid("0"), mGatheringState(GatheringState::New),
mCandidateCallback(std::move(candidateCallback)),
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)), mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
mAgent(nullptr, nullptr) { mAgent(nullptr, nullptr) {
@ -139,13 +140,19 @@ Description IceTransport::getLocalDescription(Description::Type type) const {
if (juice_get_local_description(mAgent.get(), sdp, JUICE_MAX_SDP_STRING_LEN) < 0) if (juice_get_local_description(mAgent.get(), sdp, JUICE_MAX_SDP_STRING_LEN) < 0)
throw std::runtime_error("Failed to generate local SDP"); throw std::runtime_error("Failed to generate local SDP");
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
// setup:actpass.
// See https://tools.ietf.org/html/rfc5763#section-5
return Description(string(sdp), type, return Description(string(sdp), type,
type == Description::Type::Offer ? Description::Role::ActPass : mRole); type == Description::Type::Offer ? Description::Role::ActPass : mRole);
} }
void IceTransport::setRemoteDescription(const Description &description) { void IceTransport::setRemoteDescription(const Description &description) {
if (mRole == Description::Role::ActPass)
mRole = description.role() == Description::Role::Active ? Description::Role::Passive mRole = description.role() == Description::Role::Active ? Description::Role::Passive
: Description::Role::Active; : Description::Role::Active;
if (mRole == description.role())
throw std::logic_error("Incompatible roles with remote description");
mMid = description.bundleMid(); mMid = description.bundleMid();
if (juice_set_remote_description(mAgent.get(), if (juice_set_remote_description(mAgent.get(),
@ -316,11 +323,12 @@ void IceTransport::LogCallback(juice_log_level_t level, const char *message) {
namespace rtc { namespace rtc {
IceTransport::IceTransport(const Configuration &config, Description::Role role, IceTransport::IceTransport(const Configuration &config, candidate_callback candidateCallback,
candidate_callback candidateCallback, state_callback stateChangeCallback, state_callback stateChangeCallback,
gathering_state_callback gatheringStateChangeCallback) gathering_state_callback gatheringStateChangeCallback)
: Transport(nullptr, std::move(stateChangeCallback)), mRole(role), mMid("0"), : Transport(nullptr, std::move(stateChangeCallback)), mRole(Description::Role::ActPass),
mGatheringState(GatheringState::New), mCandidateCallback(std::move(candidateCallback)), mMid("0"), mGatheringState(GatheringState::New),
mCandidateCallback(std::move(candidateCallback)),
mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)), mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) { mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) {
@ -526,12 +534,21 @@ Description IceTransport::getLocalDescription(Description::Type type) const {
std::unique_ptr<gchar[], void (*)(void *)> sdp(nice_agent_generate_local_sdp(mNiceAgent.get()), std::unique_ptr<gchar[], void (*)(void *)> sdp(nice_agent_generate_local_sdp(mNiceAgent.get()),
g_free); g_free);
return Description(string(sdp.get()), type, mRole);
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
// setup:actpass.
// See https://tools.ietf.org/html/rfc5763#section-5
return Description(string(sdp.get()), type,
type == Description::Type::Offer ? Description::Role::ActPass : mRole);
} }
void IceTransport::setRemoteDescription(const Description &description) { void IceTransport::setRemoteDescription(const Description &description) {
if (mRole == Description::Role::ActPass)
mRole = description.role() == Description::Role::Active ? Description::Role::Passive mRole = description.role() == Description::Role::Active ? Description::Role::Passive
: Description::Role::Active; : Description::Role::Active;
if (mRole == description.role())
throw std::logic_error("Incompatible roles with remote description");
mMid = description.bundleMid(); mMid = description.bundleMid();
mTrickleTimeout = !description.ended() ? 30s : 0s; mTrickleTimeout = !description.ended() ? 30s : 0s;
@ -547,12 +564,14 @@ bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
return false; return false;
// Warning: the candidate string must start with "a=candidate:" and it must not end with a // Warning: the candidate string must start with "a=candidate:" and it must not end with a
// newline, else libnice will reject it. // newline or whitespace, else libnice will reject it.
string sdp(candidate); string sdp(candidate);
NiceCandidate *cand = NiceCandidate *cand =
nice_agent_parse_remote_candidate_sdp(mNiceAgent.get(), mStreamId, sdp.c_str()); nice_agent_parse_remote_candidate_sdp(mNiceAgent.get(), mStreamId, sdp.c_str());
if (!cand) if (!cand) {
PLOG_WARNING << "Rejected ICE candidate: " << sdp;
return false; return false;
}
GSList *list = g_slist_append(nullptr, cand); GSList *list = g_slist_append(nullptr, cand);
int ret = nice_agent_set_remote_candidates(mNiceAgent.get(), mStreamId, 1, list); int ret = nice_agent_set_remote_candidates(mNiceAgent.get(), mStreamId, 1, list);

View File

@ -45,8 +45,8 @@ public:
using candidate_callback = std::function<void(const Candidate &candidate)>; using candidate_callback = std::function<void(const Candidate &candidate)>;
using gathering_state_callback = std::function<void(GatheringState state)>; using gathering_state_callback = std::function<void(GatheringState state)>;
IceTransport(const Configuration &config, Description::Role role, IceTransport(const Configuration &config, candidate_callback candidateCallback,
candidate_callback candidateCallback, state_callback stateChangeCallback, state_callback stateChangeCallback,
gathering_state_callback gatheringStateChangeCallback); gathering_state_callback gatheringStateChangeCallback);
~IceTransport(); ~IceTransport();

View File

@ -77,7 +77,7 @@ void PeerConnection::close() {
mNegotiationNeeded = false; mNegotiationNeeded = false;
// Close data channels asynchronously // Close data channels asynchronously
mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this)); mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
closeTransports(); closeTransports();
} }
@ -187,13 +187,7 @@ void PeerConnection::setLocalDescription(Description::Type type) {
} }
} }
auto iceTransport = std::atomic_load(&mIceTransport); auto iceTransport = initIceTransport();
if (!iceTransport) {
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
// setup:actpass.
// See https://tools.ietf.org/html/rfc5763#section-5
iceTransport = initIceTransport(Description::Role::ActPass);
}
Description localDescription = iceTransport->getLocalDescription(type); Description localDescription = iceTransport->getLocalDescription(type);
processLocalDescription(std::move(localDescription)); processLocalDescription(std::move(localDescription));
@ -272,12 +266,6 @@ void PeerConnection::setRemoteDescription(Description description) {
// Candidates will be added at the end, extract them for now // Candidates will be added at the end, extract them for now
auto remoteCandidates = description.extractCandidates(); auto remoteCandidates = description.extractCandidates();
auto type = description.type(); auto type = description.type();
auto iceTransport = std::atomic_load(&mIceTransport);
if (!iceTransport)
iceTransport = initIceTransport(Description::Role::ActPass);
iceTransport->setRemoteDescription(description);
processRemoteDescription(std::move(description)); processRemoteDescription(std::move(description));
changeSignalingState(newSignalingState); changeSignalingState(newSignalingState);
@ -287,8 +275,9 @@ void PeerConnection::setRemoteDescription(Description description) {
setLocalDescription(Description::Type::Answer); setLocalDescription(Description::Type::Answer);
} else { } else {
// This is an answer // This is an answer
auto iceTransport = std::atomic_load(&mIceTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport); auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport->role() == Description::Role::Active) { if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
// Since we assumed passive role during DataChannel creation, we need to shift the // Since we assumed passive role during DataChannel creation, we need to shift the
// stream numbers by one to shift them from odd to even. // stream numbers by one to shift them from odd to even.
std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
@ -405,14 +394,14 @@ void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callbac
mTrackCallback = callback; mTrackCallback = callback;
} }
shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) { shared_ptr<IceTransport> PeerConnection::initIceTransport() {
PLOG_VERBOSE << "Starting ICE transport"; PLOG_VERBOSE << "Starting ICE transport";
try { try {
if (auto transport = std::atomic_load(&mIceTransport)) if (auto transport = std::atomic_load(&mIceTransport))
return transport; return transport;
auto transport = std::make_shared<IceTransport>( auto transport = std::make_shared<IceTransport>(
mConfig, role, weak_bind(&PeerConnection::processLocalCandidate, this, _1), mConfig, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
[this, weak_this = weak_from_this()](IceTransport::State state) { [this, weak_this = weak_from_this()](IceTransport::State state) {
auto shared_this = weak_this.lock(); auto shared_this = weak_this.lock();
if (!shared_this) if (!shared_this)
@ -490,7 +479,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
else else
changeState(State::Connected); changeState(State::Connected);
mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this)); mProcessor->enqueue(&PeerConnection::openTracks, this);
break; break;
case DtlsTransport::State::Failed: case DtlsTransport::State::Failed:
changeState(State::Failed); changeState(State::Failed);
@ -561,16 +550,16 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
switch (state) { switch (state) {
case SctpTransport::State::Connected: case SctpTransport::State::Connected:
changeState(State::Connected); changeState(State::Connected);
mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this)); mProcessor->enqueue(&PeerConnection::openDataChannels, this);
break; break;
case SctpTransport::State::Failed: case SctpTransport::State::Failed:
LOG_WARNING << "SCTP transport failed"; LOG_WARNING << "SCTP transport failed";
changeState(State::Failed); changeState(State::Failed);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this)); mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
break; break;
case SctpTransport::State::Disconnected: case SctpTransport::State::Disconnected:
changeState(State::Disconnected); changeState(State::Disconnected);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this)); mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
break; break;
default: default:
// Ignore // Ignore
@ -1069,19 +1058,17 @@ void PeerConnection::processLocalDescription(Description description) {
mCurrentLocalDescription.emplace(std::move(*mLocalDescription)); mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
} }
mLocalDescription.emplace(std::move(description)); mLocalDescription.emplace(description);
mLocalDescription->addCandidates(std::move(existingCandidates)); mLocalDescription->addCandidates(std::move(existingCandidates));
} }
mProcessor->enqueue([this, description = *mLocalDescription]() {
PLOG_VERBOSE << "Issuing local description: " << description; PLOG_VERBOSE << "Issuing local description: " << description;
mLocalDescriptionCallback(std::move(description)); mProcessor->enqueue(mLocalDescriptionCallback.wrap(), std::move(description));
});
// Reciprocated tracks might need to be open // Reciprocated tracks might need to be open
if (auto dtlsTransport = std::atomic_load(&mDtlsTransport); if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
dtlsTransport && dtlsTransport->state() == Transport::State::Connected) dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this)); mProcessor->enqueue(&PeerConnection::openTracks, this);
} }
void PeerConnection::processLocalCandidate(Candidate candidate) { void PeerConnection::processLocalCandidate(Candidate candidate) {
@ -1089,13 +1076,11 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
if (!mLocalDescription) if (!mLocalDescription)
throw std::logic_error("Got a local candidate without local description"); throw std::logic_error("Got a local candidate without local description");
candidate.resolve(Candidate::ResolveMode::Simple); // for proper SDP generation later candidate.resolve(Candidate::ResolveMode::Simple);
mLocalDescription->addCandidate(candidate); mLocalDescription->addCandidate(candidate);
mProcessor->enqueue([this, candidate = std::move(candidate)]() {
PLOG_VERBOSE << "Issuing local candidate: " << candidate; PLOG_VERBOSE << "Issuing local candidate: " << candidate;
mLocalCandidateCallback(std::move(candidate)); mProcessor->enqueue(mLocalCandidateCallback.wrap(), std::move(candidate));
});
} }
void PeerConnection::processRemoteDescription(Description description) { void PeerConnection::processRemoteDescription(Description description) {
@ -1107,10 +1092,13 @@ void PeerConnection::processRemoteDescription(Description description) {
if (mRemoteDescription) if (mRemoteDescription)
existingCandidates = mRemoteDescription->extractCandidates(); existingCandidates = mRemoteDescription->extractCandidates();
mRemoteDescription.emplace(std::move(description)); mRemoteDescription.emplace(description);
mRemoteDescription->addCandidates(std::move(existingCandidates)); mRemoteDescription->addCandidates(std::move(existingCandidates));
} }
auto iceTransport = initIceTransport();
iceTransport->setRemoteDescription(std::move(description));
if (description.hasApplication()) { if (description.hasApplication()) {
auto dtlsTransport = std::atomic_load(&mDtlsTransport); auto dtlsTransport = std::atomic_load(&mDtlsTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport); auto sctpTransport = std::atomic_load(&mSctpTransport);
@ -1121,28 +1109,40 @@ void PeerConnection::processRemoteDescription(Description description) {
} }
void PeerConnection::processRemoteCandidate(Candidate candidate) { void PeerConnection::processRemoteCandidate(Candidate candidate) {
std::lock_guard lock(mRemoteDescriptionMutex);
auto iceTransport = std::atomic_load(&mIceTransport); auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport) {
// Set as remote candidate
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
throw std::logic_error("Got a remote candidate without remote description"); throw std::logic_error("Got a remote candidate without remote description");
if (!iceTransport)
throw std::logic_error("Got a remote candidate without ICE transport");
candidate.hintMid(mRemoteDescription->bundleMid()); candidate.hintMid(mRemoteDescription->bundleMid());
if (candidate.resolve(Candidate::ResolveMode::Simple)) { if (mRemoteDescription->hasCandidate(candidate))
iceTransport->addRemoteCandidate(candidate); return; // already in description, ignore
candidate.resolve(Candidate::ResolveMode::Simple);
mRemoteDescription->addCandidate(candidate);
}
if (candidate.isResolved()) {
iceTransport->addRemoteCandidate(std::move(candidate));
} else { } else {
// OK, we might need a lookup, do it asynchronously // We might need a lookup, do it asynchronously
// We don't use the thread pool because we have no control on the timeout // We don't use the thread pool because we have no control on the timeout
if (auto iceTransport = std::atomic_load(&mIceTransport)) {
weak_ptr<IceTransport> weakIceTransport{iceTransport}; weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate]() mutable { std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup)) if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock()) if (auto iceTransport = weakIceTransport.lock())
iceTransport->addRemoteCandidate(candidate); iceTransport->addRemoteCandidate(std::move(candidate));
}); });
t.detach(); t.detach();
} }
}
mRemoteDescription->addCandidate(std::move(candidate));
} }
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) { void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
@ -1150,12 +1150,11 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
if (!dataChannel) if (!dataChannel)
return; return;
mProcessor->enqueue( mProcessor->enqueue(mDataChannelCallback.wrap(), std::move(dataChannel));
[this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
} }
void PeerConnection::triggerTrack(std::shared_ptr<Track> track) { void PeerConnection::triggerTrack(std::shared_ptr<Track> track) {
mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); }); mProcessor->enqueue(mTrackCallback.wrap(), std::move(track));
} }
bool PeerConnection::changeState(State state) { bool PeerConnection::changeState(State state) {
@ -1177,7 +1176,7 @@ bool PeerConnection::changeState(State state) {
// This is the last state change, so we may steal the callback // This is the last state change, so we may steal the callback
mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); }); mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
else else
mProcessor->enqueue([this, state]() { mStateChangeCallback(state); }); mProcessor->enqueue(mStateChangeCallback.wrap(), state);
return true; return true;
} }
@ -1189,7 +1188,7 @@ bool PeerConnection::changeGatheringState(GatheringState state) {
std::ostringstream s; std::ostringstream s;
s << state; s << state;
PLOG_INFO << "Changed gathering state to " << s.str(); PLOG_INFO << "Changed gathering state to " << s.str();
mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); }); mProcessor->enqueue(mGatheringStateChangeCallback.wrap(), state);
return true; return true;
} }
@ -1200,7 +1199,7 @@ bool PeerConnection::changeSignalingState(SignalingState state) {
std::ostringstream s; std::ostringstream s;
s << state; s << state;
PLOG_INFO << "Changed signaling state to " << s.str(); PLOG_INFO << "Changed signaling state to " << s.str();
mProcessor->enqueue([this, state] { mSignalingStateChangeCallback(state); }); mProcessor->enqueue(mSignalingStateChangeCallback.wrap(), state);
return true; return true;
} }

View File

@ -20,6 +20,8 @@
namespace rtc { namespace rtc {
Processor::Processor(size_t limit) : mTasks(limit) {}
Processor::~Processor() { join(); } Processor::~Processor() { join(); }
void Processor::join() { void Processor::join() {
@ -29,15 +31,13 @@ void Processor::join() {
void Processor::schedule() { void Processor::schedule() {
std::unique_lock lock(mMutex); std::unique_lock lock(mMutex);
if (mTasks.empty()) { if (auto next = mTasks.tryPop()) {
ThreadPool::Instance().enqueue(std::move(*next));
} else {
// No more tasks // No more tasks
mPending = false; mPending = false;
mCondition.notify_all(); mCondition.notify_all();
return;
} }
ThreadPool::Instance().enqueue(std::move(mTasks.front()));
mTasks.pop();
} }
} // namespace rtc } // namespace rtc

View File

@ -22,6 +22,7 @@
#include "include.hpp" #include "include.hpp"
#include "init.hpp" #include "init.hpp"
#include "threadpool.hpp" #include "threadpool.hpp"
#include "queue.hpp"
#include <condition_variable> #include <condition_variable>
#include <future> #include <future>
@ -34,7 +35,7 @@ namespace rtc {
// Processed tasks in order by delegating them to the thread pool // Processed tasks in order by delegating them to the thread pool
class Processor final { class Processor final {
public: public:
Processor() = default; Processor(size_t limit = 0);
~Processor(); ~Processor();
Processor(const Processor &) = delete; Processor(const Processor &) = delete;
@ -52,7 +53,7 @@ protected:
// Keep an init token // Keep an init token
const init_token mInitToken = Init::Token(); const init_token mInitToken = Init::Token();
std::queue<std::function<void()>> mTasks; Queue<std::function<void()>> mTasks;
bool mPending = false; // true iff a task is pending in the thread pool bool mPending = false; // true iff a task is pending in the thread pool
mutable std::mutex mMutex; mutable std::mutex mMutex;
@ -71,7 +72,7 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&... args
ThreadPool::Instance().enqueue(std::move(task)); ThreadPool::Instance().enqueue(std::move(task));
mPending = true; mPending = true;
} else { } else {
mTasks.emplace(std::move(task)); mTasks.push(std::move(task));
} }
} }

View File

@ -88,7 +88,7 @@ void SctpTransport::Cleanup() {
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
message_callback recvCallback, amount_callback bufferedAmountCallback, message_callback recvCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback) state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) { mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
onRecv(recvCallback); onRecv(recvCallback);
@ -100,11 +100,12 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
Instances.insert(this); Instances.insert(this);
} }
mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, &SctpTransport::RecvCallback, mSock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, nullptr, nullptr, 0, nullptr);
&SctpTransport::SendCallback, 0, this);
if (!mSock) if (!mSock)
throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno)); throw std::runtime_error("Could not create SCTP socket, errno=" + std::to_string(errno));
usrsctp_set_upcall(mSock, &SctpTransport::UpcallCallback, this);
if (usrsctp_set_non_blocking(mSock, 1)) if (usrsctp_set_non_blocking(mSock, 1))
throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno)); throw std::runtime_error("Unable to set non-blocking mode, errno=" + std::to_string(errno));
@ -122,6 +123,10 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av))) if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)))
throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" + throw std::runtime_error("Could not set socket option SCTP_ENABLE_STREAM_RESET, errno=" +
std::to_string(errno)); std::to_string(errno));
int on = 1;
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(on)))
throw std::runtime_error("Could set socket option SCTP_RECVRCVINFO, errno=" +
std::to_string(errno));
struct sctp_event se = {}; struct sctp_event se = {};
se.se_assoc_id = SCTP_ALL_ASSOC; se.se_assoc_id = SCTP_ALL_ASSOC;
@ -225,6 +230,7 @@ bool SctpTransport::stop() {
void SctpTransport::close() { void SctpTransport::close() {
if (mSock) { if (mSock) {
mProcessor.join();
usrsctp_close(mSock); usrsctp_close(mSock);
mSock = nullptr; mSock = nullptr;
} }
@ -319,6 +325,60 @@ void SctpTransport::incoming(message_ptr message) {
usrsctp_conninput(this, message->data(), message->size(), 0); usrsctp_conninput(this, message->data(), message->size(), 0);
} }
void SctpTransport::doRecv() {
std::lock_guard lock(mRecvMutex);
--mPendingRecvCount;
try {
while (true) {
const size_t bufferSize = 65536;
byte buffer[bufferSize];
socklen_t fromlen = 0;
struct sctp_rcvinfo info = {};
socklen_t infolen = sizeof(info);
unsigned int infotype = 0;
int flags = 0;
ssize_t len = usrsctp_recvv(mSock, buffer, bufferSize, nullptr, &fromlen, &info,
&infolen, &infotype, &flags);
if (len < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNRESET)
break;
else
throw std::runtime_error("SCTP recv failed, errno=" + std::to_string(errno));
}
PLOG_VERBOSE << "SCTP recv, len=" << len;
// SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
// therefore partial notifications and messages need to be handled separately.
if (flags & MSG_NOTIFICATION) {
// SCTP event notification
mPartialNotification.insert(mPartialNotification.end(), buffer, buffer + len);
if (flags & MSG_EOR) {
// Notification is complete, process it
auto notification =
reinterpret_cast<union sctp_notification *>(mPartialNotification.data());
processNotification(notification, mPartialNotification.size());
mPartialNotification.clear();
}
} else {
// SCTP message
mPartialMessage.insert(mPartialMessage.end(), buffer, buffer + len);
if (flags & MSG_EOR) {
// Message is complete, process it
if (infotype != SCTP_RECVV_RCVINFO)
throw std::runtime_error("Missing SCTP recv info");
processData(std::move(mPartialMessage), info.rcv_sid,
PayloadId(ntohl(info.rcv_ppid)));
mPartialMessage.clear();
}
}
}
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
bool SctpTransport::trySendQueue() { bool SctpTransport::trySendQueue() {
// Requires mSendMutex to be locked // Requires mSendMutex to be locked
while (auto next = mSendQueue.peek()) { while (auto next = mSendQueue.peek()) {
@ -472,44 +532,21 @@ bool SctpTransport::safeFlush() {
} }
} }
int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*addr*/, void SctpTransport::handleUpcall() {
const byte *data, size_t len, struct sctp_rcvinfo info, int flags) { if (!mSock)
try { return;
PLOG_VERBOSE << "Handle recv, len=" << len;
// SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB, PLOG_VERBOSE << "Handle upcall";
// therefore partial notifications and messages need to be handled separately.
if (flags & MSG_NOTIFICATION) { int events = usrsctp_get_events(mSock);
// SCTP event notification
mPartialNotification.insert(mPartialNotification.end(), data, data + len); if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
if (flags & MSG_EOR) { ++mPendingRecvCount;
// Notification is complete, process it mProcessor.enqueue(&SctpTransport::doRecv, this);
processNotification(
reinterpret_cast<const union sctp_notification *>(mPartialNotification.data()),
mPartialNotification.size());
mPartialNotification.clear();
}
} else {
// SCTP message
mPartialMessage.insert(mPartialMessage.end(), data, data + len);
if (flags & MSG_EOR) {
// Message is complete, process it
processData(std::move(mPartialMessage), info.rcv_sid,
PayloadId(ntohl(info.rcv_ppid)));
mPartialMessage.clear();
}
} }
} catch (const std::exception &e) { if (events & SCTP_EVENT_WRITE)
PLOG_ERROR << "SCTP recv: " << e.what(); mProcessor.enqueue(&SctpTransport::safeFlush, this);
return -1;
}
return 0; // success
}
int SctpTransport::handleSend(size_t free) {
PLOG_VERBOSE << "Handle send, free=" << free;
return safeFlush() ? 0 : -1;
} }
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) { int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
@ -699,31 +736,14 @@ std::optional<milliseconds> SctpTransport::rtt() {
return milliseconds(status.sstat_primary.spinfo_srtt); return milliseconds(status.sstat_primary.spinfo_srtt);
} }
int SctpTransport::RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data, void SctpTransport::UpcallCallback(struct socket *, void *arg, int /* flags */) {
size_t len, struct sctp_rcvinfo recv_info, int flags, auto *transport = static_cast<SctpTransport *>(arg);
void *ulp_info) {
auto *transport = static_cast<SctpTransport *>(ulp_info);
std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end()) {
free(data);
return -1;
}
int ret =
transport->handleRecv(sock, addr, static_cast<const byte *>(data), len, recv_info, flags);
free(data);
return ret;
}
int SctpTransport::SendCallback(struct socket *, uint32_t sb_free, void *ulp_info) {
auto *transport = static_cast<SctpTransport *>(ulp_info);
std::shared_lock lock(InstancesMutex); std::shared_lock lock(InstancesMutex);
if (Instances.find(transport) == Instances.end()) if (Instances.find(transport) == Instances.end())
return -1; return;
return transport->handleSend(size_t(sb_free)); transport->handleUpcall();
} }
int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) { int SctpTransport::WriteCallback(void *ptr, void *data, size_t len, uint8_t tos, uint8_t set_df) {

View File

@ -21,8 +21,10 @@
#include "include.hpp" #include "include.hpp"
#include "peerconnection.hpp" #include "peerconnection.hpp"
#include "processor.hpp"
#include "queue.hpp" #include "queue.hpp"
#include "transport.hpp" #include "transport.hpp"
#include "processor.hpp"
#include <condition_variable> #include <condition_variable>
#include <functional> #include <functional>
@ -35,7 +37,7 @@
namespace rtc { namespace rtc {
class SctpTransport : public Transport { class SctpTransport final : public Transport {
public: public:
static void Init(); static void Init();
static void Cleanup(); static void Cleanup();
@ -76,15 +78,14 @@ private:
void close(); void close();
void incoming(message_ptr message) override; void incoming(message_ptr message) override;
void doRecv();
bool trySendQueue(); bool trySendQueue();
bool trySendMessage(message_ptr message); bool trySendMessage(message_ptr message);
void updateBufferedAmount(uint16_t streamId, long delta); void updateBufferedAmount(uint16_t streamId, long delta);
void sendReset(uint16_t streamId); void sendReset(uint16_t streamId);
bool safeFlush(); bool safeFlush();
int handleRecv(struct socket *sock, union sctp_sockstore addr, const byte *data, size_t len, void handleUpcall();
struct sctp_rcvinfo recv_info, int flags);
int handleSend(size_t free);
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df); int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
void processData(binary &&data, uint16_t streamId, PayloadId ppid); void processData(binary &&data, uint16_t streamId, PayloadId ppid);
@ -93,7 +94,9 @@ private:
const uint16_t mPort; const uint16_t mPort;
struct socket *mSock; struct socket *mSock;
std::mutex mSendMutex; Processor mProcessor;
std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex;
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount; std::map<uint16_t, size_t> mBufferedAmount;
amount_callback mBufferedAmountCallback; amount_callback mBufferedAmountCallback;
@ -109,9 +112,7 @@ private:
// Stats // Stats
std::atomic<size_t> mBytesSent = 0, mBytesReceived = 0; std::atomic<size_t> mBytesSent = 0, mBytesReceived = 0;
static int RecvCallback(struct socket *sock, union sctp_sockstore addr, void *data, size_t len, static void UpcallCallback(struct socket *sock, void *arg, int flags);
struct sctp_rcvinfo recv_info, int flags, void *ulp_info);
static int SendCallback(struct socket *sock, uint32_t sb_free, void *ulp_info);
static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df); static int WriteCallback(void *sctp_ptr, void *data, size_t len, uint8_t tos, uint8_t set_df);
static std::unordered_set<SctpTransport *> Instances; static std::unordered_set<SctpTransport *> Instances;

View File

@ -100,14 +100,48 @@ static void RTC_API messageCallback(int id, const char *message, int size, void
static void RTC_API dataChannelCallback(int pc, int dc, void *ptr) { static void RTC_API dataChannelCallback(int pc, int dc, void *ptr) {
Peer *peer = (Peer *)ptr; Peer *peer = (Peer *)ptr;
peer->dc = dc;
peer->connected = true; char label[256];
if (rtcGetDataChannelLabel(dc, label, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelLabel failed\n");
return;
}
char protocol[256];
if (rtcGetDataChannelProtocol(dc, protocol, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelProtocol failed\n");
return;
}
rtcReliability reliability;
if (rtcGetDataChannelReliability(dc, &reliability) < 0) {
fprintf(stderr, "rtcGetDataChannelReliability failed\n");
return;
}
printf("DataChannel %d: Received with label \"%s\" and protocol \"%s\"\n",
peer == peer1 ? 1 : 2, label, protocol);
if (strcmp(label, "test") != 0) {
fprintf(stderr, "Wrong DataChannel label\n");
return;
}
if (strcmp(protocol, "protocol") != 0) {
fprintf(stderr, "Wrong DataChannel protocol\n");
return;
}
if (reliability.unordered == false) {
fprintf(stderr, "Wrong DataChannel reliability\n");
return;
}
rtcSetClosedCallback(dc, closedCallback); rtcSetClosedCallback(dc, closedCallback);
rtcSetMessageCallback(dc, messageCallback); rtcSetMessageCallback(dc, messageCallback);
char buffer[256]; peer->dc = dc;
if (rtcGetDataChannelLabel(dc, buffer, 256) >= 0) peer->connected = true;
printf("DataChannel %d: Received with label \"%s\"\n", peer == peer1 ? 1 : 2, buffer);
const char *message = peer == peer1 ? "Hello from 1" : "Hello from 2"; const char *message = peer == peer1 ? "Hello from 1" : "Hello from 2";
rtcSendMessage(peer->dc, message, -1); // negative size indicates a null-terminated string rtcSendMessage(peer->dc, message, -1); // negative size indicates a null-terminated string
@ -173,7 +207,12 @@ int test_capi_connectivity_main() {
goto error; goto error;
// Peer 1: Create data channel // Peer 1: Create data channel
peer1->dc = rtcCreateDataChannel(peer1->pc, "test"); rtcDataChannelInit init;
memset(&init, 0, sizeof(init));
init.protocol = "protocol";
init.reliability.unordered = true;
peer1->dc = rtcCreateDataChannelEx(peer1->pc, "test", &init);
rtcSetOpenCallback(peer1->dc, openCallback); rtcSetOpenCallback(peer1->dc, openCallback);
rtcSetClosedCallback(peer1->dc, closedCallback); rtcSetClosedCallback(peer1->dc, closedCallback);
rtcSetMessageCallback(peer1->dc, messageCallback); rtcSetMessageCallback(peer1->dc, messageCallback);