Compare commits

..

15 Commits

Author SHA1 Message Date
ee3bc9694b Bumped version to 0.10.2 2020-11-18 23:39:06 +01:00
0c0ba77de5 Merge pull request #265 from paullouisageneau/optimize-sctp-recv
Schedule SCTP recv task only if necessary
2020-11-18 23:38:49 +01:00
8729e0d2aa Scheduled SCTP recv task only if there is no pending task 2020-11-18 23:26:47 +01:00
90eb610bfe Merge pull request #263 from paullouisageneau/release-rdesc-lock
Prevent holding multiple locks
2020-11-17 23:02:15 +01:00
08ddfa1276 Release remote description lock before passing candidate to transport 2020-11-17 22:41:42 +01:00
87df64a002 Merge pull request #262 from paullouisageneau/fix-duplicate-candidates
Prevent duplicate candidates
2020-11-17 20:40:15 +01:00
5af414d0df Cosmetic fixes in Description 2020-11-17 20:23:59 +01:00
2443c72350 Refactored trimming with util functions 2020-11-17 20:10:47 +01:00
f033e4ab8f Prevent whitspaces at the end of candidates as they confuse libnice 2020-11-17 19:57:51 +01:00
1a6dcdce6f Reordered Candidate getters 2020-11-17 19:28:13 +01:00
100039eba8 Enforce candidates uniqueness in description 2020-11-17 19:23:29 +01:00
e2005c789a Refactored candidate storage and split parsing and resolution 2020-11-17 19:21:48 +01:00
819566b4c1 Merge pull request #261 from paullouisageneau/fix-remote-unordered-flag
Fix remote unordered flag
2020-11-17 00:15:35 +01:00
82caab8906 Added tests for remote protocol and reliability in C API 2020-11-16 23:59:59 +01:00
802516b2db Fixed remote DataChannel unordered flag 2020-11-16 23:59:59 +01:00
11 changed files with 252 additions and 145 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
VERSION 0.10.1 VERSION 0.10.2
LANGUAGES CXX) LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library") set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

View File

@ -40,29 +40,38 @@ public:
enum class ResolveMode { Simple, Lookup }; enum class ResolveMode { Simple, Lookup };
bool resolve(ResolveMode mode = ResolveMode::Simple); bool resolve(ResolveMode mode = ResolveMode::Simple);
Type type() const;
TransportType transportType() const;
uint32_t priority() const;
string candidate() const; string candidate() const;
string mid() const; string mid() const;
operator string() const; operator string() const;
bool operator==(const Candidate &other) const;
bool operator!=(const Candidate &other) const;
bool isResolved() const; bool isResolved() const;
Family family() const; Family family() const;
Type type() const;
TransportType transportType() const;
std::optional<string> address() const; std::optional<string> address() const;
std::optional<uint16_t> port() const; std::optional<uint16_t> port() const;
std::optional<uint32_t> priority() const;
private: private:
string mCandidate; void parse(string candidate);
string mFoundation;
uint32_t mComponent, mPriority;
string mTypeString, mTransportString;
Type mType;
TransportType mTransportType;
string mNode, mService;
string mTail;
std::optional<string> mMid; std::optional<string> mMid;
// Extracted on resolution // Extracted on resolution
Family mFamily; Family mFamily;
Type mType;
TransportType mTransportType;
string mAddress; string mAddress;
uint16_t mPort; uint16_t mPort;
uint32_t mPriority;
}; };
} // namespace rtc } // namespace rtc

View File

@ -53,6 +53,7 @@ public:
void hintType(Type type); void hintType(Type type);
void setFingerprint(string fingerprint); void setFingerprint(string fingerprint);
bool hasCandidate(const Candidate &candidate) const;
void addCandidate(Candidate candidate); void addCandidate(Candidate candidate);
void addCandidates(std::vector<Candidate> candidates); void addCandidates(std::vector<Candidate> candidates);
void endCandidates(); void endCandidates();

View File

