mirror of
https://github.com/mii443/libdatachannel.git
synced 2025-08-23 15:48:03 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
60d09d5c6f | |||
266159fe41 | |||
458decb12d | |||
635c2e5513 | |||
adc4223617 | |||
326ae27ad1 | |||
6d33d19816 | |||
734efb391a | |||
cba864507f |
@ -1,7 +1,7 @@
|
|||||||
cmake_minimum_required(VERSION 3.7)
|
cmake_minimum_required(VERSION 3.7)
|
||||||
project(libdatachannel
|
project(libdatachannel
|
||||||
DESCRIPTION "WebRTC Data Channels Library"
|
DESCRIPTION "WebRTC Data Channels Library"
|
||||||
VERSION 0.9.2
|
VERSION 0.9.3
|
||||||
LANGUAGES CXX)
|
LANGUAGES CXX)
|
||||||
|
|
||||||
# Options
|
# Options
|
||||||
|
@ -38,6 +38,7 @@ public:
|
|||||||
~Queue();
|
~Queue();
|
||||||
|
|
||||||
void stop();
|
void stop();
|
||||||
|
bool running() const;
|
||||||
bool empty() const;
|
bool empty() const;
|
||||||
bool full() const;
|
bool full() const;
|
||||||
size_t size() const; // elements
|
size_t size() const; // elements
|
||||||
@ -80,6 +81,11 @@ template <typename T> void Queue<T>::stop() {
|
|||||||
mPushCondition.notify_all();
|
mPushCondition.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T> bool Queue<T>::running() const {
|
||||||
|
std::lock_guard lock(mMutex);
|
||||||
|
return !mQueue.empty() || !mStopping;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename T> bool Queue<T>::empty() const {
|
template <typename T> bool Queue<T>::empty() const {
|
||||||
std::lock_guard lock(mMutex);
|
std::lock_guard lock(mMutex);
|
||||||
return mQueue.empty();
|
return mQueue.empty();
|
||||||
|
@ -218,6 +218,8 @@ void DataChannel::incoming(message_ptr message) {
|
|||||||
|
|
||||||
switch (message->type) {
|
switch (message->type) {
|
||||||
case Message::Control: {
|
case Message::Control: {
|
||||||
|
if (message->size() == 0)
|
||||||
|
break; // Ignore
|
||||||
auto raw = reinterpret_cast<const uint8_t *>(message->data());
|
auto raw = reinterpret_cast<const uint8_t *>(message->data());
|
||||||
switch (raw[0]) {
|
switch (raw[0]) {
|
||||||
case MESSAGE_OPEN:
|
case MESSAGE_OPEN:
|
||||||
|
@ -435,7 +435,7 @@ void DtlsTransport::runRecvLoop() {
|
|||||||
|
|
||||||
const size_t bufferSize = maxMtu;
|
const size_t bufferSize = maxMtu;
|
||||||
byte buffer[bufferSize];
|
byte buffer[bufferSize];
|
||||||
while (true) {
|
while (mIncomingQueue.running()) {
|
||||||
// Process pending messages
|
// Process pending messages
|
||||||
while (auto next = mIncomingQueue.tryPop()) {
|
while (auto next = mIncomingQueue.tryPop()) {
|
||||||
message_ptr message = std::move(*next);
|
message_ptr message = std::move(*next);
|
||||||
@ -492,8 +492,7 @@ void DtlsTransport::runRecvLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mIncomingQueue.wait(duration))
|
mIncomingQueue.wait(duration);
|
||||||
break; // queue is stopped
|
|
||||||
}
|
}
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
PLOG_ERROR << "DTLS recv: " << e.what();
|
PLOG_ERROR << "DTLS recv: " << e.what();
|
||||||
|
@ -102,6 +102,7 @@ void PeerConnection::setLocalDescription() {
|
|||||||
|
|
||||||
if (std::atomic_load(&mIceTransport)) {
|
if (std::atomic_load(&mIceTransport)) {
|
||||||
PLOG_DEBUG << "Local description is already set, ignoring";
|
PLOG_DEBUG << "Local description is already set, ignoring";
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
|
// RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
|
||||||
|
@ -476,8 +476,6 @@ int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*a
|
|||||||
const byte *data, size_t len, struct sctp_rcvinfo info, int flags) {
|
const byte *data, size_t len, struct sctp_rcvinfo info, int flags) {
|
||||||
try {
|
try {
|
||||||
PLOG_VERBOSE << "Handle recv, len=" << len;
|
PLOG_VERBOSE << "Handle recv, len=" << len;
|
||||||
if (!len)
|
|
||||||
return 0; // Ignore
|
|
||||||
|
|
||||||
// SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
|
// SCTP_FRAGMENT_INTERLEAVE does not seem to work as expected for messages > 64KB,
|
||||||
// therefore partial notifications and messages need to be handled separately.
|
// therefore partial notifications and messages need to be handled separately.
|
||||||
@ -497,7 +495,7 @@ int SctpTransport::handleRecv(struct socket * /*sock*/, union sctp_sockstore /*a
|
|||||||
if (flags & MSG_EOR) {
|
if (flags & MSG_EOR) {
|
||||||
// Message is complete, process it
|
// Message is complete, process it
|
||||||
processData(std::move(mPartialMessage), info.rcv_sid,
|
processData(std::move(mPartialMessage), info.rcv_sid,
|
||||||
PayloadId(htonl(info.rcv_ppid)));
|
PayloadId(ntohl(info.rcv_ppid)));
|
||||||
mPartialMessage.clear();
|
mPartialMessage.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -125,6 +125,7 @@ void test_connectivity() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Wait a bit
|
||||||
int attempts = 10;
|
int attempts = 10;
|
||||||
shared_ptr<DataChannel> adc2;
|
shared_ptr<DataChannel> adc2;
|
||||||
while ((!(adc2 = std::atomic_load(&dc2)) || !adc2->isOpen() || !dc1->isOpen()) && attempts--)
|
while ((!(adc2 = std::atomic_load(&dc2)) || !adc2->isOpen() || !dc1->isOpen()) && attempts--)
|
||||||
@ -146,6 +147,49 @@ void test_connectivity() {
|
|||||||
if (auto addr = pc2->remoteAddress())
|
if (auto addr = pc2->remoteAddress())
|
||||||
cout << "Remote address 2: " << *addr << endl;
|
cout << "Remote address 2: " << *addr << endl;
|
||||||
|
|
||||||
|
// Try to open a second data channel with another label
|
||||||
|
shared_ptr<DataChannel> second2;
|
||||||
|
pc2->onDataChannel([&second2](shared_ptr<DataChannel> dc) {
|
||||||
|
cout << "Second DataChannel 2: Received with label \"" << dc->label() << "\"" << endl;
|
||||||
|
if (dc->label() != "second") {
|
||||||
|
cerr << "Wrong second DataChannel label" << endl;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
dc->onMessage([](variant<binary, string> message) {
|
||||||
|
if (holds_alternative<string>(message)) {
|
||||||
|
cout << "Second Message 2: " << get<string>(message) << endl;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
dc->send("Send hello from 2");
|
||||||
|
|
||||||
|
std::atomic_store(&second2, dc);
|
||||||
|
});
|
||||||
|
|
||||||
|
auto second1 = pc1->createDataChannel("second");
|
||||||
|
second1->onOpen([wsecond1 = make_weak_ptr(dc1)]() {
|
||||||
|
auto second1 = wsecond1.lock();
|
||||||
|
if (!second1)
|
||||||
|
return;
|
||||||
|
|
||||||
|
cout << "Second DataChannel 1: Open" << endl;
|
||||||
|
second1->send("Second hello from 1");
|
||||||
|
});
|
||||||
|
dc1->onMessage([](const variant<binary, string> &message) {
|
||||||
|
if (holds_alternative<string>(message)) {
|
||||||
|
cout << "Second Message 1: " << get<string>(message) << endl;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait a bit
|
||||||
|
attempts = 10;
|
||||||
|
shared_ptr<DataChannel> asecond2;
|
||||||
|
while (
|
||||||
|
(!(asecond2 = std::atomic_load(&second2)) || !asecond2->isOpen() || !second1->isOpen()) &&
|
||||||
|
attempts--)
|
||||||
|
this_thread::sleep_for(1s);
|
||||||
|
|
||||||
// Delay close of peer 2 to check closing works properly
|
// Delay close of peer 2 to check closing works properly
|
||||||
pc1->close();
|
pc1->close();
|
||||||
this_thread::sleep_for(1s);
|
this_thread::sleep_for(1s);
|
||||||
|
Reference in New Issue
Block a user