Compare commits

...

3 Commits

Author SHA1 Message Date
b3edcfa05c Bumped version to 0.11.7 2021-03-04 12:08:43 +01:00
19e148363c Merge pull request #353 from paullouisageneau/fix-buffered-amount-callback
Fix buffered amount callback synchronization
2021-03-04 12:05:11 +01:00
7f6f178177 Fixed buffered amount callback synchronization 2021-03-03 19:27:54 +01:00
3 changed files with 10 additions and 5 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7) cmake_minimum_required(VERSION 3.7)
project(libdatachannel project(libdatachannel
VERSION 0.11.6 VERSION 0.11.7
LANGUAGES CXX) LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library") set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

View File

@ -533,13 +533,16 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) {
else else
it->second = amount; it->second = amount;
mSendMutex.unlock(); // Synchronously call the buffered amount callback
triggerBufferedAmount(streamId, amount);
}
void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) {
try { try {
mBufferedAmountCallback(streamId, amount); mBufferedAmountCallback(streamId, amount);
} catch (const std::exception &e) { } catch (const std::exception &e) {
PLOG_DEBUG << "SCTP buffered amount callback: " << e.what(); PLOG_WARNING << "SCTP buffered amount callback: " << e.what();
} }
mSendMutex.lock();
} }
void SctpTransport::sendReset(uint16_t streamId) { void SctpTransport::sendReset(uint16_t streamId) {

View File

@ -83,6 +83,7 @@ private:
bool trySendQueue(); bool trySendQueue();
bool trySendMessage(message_ptr message); bool trySendMessage(message_ptr message);
void updateBufferedAmount(uint16_t streamId, long delta); void updateBufferedAmount(uint16_t streamId, long delta);
void triggerBufferedAmount(uint16_t streamId, size_t amount);
void sendReset(uint16_t streamId); void sendReset(uint16_t streamId);
bool safeFlush(); bool safeFlush();
@ -97,7 +98,8 @@ private:
Processor mProcessor; Processor mProcessor;
std::atomic<int> mPendingRecvCount; std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex; std::mutex mRecvMutex;
std::recursive_mutex mSendMutex; // buffered amount callback is synchronous
Queue<message_ptr> mSendQueue; Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount; std::map<uint16_t, size_t> mBufferedAmount;
amount_callback mBufferedAmountCallback; amount_callback mBufferedAmountCallback;