@ -20,6 +20,7 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cctype>
#include <sstream> #include <sstream>
#include <unordered_map> #include <unordered_map>
@ -39,39 +40,44 @@ using std::string;
namespace { namespace {
inline bool hasprefix(const string &str, const string &prefix) { inline bool match_prefix(const string &str, const string &prefix) {
return str.size() >= prefix.size() && return str.size() >= prefix.size() &&
std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end(); std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end();
} }
inline void trim_begin(string &str) {
str.erase(str.begin(),
std::find_if(str.begin(), str.end(), [](char c) { return !std::isspace(c); }));
}
inline void trim_end(string &str) {
str.erase(
std::find_if(str.rbegin(), str.rend(), [](char c) { return !std::isspace(c); }).base(),
str.end());
}
} // namespace } // namespace
namespace rtc { namespace rtc {
Candidate::Candidate() Candidate::Candidate()
: mFamily(Family::Unresolved), mType(Type::Unknown), mTransportType(TransportType::Unknown), : mFoundation("none"), mComponent(0), mPriority(0), mTypeString("unknown"),
mPort(0), mPriority(0) {} mTransportString("unknown"), mType(Type::Unknown), mTransportType(TransportType::Unknown),
mNode("0.0.0.0"), mService("9"), mFamily(Family::Unresolved), mPort(0) {}
Candidate::Candidate(string candidate) : Candidate() { Candidate::Candidate(string candidate) : Candidate() {
const std::array prefixes{"a=", "candidate:"}; if (!candidate.empty())
for (const string &prefix : prefixes) parse(std::move(candidate));
if (hasprefix(candidate, prefix))
candidate.erase(0, prefix.size());
mCandidate = std::move(candidate);
} }
Candidate::Candidate(string candidate, string mid) : Candidate(std::move(candidate)) { Candidate::Candidate(string candidate, string mid) : Candidate() {
if (!candidate.empty())
parse(std::move(candidate));
if (!mid.empty()) if (!mid.empty())
mMid.emplace(std::move(mid)); mMid.emplace(std::move(mid));
} }
void Candidate::hintMid(string mid) { void Candidate::parse(string candidate) {
if (!mMid)
mMid.emplace(std::move(mid));
}
bool Candidate::resolve(ResolveMode mode) {
using TypeMap_t = std::unordered_map<string, Type>; using TypeMap_t = std::unordered_map<string, Type>;
using TcpTypeMap_t = std::unordered_map<string, TransportType>; using TcpTypeMap_t = std::unordered_map<string, TransportType>;
@ -84,24 +90,23 @@ bool Candidate::resolve(ResolveMode mode) {
{"passive", TransportType::TcpPassive}, {"passive", TransportType::TcpPassive},
{"so", TransportType::TcpSo}}; {"so", TransportType::TcpSo}};
if (mFamily != Family::Unresolved) const std::array prefixes{"a=", "candidate:"};
return true; for (const string &prefix : prefixes)
if (match_prefix(candidate, prefix))
candidate.erase(0, prefix.size());
if (mCandidate.empty()) PLOG_VERBOSE << "Parsing candidate: " << candidate;
throw std::logic_error("Candidate is empty");
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mCandidate;
// See RFC 8445 for format // See RFC 8445 for format
std::istringstream iss(mCandidate); std::istringstream iss(candidate);
int component{0}, priority{0}; string transport, typ_, type;
string foundation, transport, node, service, typ_, type; if (!(iss >> mFoundation >> mComponent >> mTransportString >> mPriority &&
if (iss >> foundation >> component >> transport >> priority && iss >> mNode >> mService >> typ_ >> mTypeString && typ_ == "typ"))
iss >> node >> service >> typ_ >> type && typ_ == "typ") { throw std::invalid_argument("Invalid candidate format");
string left; std::getline(iss, mTail);
std::getline(iss, left); trim_begin(mTail);
trim_end(mTail);
if (auto it = TypeMap.find(type); it != TypeMap.end()) if (auto it = TypeMap.find(type); it != TypeMap.end())
mType = it->second; mType = it->second;
@ -111,7 +116,8 @@ bool Candidate::resolve(ResolveMode mode) {
if (transport == "UDP" || transport == "udp") { if (transport == "UDP" || transport == "udp") {
mTransportType = TransportType::Udp; mTransportType = TransportType::Udp;
} else if (transport == "TCP" || transport == "tcp") { } else if (transport == "TCP" || transport == "tcp") {
std::istringstream iss(left); // Peek tail to find TCP type
std::istringstream iss(mTail);
string tcptype_, tcptype; string tcptype_, tcptype;
if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") { if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") {
if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end()) if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end())
@ -125,8 +131,19 @@ bool Candidate::resolve(ResolveMode mode) {
} else { } else {
mTransportType = TransportType::Unknown; mTransportType = TransportType::Unknown;
} }
}
// Try to resolve the node void Candidate::hintMid(string mid) {
if (!mMid)
mMid.emplace(std::move(mid));
}
bool Candidate::resolve(ResolveMode mode) {
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mNode << ' '
<< mService;
// Try to resolve the node and service
struct addrinfo hints = {}; struct addrinfo hints = {};
hints.ai_family = AF_UNSPEC; hints.ai_family = AF_UNSPEC;
hints.ai_flags = AI_ADDRCONFIG; hints.ai_flags = AI_ADDRCONFIG;
@ -142,10 +159,9 @@ bool Candidate::resolve(ResolveMode mode) {
hints.ai_flags |= AI_NUMERICHOST; hints.ai_flags |= AI_NUMERICHOST;
struct addrinfo *result = nullptr; struct addrinfo *result = nullptr;
if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) == 0) { if (getaddrinfo(mNode.c_str(), mService.c_str(), &hints, &result) == 0) {
for (auto p = result; p; p = p->ai_next) { for (auto p = result; p; p = p->ai_next) {
if (p->ai_family == AF_INET || p->ai_family == AF_INET6) { if (p->ai_family == AF_INET || p->ai_family == AF_INET6) {
// Rewrite the candidate
char nodebuffer[MAX_NUMERICNODE_LEN]; char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN]; char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer, if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer,
@ -155,15 +171,7 @@ bool Candidate::resolve(ResolveMode mode) {
mAddress = nodebuffer; mAddress = nodebuffer;
mPort = uint16_t(std::stoul(servbuffer)); mPort = uint16_t(std::stoul(servbuffer));
mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4; mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4;
PLOG_VERBOSE << "Resolved candidate: " << mAddress << ' ' << mPort;
const char sp{' '};
std::ostringstream oss;
oss << foundation << sp << component << sp << transport << sp << priority;
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
oss << left;
mCandidate = oss.str();
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
break; break;
} }
} }
@ -171,12 +179,33 @@ bool Candidate::resolve(ResolveMode mode) {
freeaddrinfo(result); freeaddrinfo(result);
} }
}
return mFamily != Family::Unresolved; return mFamily != Family::Unresolved;
} }
string Candidate::candidate() const { return "candidate:" + mCandidate; } Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
uint32_t Candidate::priority() const { return mPriority; }
string Candidate::candidate() const {
const char sp{' '};
std::ostringstream oss;
oss << "candidate:";
oss << mFoundation << sp << mComponent << sp << mTransportString << sp << mPriority << sp;
if (isResolved())
oss << mAddress << sp << mPort;
else
oss << mNode << sp << mService;
oss << sp << "typ" << sp << mTypeString;
if (!mTail.empty())
oss << sp << mTail;
return oss.str();
}
string Candidate::mid() const { return mMid.value_or("0"); } string Candidate::mid() const { return mMid.value_or("0"); }
@ -186,14 +215,18 @@ Candidate::operator string() const {
return line.str(); return line.str();
} }
bool Candidate::operator==(const Candidate &other) const {
return mFoundation == other.mFoundation;
}
bool Candidate::operator!=(const Candidate &other) const {
return mFoundation != other.mFoundation;
}
bool Candidate::isResolved() const { return mFamily != Family::Unresolved; } bool Candidate::isResolved() const { return mFamily != Family::Unresolved; }
Candidate::Family Candidate::family() const { return mFamily; } Candidate::Family Candidate::family() const { return mFamily; }
Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
std::optional<string> Candidate::address() const { std::optional<string> Candidate::address() const {
return isResolved() ? std::make_optional(mAddress) : nullopt; return isResolved() ? std::make_optional(mAddress) : nullopt;
} }
@ -202,10 +235,6 @@ std::optional<uint16_t> Candidate::port() const {
return isResolved() ? std::make_optional(mPort) : nullopt; return isResolved() ? std::make_optional(mPort) : nullopt;
} }
std::optional<uint32_t> Candidate::priority() const {
return isResolved() ? std::make_optional(mPriority) : nullopt;
}
} // namespace rtc } // namespace rtc
std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) { std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) {
@ -217,11 +246,11 @@ std::ostream &operator<<(std::ostream &out, const rtc::Candidate::Type &type) {
case rtc::Candidate::Type::Host: case rtc::Candidate::Type::Host:
return out << "host"; return out << "host";
case rtc::Candidate::Type::PeerReflexive: case rtc::Candidate::Type::PeerReflexive:
return out << "peer_reflexive"; return out << "prflx";
case rtc::Candidate::Type::ServerReflexive: case rtc::Candidate::Type::ServerReflexive:
return out << "server_reflexive"; return out << "srflx";
case rtc::Candidate::Type::Relayed: case rtc::Candidate::Type::Relayed:
return out << "relayed"; return out << "relay";
default: default:
return out << "unknown"; return out << "unknown";
} }

