diff --git a/src/channel.cpp b/src/channel.cpp index 907d6ff..ac8ac83 100644 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -43,10 +43,7 @@ void Channel::onError(std::function callback) { void Channel::onMessage(std::function callback) { impl()->messageCallback = callback; - - // Pass pending messages - while (auto message = receive()) - impl()->messageCallback(*message); + impl()->flushPendingMessages(); } void Channel::onMessage(std::function binaryCallback, diff --git a/src/impl/channel.cpp b/src/impl/channel.cpp index d56c30f..60084ea 100644 --- a/src/impl/channel.cpp +++ b/src/impl/channel.cpp @@ -20,22 +20,21 @@ namespace rtc::impl { -void Channel::triggerOpen() { openCallback(); } +void Channel::triggerOpen() { + mOpenTriggered = true; + openCallback(); + flushPendingMessages(); +} void Channel::triggerClosed() { closedCallback(); } -void Channel::triggerError(string error) { errorCallback(error); } +void Channel::triggerError(string error) { errorCallback(std::move(error)); } void Channel::triggerAvailable(size_t count) { if (count == 1) availableCallback(); - while (messageCallback && count--) { - auto message = receive(); - if (!message) - break; - messageCallback(*message); - } + flushPendingMessages(); } void Channel::triggerBufferedAmount(size_t amount) { @@ -45,6 +44,24 @@ void Channel::triggerBufferedAmount(size_t amount) { bufferedAmountLowCallback(); } +void Channel::flushPendingMessages() { + if (!mOpenTriggered) + return; + + while (messageCallback) { + auto next = receive(); + if (!next) + break; + + messageCallback(*next); + } +} + +void Channel::resetOpenCallback() { + mOpenTriggered = false; + openCallback = nullptr; +} + void Channel::resetCallbacks() { openCallback = nullptr; closedCallback = nullptr; diff --git a/src/impl/channel.hpp b/src/impl/channel.hpp index b61f039..4be22cd 100644 --- a/src/impl/channel.hpp +++ b/src/impl/channel.hpp @@ -38,7 +38,9 @@ struct Channel { virtual void triggerAvailable(size_t count); virtual void triggerBufferedAmount(size_t amount); - virtual void resetCallbacks(); + void flushPendingMessages(); + void resetOpenCallback(); + void resetCallbacks(); synchronized_callback<> openCallback; synchronized_callback<> closedCallback; @@ -49,6 +51,9 @@ struct Channel { std::atomic bufferedAmount = 0; std::atomic bufferedAmountLowThreshold = 0; + +private: + std::atomic mOpenTriggered = false; }; } // namespace rtc::impl diff --git a/src/impl/peerconnection.cpp b/src/impl/peerconnection.cpp index e4387f7..7fc1bcb 100644 --- a/src/impl/peerconnection.cpp +++ b/src/impl/peerconnection.cpp @@ -974,7 +974,7 @@ string PeerConnection::localBundleMid() const { void PeerConnection::triggerDataChannel(weak_ptr weakDataChannel) { auto dataChannel = weakDataChannel.lock(); if (dataChannel) { - dataChannel->openCallback = nullptr; // might be set internally + dataChannel->resetOpenCallback(); // might be set internally mPendingDataChannels.push(std::move(dataChannel)); } triggerPendingDataChannels(); @@ -983,7 +983,7 @@ void PeerConnection::triggerDataChannel(weak_ptr weakDataChannel) { void PeerConnection::triggerTrack(weak_ptr weakTrack) { auto track = weakTrack.lock(); if (track) { - track->openCallback = nullptr; // might be set internally + track->resetOpenCallback(); // might be set internally mPendingTracks.push(std::move(track)); } triggerPendingTracks(); @@ -1008,7 +1008,7 @@ void PeerConnection::triggerPendingTracks() { break; auto impl = std::move(*next); - trackCallback(std::make_shared(std::move(impl))); + trackCallback(std::make_shared(impl)); impl->triggerOpen(); } }