Compare commits

..

9 Commits

7 changed files with 57 additions and 7 deletions

View File

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.7)
project(libdatachannel
DESCRIPTION "WebRTC Data Channels Library"
VERSION 0.9.2
VERSION 0.9.3
LANGUAGES CXX)
# Options

View File

@ -38,6 +38,7 @@ public:
~Queue();
void stop();
bool running() const;
bool empty() const;
bool full() const;
size_t size() const; // elements
@ -80,6 +81,11 @@ template <typename T> void Queue<T>::stop() {
mPushCondition.notify_all();
}
template <typename T> bool Queue<T>::running() const {
std::lock_guard lock(mMutex);
return !mQueue.empty() || !mStopping;
}
template <typename T> bool Queue<T>::empty() const {
std::lock_guard lock(mMutex);
return mQueue.empty();

View File

@ -218,6 +218,8 @@ void DataChannel::incoming(message_ptr message) {
switch (message->type) {
case Message::Control: {
if (message->size() == 0)
break; // Ignore
auto raw = reinterpret_cast<const uint8_t *>(message->data());
switch (raw[0]) {
case MESSAGE_OPEN:

View File

@ -435,7 +435,7 @@ void DtlsTransport::runRecvLoop() {
const size_t bufferSize = maxMtu;
byte buffer[bufferSize];
while (true) {
while (mIncomingQueue.running()) {
// Process pending messages
while (auto next = mIncomingQueue.tryPop()) {
message_ptr message = std::move(*next);
@ -492,8 +492,7 @@ void DtlsTransport::runRecvLoop() {
}
}
if (!mIncomingQueue.wait(duration))
break; // queue is stopped
mIncomingQueue.wait(duration);
}
} catch (const std::exception &e) {
PLOG_ERROR << "DTLS recv: " << e.what();

View File

@ -102,6 +102,7 @@ void PeerConnection::setLocalDescription() {
if (std::atomic_load(&mIceTransport)) {
PLOG_DEBUG << "Local description is already set, ignoring";
return;
}
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of

View File

@ -476,8 +476,6 @@ int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*a
const byte *data, size_t len, struct sctp_rcvinfo info, int flags) {
try {
PLOG_VERBOSE << "Handle recv, len=" << len;
if (!len)
return 0; // Ignore
// SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
// therefore partial notifications and messages need to be handled separately.
@ -497,7 +495,7 @@ int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*a
if (flags & MSG_EOR) {
// Message is complete, process it
processData(std::move(mPartialMessage), info.rcv_sid,
PayloadId(htonl(info.rcv_ppid)));
PayloadId(ntohl(info.rcv_ppid)));
mPartialMessage.clear();
}
}

View File

@ -125,6 +125,7 @@ void test_connectivity() {
}
});
// Wait a bit
int attempts = 10;
shared_ptr<DataChannel> adc2;
while ((!(adc2 = std::atomic_load(&dc2)) || !adc2->isOpen() || !dc1->isOpen()) && attempts--)
@ -146,6 +147,49 @@ void test_connectivity() {
if (auto addr = pc2->remoteAddress())
cout << "Remote address 2: " << *addr << endl;
// Try to open a second data channel with another label
shared_ptr<DataChannel> second2;
pc2->onDataChannel([&second2](shared_ptr<DataChannel> dc) {
cout << "Second DataChannel 2: Received with label \"" << dc->label() << "\"" << endl;
if (dc->label() != "second") {
cerr << "Wrong second DataChannel label" << endl;
return;
}
dc->onMessage([](variant<binary, string> message) {
if (holds_alternative<string>(message)) {
cout << "Second Message 2: " << get<string>(message) << endl;
}
});
dc->send("Send hello from 2");
std::atomic_store(&second2, dc);
});
auto second1 = pc1->createDataChannel("second");
second1->onOpen([wsecond1 = make_weak_ptr(dc1)]() {
auto second1 = wsecond1.lock();
if (!second1)
return;
cout << "Second DataChannel 1: Open" << endl;
second1->send("Second hello from 1");
});
dc1->onMessage([](const variant<binary, string> &message) {
if (holds_alternative<string>(message)) {
cout << "Second Message 1: " << get<string>(message) << endl;
}
});
// Wait a bit
attempts = 10;
shared_ptr<DataChannel> asecond2;
while (
(!(asecond2 = std::atomic_load(&second2)) || !asecond2->isOpen() || !second1->isOpen()) &&
attempts--)
this_thread::sleep_for(1s);
// Delay close of peer 2 to check closing works properly
pc1->close();
this_thread::sleep_for(1s);