Compare commits

...

17 Commits

Author SHA1 Message Date
38db6d7365 Bumped version to 0.11.10 2021-03-10 18:59:14 +01:00
781d864b9f Added missing atomic 2021-03-10 18:59:12 +01:00
8cbcb35bf4 Fixed incorrect scope_guard 2021-03-10 18:58:15 +01:00
b63ec9cead Bumped version to 0.11.9 2021-03-08 13:07:58 +01:00
aa6f87f467 Removed rtc::Cleanup() call in each test 2021-03-08 13:07:58 +01:00
eec7a761e8 Replaced incorrect reinterpret_pointer_cast by dynamic_pointer_cast 2021-03-08 13:02:25 +01:00
125edff298 Bumped version to 0.11.8 2021-03-07 20:06:02 +01:00
0813976a5a Use SCTP default congestion control instead of H-TCP 2021-03-07 20:05:18 +01:00
4642504b83 Merge pull request #359 from paullouisageneau/no-nrsack
Do not enable SCTP NR-SACKs
2021-03-06 09:08:47 +01:00
5b760532c2 Do not enable SCTP NR-SACKs 2021-03-05 20:51:09 +01:00
69bcdade50 Merge pull request #358 from paullouisageneau/sctp-limit-flush-scheduling
Limit scheduling of flush tasks in SCTP transport
2021-03-05 20:47:08 +01:00
bd3df48c0b Limit scheduling of flush tasks in SCTP transport 2021-03-05 18:50:10 +01:00
faf3158609 Merge pull request #356 from paullouisageneau/fix-threadpool-workers-access
Fix unsynchronized access in thread pool
2021-03-05 12:20:05 +01:00
b766be1880 Fixed unsynchronized access to mWorkers in ThreadPool 2021-03-05 12:10:29 +01:00
b3edcfa05c Bumped version to 0.11.7 2021-03-04 12:08:43 +01:00
19e148363c Merge pull request #353 from paullouisageneau/fix-buffered-amount-callback
Fix buffered amount callback synchronization
2021-03-04 12:05:11 +01:00
7f6f178177 Fixed buffered amount callback synchronization 2021-03-03 19:27:54 +01:00
13 changed files with 82 additions and 92 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7)
project(libdatachannel
VERSION 0.11.6
VERSION 0.11.10
LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

View File

@ -37,17 +37,6 @@
#include <set>
#include <thread>
#if __clang__ && defined(__APPLE__)
namespace {
template <typename To, typename From>
inline std::shared_ptr<To> reinterpret_pointer_cast(std::shared_ptr<From> const &ptr) noexcept {
return std::shared_ptr<To>(ptr, reinterpret_cast<To *>(ptr.get()));
}
} // namespace
#else
using std::reinterpret_pointer_cast;
#endif
static rtc::LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
"Number of RTP packets truncated over past second");
static rtc::LogCounter
@ -701,7 +690,7 @@ void PeerConnection::forwardMedia(message_ptr message) {
std::set<uint32_t> ssrcs;
size_t offset = 0;
while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
auto header = reinterpret_cast<RTCP_HEADER *>(message->data() + offset);
if (header->lengthInBytes() > message->size() - offset) {
COUNTER_MEDIA_TRUNCATED++;
break;
@ -923,13 +912,14 @@ void PeerConnection::incomingTrack(Description::Media description) {
void PeerConnection::openTracks() {
#if RTC_ENABLE_MEDIA
if (auto transport = std::atomic_load(&mDtlsTransport)) {
auto srtpTransport = reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
if (auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport)) {
std::shared_lock lock(mTracksMutex); // read-only
for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
if (auto track = it->second.lock())
if (!track->isOpen())
track->open(srtpTransport);
}
}
#endif
}

View File

@ -81,11 +81,12 @@ void SctpTransport::Init() {
usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
// Change congestion control from the default TCP Reno (RFC 2581) to H-TCP
usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP);
// Use default congestion control (RFC 4960)
// See https://github.com/paullouisageneau/libdatachannel/issues/354
usrsctp_sysctl_set_sctp_default_cc_module(0);
// Enable Non-Renegable Selective Acknowledgments (NR-SACKs)
usrsctp_sysctl_set_sctp_nrsack_enable(1);
// Enable Partial Reliability Extension (RFC 3758)
usrsctp_sysctl_set_sctp_pr_enable(1);
// Increase the initial window size to 10 MTUs (RFC 6928)
usrsctp_sysctl_set_sctp_initial_cwnd(10);
@ -103,7 +104,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
std::optional<size_t> mtu, message_callback recvCallback,
amount_callback bufferedAmountCallback,
state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
: Transport(lower, std::move(stateChangeCallback)), mPort(port),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
onRecv(recvCallback);
@ -259,7 +260,7 @@ bool SctpTransport::stop() {
return false;
mSendQueue.stop();
safeFlush();
flush();
shutdown();
onRecv(nullptr);
return true;
@ -333,13 +334,20 @@ bool SctpTransport::send(message_ptr message) {
return false;
}
void SctpTransport::closeStream(unsigned int stream) {
send(make_message(0, Message::Reset, uint16_t(stream)));
}
void SctpTransport::flush() {
bool SctpTransport::flush() {
try {
std::lock_guard lock(mSendMutex);
trySendQueue();
return true;
} catch (const std::exception &e) {
PLOG_WARNING << "SCTP flush: " << e.what();
return false;
}
}
void SctpTransport::closeStream(unsigned int stream) {
send(make_message(0, Message::Reset, uint16_t(stream)));
}
void SctpTransport::incoming(message_ptr message) {
@ -427,6 +435,16 @@ void SctpTransport::doRecv() {
}
}
void SctpTransport::doFlush() {
std::lock_guard lock(mSendMutex);
--mPendingFlushCount;
try {
trySendQueue();
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
bool SctpTransport::trySendQueue() {
// Requires mSendMutex to be locked
while (auto next = mSendQueue.peek()) {
@ -533,13 +551,16 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
else
it->second = amount;
mSendMutex.unlock();
// Synchronously call the buffered amount callback
triggerBufferedAmount(streamId, amount);
}
void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
try {
mBufferedAmountCallback(streamId, amount);
} catch (const std::exception &e) {
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what();
PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
}
mSendMutex.lock();
}
void SctpTransport::sendReset(uint16_t streamId) {
@ -569,17 +590,6 @@ void SctpTransport::sendReset(uint16_t streamId) {
}
}
bool SctpTransport::safeFlush() {
try {
flush();
return true;
} catch (const std::exception &e) {
PLOG_WARNING << "SCTP flush: " << e.what();
return false;
}
}
void SctpTransport::handleUpcall() {
if (!mSock)
return;
@ -593,8 +603,10 @@ void SctpTransport::handleUpcall() {
mProcessor.enqueue(&SctpTransport::doRecv, this);
}
if (events & SCTP_EVENT_WRITE)
mProcessor.enqueue(&SctpTransport::safeFlush, this);
if (events & SCTP_EVENT_WRITE && mPendingFlushCount == 0) {
++mPendingFlushCount;
mProcessor.enqueue(&SctpTransport::doFlush, this);
}
}
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
@ -709,7 +721,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
PLOG_VERBOSE << "SCTP dry event";
// It should not be necessary since the send callback should have been called already,
// but to be sure, let's try to send now.
safeFlush();
flush();
break;
}

View File

@ -51,8 +51,8 @@ public:
void start() override;
bool stop() override;
bool send(message_ptr message) override; // false if buffered
bool flush();
void closeStream(unsigned int stream);
void flush();
// Stats
void clearStats();
@ -80,11 +80,12 @@ private:
bool outgoing(message_ptr message) override;
void doRecv();
void doFlush();
bool trySendQueue();
bool trySendMessage(message_ptr message);
void updateBufferedAmount(uint16_t streamId, long delta);
void triggerBufferedAmount(uint16_t streamId, size_t amount);
void sendReset(uint16_t streamId);
bool safeFlush();
void handleUpcall();
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
@ -96,8 +97,10 @@ private:
struct socket *mSock;
Processor mProcessor;
std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex;
std::atomic<int> mPendingRecvCount = 0;
std::atomic<int> mPendingFlushCount = 0;
std::mutex mRecvMutex;
std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount;
amount_callback mBufferedAmountCallback;

View File

@ -51,7 +51,7 @@ void ThreadPool::spawn(int count) {
void ThreadPool::join() {
{
std::unique_lock lock(mMutex);
mWaitingCondition.wait(lock, [&]() { return mWaitingWorkers == int(mWorkers.size()); });
mWaitingCondition.wait(lock, [&]() { return mBusyWorkers == 0; });
mJoining = true;
mTasksCondition.notify_all();
}
@ -66,6 +66,8 @@ void ThreadPool::join() {
}
void ThreadPool::run() {
++mBusyWorkers;
scope_guard guard([&]() { --mBusyWorkers; });
while (runOne()) {
}
}
@ -81,24 +83,23 @@ bool ThreadPool::runOne() {
std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex);
while (!mJoining) {
std::optional<clock::time_point> time;
if (!mTasks.empty()) {
if (mTasks.top().time <= clock::now()) {
time = mTasks.top().time;
if (*time <= clock::now()) {
auto func = std::move(mTasks.top().func);
mTasks.pop();
return func;
}
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait_until(lock, mTasks.top().time);
} else {
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait(lock);
}
--mWaitingWorkers;
--mBusyWorkers;
scope_guard guard([&]() { ++mBusyWorkers; });
mWaitingCondition.notify_all();
if(time)
mTasksCondition.wait_until(lock, *time);
else
mTasksCondition.wait(lock);
}
return nullptr;
}

View File

@ -72,7 +72,7 @@ protected:
std::function<void()> dequeue(); // returns null function if joining
std::vector<std::thread> mWorkers;
int mWaitingWorkers = 0;
std::atomic<int> mBusyWorkers = 0;
std::atomic<bool> mJoining = false;
struct Task {

View File

@ -127,9 +127,13 @@ size_t benchmark(milliseconds duration) {
openTime = steady_clock::now();
cout << "DataChannel open, sending data..." << endl;
try {
while (dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
} catch (const std::exception &e) {
std::cout << "Send failed: " << e.what() << std::endl;
}
// When sent data is buffered in the DataChannel,
// wait for onBufferedAmountLow callback to continue
@ -141,9 +145,13 @@ size_t benchmark(milliseconds duration) {
return;
// Continue sending
while (dc1->bufferedAmount() == 0) {
try {
while (dc1->isOpen() && dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
} catch (const std::exception &e) {
std::cout << "Send failed: " << e.what() << std::endl;
}
});
const int steps = 10;

View File

@ -326,10 +326,6 @@ int test_capi_connectivity_main() {
deletePeer(peer2);
sleep(1);
// You may call rtcCleanup() when finished to free static resources
rtcCleanup();
sleep(1);
printf("Success\n");
return 0;

View File

@ -177,10 +177,6 @@ int test_capi_track_main() {
deletePeer(peer2);
sleep(1);
// You may call rtcCleanup() when finished to free static resources
rtcCleanup();
sleep(1);
printf("Success\n");
return 0;

View File

@ -251,9 +251,5 @@ void test_connectivity() {
pc2->close();
this_thread::sleep_for(1s);
// You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup();
this_thread::sleep_for(1s);
cout << "Success" << endl;
}

View File

@ -143,9 +143,5 @@ void test_track() {
pc2->close();
this_thread::sleep_for(1s);
// You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup();
this_thread::sleep_for(1s);
cout << "Success" << endl;
}

View File

@ -258,9 +258,5 @@ void test_turn_connectivity() {
pc2->close();
this_thread::sleep_for(1s);
// You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup();
this_thread::sleep_for(1s);
cout << "Success" << endl;
}

View File

@ -78,10 +78,6 @@ void test_websocket() {
ws->close();
this_thread::sleep_for(1s);
// You may call rtc::Cleanup() when finished to free static resources
rtc::Cleanup();
this_thread::sleep_for(1s);
cout << "Success" << endl;
}