View File

@ -306,7 +306,7 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
mLabel.assign(end, open.labelLength); mLabel.assign(end, open.labelLength);
mProtocol.assign(end + open.labelLength, open.protocolLength); mProtocol.assign(end + open.labelLength, open.protocolLength);
mReliability->unordered = (open.reliabilityParameter & 0x80) != 0; mReliability->unordered = (open.channelType & 0x80) != 0;
switch (open.channelType & 0x7F) { switch (open.channelType & 0x7F) {
case CHANNEL_PARTIAL_RELIABLE_REXMIT: case CHANNEL_PARTIAL_RELIABLE_REXMIT:
mReliability->type = Reliability::Type::Rexmit; mReliability->type = Reliability::Type::Rexmit;

View File

@ -171,16 +171,27 @@ void Description::setFingerprint(string fingerprint) {
mFingerprint.emplace(std::move(fingerprint)); mFingerprint.emplace(std::move(fingerprint));
} }
bool Description::hasCandidate(const Candidate &candidate) const {
for (const Candidate &other : mCandidates)
if (candidate == other)
return true;
return false;
}
void Description::addCandidate(Candidate candidate) { void Description::addCandidate(Candidate candidate) {
candidate.hintMid(bundleMid()); candidate.hintMid(bundleMid());
for (const Candidate &other : mCandidates)
if (candidate == other)
return;
mCandidates.emplace_back(std::move(candidate)); mCandidates.emplace_back(std::move(candidate));
} }
void Description::addCandidates(std::vector<Candidate> candidates) { void Description::addCandidates(std::vector<Candidate> candidates) {
for (Candidate candidate : candidates) { for (Candidate candidate : candidates)
candidate.hintMid(bundleMid()); addCandidate(std::move(candidate));
mCandidates.emplace_back(std::move(candidate));
}
} }
void Description::endCandidates() { mEnded = true; } void Description::endCandidates() { mEnded = true; }

View File

@ -564,12 +564,14 @@ bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
return false; return false;
// Warning: the candidate string must start with "a=candidate:" and it must not end with a // Warning: the candidate string must start with "a=candidate:" and it must not end with a
// newline, else libnice will reject it. // newline or whitespace, else libnice will reject it.
string sdp(candidate); string sdp(candidate);
NiceCandidate *cand = NiceCandidate *cand =
nice_agent_parse_remote_candidate_sdp(mNiceAgent.get(), mStreamId, sdp.c_str()); nice_agent_parse_remote_candidate_sdp(mNiceAgent.get(), mStreamId, sdp.c_str());
if (!cand) if (!cand) {
PLOG_WARNING << "Rejected ICE candidate: " << sdp;
return false; return false;
}
GSList *list = g_slist_append(nullptr, cand); GSList *list = g_slist_append(nullptr, cand);
int ret = nice_agent_set_remote_candidates(mNiceAgent.get(), mStreamId, 1, list); int ret = nice_agent_set_remote_candidates(mNiceAgent.get(), mStreamId, 1, list);

View File

@ -266,10 +266,6 @@ void PeerConnection::setRemoteDescription(Description description) {
// Candidates will be added at the end, extract them for now // Candidates will be added at the end, extract them for now
auto remoteCandidates = description.extractCandidates(); auto remoteCandidates = description.extractCandidates();
auto type = description.type(); auto type = description.type();
auto iceTransport = initIceTransport();
iceTransport->setRemoteDescription(description);
processRemoteDescription(std::move(description)); processRemoteDescription(std::move(description));
changeSignalingState(newSignalingState); changeSignalingState(newSignalingState);
@ -279,8 +275,9 @@ void PeerConnection::setRemoteDescription(Description description) {
setLocalDescription(Description::Type::Answer); setLocalDescription(Description::Type::Answer);
} else { } else {
// This is an answer // This is an answer
auto iceTransport = std::atomic_load(&mIceTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport); auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport->role() == Description::Role::Active) { if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
// Since we assumed passive role during DataChannel creation, we need to shift the // Since we assumed passive role during DataChannel creation, we need to shift the
// stream numbers by one to shift them from odd to even. // stream numbers by one to shift them from odd to even.
std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
@ -1079,7 +1076,7 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
if (!mLocalDescription) if (!mLocalDescription)
throw std::logic_error("Got a local candidate without local description"); throw std::logic_error("Got a local candidate without local description");
candidate.resolve(Candidate::ResolveMode::Simple); // for proper SDP generation later candidate.resolve(Candidate::ResolveMode::Simple);
mLocalDescription->addCandidate(candidate); mLocalDescription->addCandidate(candidate);
PLOG_VERBOSE << "Issuing local candidate: " << candidate; PLOG_VERBOSE << "Issuing local candidate: " << candidate;
@ -1095,10 +1092,13 @@ void PeerConnection::processRemoteDescription(Description description) {
if (mRemoteDescription) if (mRemoteDescription)
existingCandidates = mRemoteDescription->extractCandidates(); existingCandidates = mRemoteDescription->extractCandidates();
mRemoteDescription.emplace(std::move(description)); mRemoteDescription.emplace(description);
mRemoteDescription->addCandidates(std::move(existingCandidates)); mRemoteDescription->addCandidates(std::move(existingCandidates));
} }
auto iceTransport = initIceTransport();
iceTransport->setRemoteDescription(std::move(description));
if (description.hasApplication()) { if (description.hasApplication()) {
auto dtlsTransport = std::atomic_load(&mDtlsTransport); auto dtlsTransport = std::atomic_load(&mDtlsTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport); auto sctpTransport = std::atomic_load(&mSctpTransport);
@ -1109,28 +1109,40 @@ void PeerConnection::processRemoteDescription(Description description) {
} }
void PeerConnection::processRemoteCandidate(Candidate candidate) { void PeerConnection::processRemoteCandidate(Candidate candidate) {
std::lock_guard lock(mRemoteDescriptionMutex);
auto iceTransport = std::atomic_load(&mIceTransport); auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport) {
// Set as remote candidate
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
throw std::logic_error("Got a remote candidate without remote description"); throw std::logic_error("Got a remote candidate without remote description");
if (!iceTransport)
throw std::logic_error("Got a remote candidate without ICE transport");
candidate.hintMid(mRemoteDescription->bundleMid()); candidate.hintMid(mRemoteDescription->bundleMid());
if (candidate.resolve(Candidate::ResolveMode::Simple)) { if (mRemoteDescription->hasCandidate(candidate))
iceTransport->addRemoteCandidate(candidate); return; // already in description, ignore
candidate.resolve(Candidate::ResolveMode::Simple);
mRemoteDescription->addCandidate(candidate);
}
if (candidate.isResolved()) {
iceTransport->addRemoteCandidate(std::move(candidate));
} else { } else {
// OK, we might need a lookup, do it asynchronously // We might need a lookup, do it asynchronously
// We don't use the thread pool because we have no control on the timeout // We don't use the thread pool because we have no control on the timeout
if (auto iceTransport = std::atomic_load(&mIceTransport)) {
weak_ptr<IceTransport> weakIceTransport{iceTransport}; weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate]() mutable { std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup)) if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock()) if (auto iceTransport = weakIceTransport.lock())
iceTransport->addRemoteCandidate(candidate); iceTransport->addRemoteCandidate(std::move(candidate));
}); });
t.detach(); t.detach();
} }
}
mRemoteDescription->addCandidate(std::move(candidate));
} }
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) { void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {

View File

@ -88,7 +88,7 @@ void SctpTransport::Cleanup() {
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
message_callback recvCallback, amount_callback bufferedAmountCallback, message_callback recvCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback) state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), : Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) { mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
onRecv(recvCallback); onRecv(recvCallback);
@ -327,6 +327,7 @@ void SctpTransport::incoming(message_ptr message) {
void SctpTransport::doRecv() { void SctpTransport::doRecv() {
std::lock_guard lock(mRecvMutex); std::lock_guard lock(mRecvMutex);
--mPendingRecvCount;
try { try {
while (true) { while (true) {
const size_t bufferSize = 65536; const size_t bufferSize = 65536;
@ -539,8 +540,10 @@ void SctpTransport::handleUpcall() {
int events = usrsctp_get_events(mSock); int events = usrsctp_get_events(mSock);
if (events & SCTP_EVENT_READ) if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
++mPendingRecvCount;
mProcessor.enqueue(&SctpTransport::doRecv, this); mProcessor.enqueue(&SctpTransport::doRecv, this);
}
if (events & SCTP_EVENT_WRITE) if (events & SCTP_EVENT_WRITE)
mProcessor.enqueue(&SctpTransport::safeFlush, this); mProcessor.enqueue(&SctpTransport::safeFlush, this);

View File

@ -95,6 +95,7 @@ private:
struct socket *mSock; struct socket *mSock;
Processor mProcessor; Processor mProcessor;
std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex; std::mutex mRecvMutex, mSendMutex;
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount; std::map<uint16_t, size_t> mBufferedAmount;

View File

@ -100,14 +100,48 @@ static void RTC_API messageCallback(int id, const char *message, int size, void
static void RTC_API dataChannelCallback(int pc, int dc, void *ptr) { static void RTC_API dataChannelCallback(int pc, int dc, void *ptr) {
Peer *peer = (Peer *)ptr; Peer *peer = (Peer *)ptr;
peer->dc = dc;
peer->connected = true; char label[256];
if (rtcGetDataChannelLabel(dc, label, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelLabel failed\n");
return;
}
char protocol[256];
if (rtcGetDataChannelProtocol(dc, protocol, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelProtocol failed\n");
return;
}
rtcReliability reliability;
if (rtcGetDataChannelReliability(dc, &reliability) < 0) {
fprintf(stderr, "rtcGetDataChannelReliability failed\n");
return;
}
printf("DataChannel %d: Received with label \"%s\" and protocol \"%s\"\n",
peer == peer1 ? 1 : 2, label, protocol);
if (strcmp(label, "test") != 0) {
fprintf(stderr, "Wrong DataChannel label\n");
return;
}
if (strcmp(protocol, "protocol") != 0) {
fprintf(stderr, "Wrong DataChannel protocol\n");
return;
}
if (reliability.unordered == false) {
fprintf(stderr, "Wrong DataChannel reliability\n");
return;
}
rtcSetClosedCallback(dc, closedCallback); rtcSetClosedCallback(dc, closedCallback);
rtcSetMessageCallback(dc, messageCallback); rtcSetMessageCallback(dc, messageCallback);
char buffer[256]; peer->dc = dc;
if (rtcGetDataChannelLabel(dc, buffer, 256) >= 0) peer->connected = true;
printf("DataChannel %d: Received with label \"%s\"\n", peer == peer1 ? 1 : 2, buffer);
const char *message = peer == peer1 ? "Hello from 1" : "Hello from 2"; const char *message = peer == peer1 ? "Hello from 1" : "Hello from 2";
rtcSendMessage(peer->dc, message, -1); // negative size indicates a null-terminated string rtcSendMessage(peer->dc, message, -1); // negative size indicates a null-terminated string
@ -173,7 +207,12 @@ int test_capi_connectivity_main() {
goto error; goto error;
// Peer 1: Create data channel // Peer 1: Create data channel
peer1->dc = rtcCreateDataChannel(peer1->pc, "test"); rtcDataChannelInit init;
memset(&init, 0, sizeof(init));
init.protocol = "protocol";
init.reliability.unordered = true;
peer1->dc = rtcCreateDataChannelEx(peer1->pc, "test", &init);
rtcSetOpenCallback(peer1->dc, openCallback); rtcSetOpenCallback(peer1->dc, openCallback);
rtcSetClosedCallback(peer1->dc, closedCallback); rtcSetClosedCallback(peer1->dc, closedCallback);
rtcSetMessageCallback(peer1->dc, messageCallback); rtcSetMessageCallback(peer1->dc, messageCallback);