diff --git a/src/sctptransport.cpp b/src/sctptransport.cpp index 6ac988f..73b2aa1 100644 --- a/src/sctptransport.cpp +++ b/src/sctptransport.cpp @@ -533,13 +533,16 @@ void SctpTransport::updateBufferedAmount(uint16_t streamId, long delta) { else it->second = amount; - mSendMutex.unlock(); + // Synchronously call the buffered amount callback + triggerBufferedAmount(streamId, amount); +} + +void SctpTransport::triggerBufferedAmount(uint16_t streamId, size_t amount) { try { mBufferedAmountCallback(streamId, amount); } catch (const std::exception &e) { - PLOG_DEBUG << "SCTP buffered amount callback: " << e.what(); + PLOG_WARNING << "SCTP buffered amount callback: " << e.what(); } - mSendMutex.lock(); } void SctpTransport::sendReset(uint16_t streamId) { diff --git a/src/sctptransport.hpp b/src/sctptransport.hpp index 2bae169..a9a78a1 100644 --- a/src/sctptransport.hpp +++ b/src/sctptransport.hpp @@ -83,6 +83,7 @@ private: bool trySendQueue(); bool trySendMessage(message_ptr message); void updateBufferedAmount(uint16_t streamId, long delta); + void triggerBufferedAmount(uint16_t streamId, size_t amount); void sendReset(uint16_t streamId); bool safeFlush(); @@ -97,7 +98,8 @@ private: Processor mProcessor; std::atomic mPendingRecvCount; - std::mutex mRecvMutex, mSendMutex; + std::mutex mRecvMutex; + std::recursive_mutex mSendMutex; // buffered amount callback is synchronous Queue mSendQueue; std::map mBufferedAmount; amount_callback mBufferedAmountCallback;