Compare commits

...

8 Commits

6 changed files with 65 additions and 46 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
VERSION 0.11.7 VERSION 0.11.8
LANGUAGES CXX) LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library") set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

View File

@ -81,11 +81,12 @@ void SctpTransport::Init() {
usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024); usrsctp_sysctl_set_sctp_max_chunks_on_queue(10 * 1024);
// Change congestion control from the default TCP Reno (RFC 2581) to H-TCP // Use default congestion control (RFC 4960)
usrsctp_sysctl_set_sctp_default_cc_module(SCTP_CC_HTCP); // See https://github.com/paullouisageneau/libdatachannel/issues/354
usrsctp_sysctl_set_sctp_default_cc_module(0);
// Enable Non-Renegable Selective Acknowledgments (NR-SACKs) // Enable Partial Reliability Extension (RFC 3758)
usrsctp_sysctl_set_sctp_nrsack_enable(1); usrsctp_sysctl_set_sctp_pr_enable(1);
// Increase the initial window size to 10 MTUs (RFC 6928) // Increase the initial window size to 10 MTUs (RFC 6928)
usrsctp_sysctl_set_sctp_initial_cwnd(10); usrsctp_sysctl_set_sctp_initial_cwnd(10);
@ -103,7 +104,7 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
std::optional<size_t> mtu, message_callback recvCallback, std::optional<size_t> mtu, message_callback recvCallback,
amount_callback bufferedAmountCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback) state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0), : Transport(lower, std::move(stateChangeCallback)), mPort(port),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) { mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
onRecv(recvCallback); onRecv(recvCallback);
@ -259,7 +260,7 @@ bool SctpTransport::stop() {
return false; return false;
mSendQueue.stop(); mSendQueue.stop();
safeFlush(); flush();
shutdown(); shutdown();
onRecv(nullptr); onRecv(nullptr);
return true; return true;
@ -333,13 +334,20 @@ bool SctpTransport::send(message_ptr message) {
return false; return false;
} }
void SctpTransport::closeStream(unsigned int stream) { bool SctpTransport::flush() {
send(make_message(0, Message::Reset, uint16_t(stream))); try {
std::lock_guard lock(mSendMutex);
trySendQueue();
return true;
} catch (const std::exception &e) {
PLOG_WARNING << "SCTP flush: " << e.what();
return false;
}
} }
void SctpTransport::flush() { void SctpTransport::closeStream(unsigned int stream) {
std::lock_guard lock(mSendMutex); send(make_message(0, Message::Reset, uint16_t(stream)));
trySendQueue();
} }
void SctpTransport::incoming(message_ptr message) { void SctpTransport::incoming(message_ptr message) {
@ -427,6 +435,16 @@ void SctpTransport::doRecv() {
} }
} }
void SctpTransport::doFlush() {
std::lock_guard lock(mSendMutex);
--mPendingFlushCount;
try {
trySendQueue();
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
bool SctpTransport::trySendQueue() { bool SctpTransport::trySendQueue() {
// Requires mSendMutex to be locked // Requires mSendMutex to be locked
while (auto next = mSendQueue.peek()) { while (auto next = mSendQueue.peek()) {
@ -572,17 +590,6 @@ void SctpTransport::sendReset(uint16_t streamId) {
} }
} }
bool SctpTransport::safeFlush() {
try {
flush();
return true;
} catch (const std::exception &e) {
PLOG_WARNING << "SCTP flush: " << e.what();
return false;
}
}
void SctpTransport::handleUpcall() { void SctpTransport::handleUpcall() {
if (!mSock) if (!mSock)
return; return;
@ -596,8 +603,10 @@ void SctpTransport::handleUpcall() {
mProcessor.enqueue(&SctpTransport::doRecv, this); mProcessor.enqueue(&SctpTransport::doRecv, this);
} }
if (events & SCTP_EVENT_WRITE) if (events & SCTP_EVENT_WRITE && mPendingFlushCount == 0) {
mProcessor.enqueue(&SctpTransport::safeFlush, this); ++mPendingFlushCount;
mProcessor.enqueue(&SctpTransport::doFlush, this);
}
} }
int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) { int SctpTransport::handleWrite(byte *data, size_t len, uint8_t /*tos*/, uint8_t /*set_df*/) {
@ -712,7 +721,7 @@ void SctpTransport::processNotification(const union sctp_notification *notify, s
PLOG_VERBOSE << "SCTP dry event"; PLOG_VERBOSE << "SCTP dry event";
// It should not be necessary since the send callback should have been called already, // It should not be necessary since the send callback should have been called already,
// but to be sure, let's try to send now. // but to be sure, let's try to send now.
safeFlush(); flush();
break; break;
} }

View File

@ -51,8 +51,8 @@ public:
void start() override; void start() override;
bool stop() override; bool stop() override;
bool send(message_ptr message) override; // false if buffered bool send(message_ptr message) override; // false if buffered
bool flush();
void closeStream(unsigned int stream); void closeStream(unsigned int stream);
void flush();
// Stats // Stats
void clearStats(); void clearStats();
@ -80,12 +80,12 @@ private:
bool outgoing(message_ptr message) override; bool outgoing(message_ptr message) override;
void doRecv(); void doRecv();
void doFlush();
bool trySendQueue(); bool trySendQueue();
bool trySendMessage(message_ptr message); bool trySendMessage(message_ptr message);
void updateBufferedAmount(uint16_t streamId, long delta); void updateBufferedAmount(uint16_t streamId, long delta);
void triggerBufferedAmount(uint16_t streamId, size_t amount); void triggerBufferedAmount(uint16_t streamId, size_t amount);
void sendReset(uint16_t streamId); void sendReset(uint16_t streamId);
bool safeFlush();
void handleUpcall(); void handleUpcall();
int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df); int handleWrite(byte *data, size_t len, uint8_t tos, uint8_t set_df);
@ -97,7 +97,8 @@ private:
struct socket *mSock; struct socket *mSock;
Processor mProcessor; Processor mProcessor;
std::atomic<int> mPendingRecvCount; std::atomic<int> mPendingRecvCount = 0;
std::atomic<int> mPendingFlushCount = 0;
std::mutex mRecvMutex; std::mutex mRecvMutex;
std::recursive_mutex mSendMutex; // buffered amount callback is synchronous std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;

View File

@ -51,7 +51,7 @@ void ThreadPool::spawn(int count) {
void ThreadPool::join() { void ThreadPool::join() {
{ {
std::unique_lock lock(mMutex); std::unique_lock lock(mMutex);
mWaitingCondition.wait(lock, [&]() { return mWaitingWorkers == int(mWorkers.size()); }); mWaitingCondition.wait(lock, [&]() { return mBusyWorkers == 0; });
mJoining = true; mJoining = true;
mTasksCondition.notify_all(); mTasksCondition.notify_all();
} }
@ -66,6 +66,8 @@ void ThreadPool::join() {
} }
void ThreadPool::run() { void ThreadPool::run() {
++mBusyWorkers;
scope_guard([&]() { --mBusyWorkers; });
while (runOne()) { while (runOne()) {
} }
} }
@ -81,24 +83,23 @@ bool ThreadPool::runOne() {
std::function<void()> ThreadPool::dequeue() { std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex); std::unique_lock lock(mMutex);
while (!mJoining) { while (!mJoining) {
std::optional<clock::time_point> time;
if (!mTasks.empty()) { if (!mTasks.empty()) {
if (mTasks.top().time <= clock::now()) { time = mTasks.top().time;
if (*time <= clock::now()) {
auto func = std::move(mTasks.top().func); auto func = std::move(mTasks.top().func);
mTasks.pop(); mTasks.pop();
return func; return func;
} }
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait_until(lock, mTasks.top().time);
} else {
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait(lock);
} }
--mWaitingWorkers; --mBusyWorkers;
scope_guard([&]() { ++mBusyWorkers; });
mWaitingCondition.notify_all();
if(time)
mTasksCondition.wait_until(lock, *time);
else
mTasksCondition.wait(lock);
} }
return nullptr; return nullptr;
} }

View File

@ -72,7 +72,7 @@ protected:
std::function<void()> dequeue(); // returns null function if joining std::function<void()> dequeue(); // returns null function if joining
std::vector<std::thread> mWorkers; std::vector<std::thread> mWorkers;
int mWaitingWorkers = 0; int mBusyWorkers = 0;
std::atomic<bool> mJoining = false; std::atomic<bool> mJoining = false;
struct Task { struct Task {

View File

@ -127,8 +127,12 @@ size_t benchmark(milliseconds duration) {
openTime = steady_clock::now(); openTime = steady_clock::now();
cout << "DataChannel open, sending data..." << endl; cout << "DataChannel open, sending data..." << endl;
while (dc1->bufferedAmount() == 0) { try {
dc1->send(messageData); while (dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
} catch (const std::exception &e) {
std::cout << "Send failed: " << e.what() << std::endl;
} }
// When sent data is buffered in the DataChannel, // When sent data is buffered in the DataChannel,
@ -141,8 +145,12 @@ size_t benchmark(milliseconds duration) {
return; return;
// Continue sending // Continue sending
while (dc1->bufferedAmount() == 0) { try {
dc1->send(messageData); while (dc1->isOpen() && dc1->bufferedAmount() == 0) {
dc1->send(messageData);
}
} catch (const std::exception &e) {
std::cout << "Send failed: " << e.what() << std::endl;
} }
}); });