Added NegociationNeeded flag

This commit is contained in:
Paul-Louis Ageneau
2020-10-31 10:26:12 +01:00
parent de73af4b80
commit a3cc74c8f1
2 changed files with 70 additions and 50 deletions

View File

@ -168,6 +168,7 @@ private:
std::atomic<State> mState;
std::atomic<GatheringState> mGatheringState;
std::atomic<bool> mNegociationNeeded;
synchronized_callback<std::shared_ptr<DataChannel>> mDataChannelCallback;
synchronized_callback<Description> mLocalDescriptionCallback;

View File

@ -44,7 +44,7 @@ PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
PeerConnection::PeerConnection(const Configuration &config)
: mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
mState(State::New), mGatheringState(GatheringState::New) {
mState(State::New), mGatheringState(GatheringState::New), mNegociationNeeded(false) {
PLOG_VERBOSE << "Creating PeerConnection";
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
@ -60,6 +60,8 @@ PeerConnection::~PeerConnection() {
void PeerConnection::close() {
PLOG_VERBOSE << "Closing PeerConnection";
mNegociationNeeded = false;
// Close data channels asynchronously
mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
@ -100,8 +102,8 @@ bool PeerConnection::hasMedia() const {
void PeerConnection::setLocalDescription() {
PLOG_VERBOSE << "Setting local description";
if (std::atomic_load(&mIceTransport)) {
PLOG_DEBUG << "Local description is already set, ignoring";
if (!mNegociationNeeded.exchange(false)) {
PLOG_DEBUG << "No negociation needed";
return;
}
@ -117,9 +119,6 @@ void PeerConnection::setLocalDescription() {
void PeerConnection::setRemoteDescription(Description description) {
PLOG_VERBOSE << "Setting remote description: " << string(description);
if (hasRemoteDescription())
throw std::logic_error("Remote description is already set");
if (description.mediaCount() == 0)
throw std::invalid_argument("Remote description has no media line");
@ -139,20 +138,23 @@ void PeerConnection::setRemoteDescription(Description description) {
if (!description.fingerprint())
throw std::invalid_argument("Remote description has no fingerprint");
if (auto local = localDescription()) {
if (description.iceUfrag() == local->iceUfrag() && description.icePwd() == local->icePwd())
throw std::logic_error("Got the local description as remote description");
}
description.hintType(hasLocalDescription() ? Description::Type::Answer
: Description::Type::Offer);
if (description.type() == Description::Type::Offer) {
if (hasLocalDescription()) {
PLOG_ERROR << "Got a remote offer description while an answer was expected";
throw std::logic_error("Got an unexpected remote offer description");
}
} else { // Answer
if (auto local = localDescription()) {
if (description.iceUfrag() == local->iceUfrag() &&
description.icePwd() == local->icePwd())
throw std::logic_error("Got the local description as remote description");
} else {
// If there is no remote description, this is the first negociation
// Check it is what we expect
if (!hasRemoteDescription()) {
if (description.type() == Description::Type::Offer) {
if (hasLocalDescription()) {
PLOG_ERROR << "Got a remote offer description while an answer was expected";
throw std::logic_error("Got an unexpected remote offer description");
}
} else { // Answer
PLOG_ERROR << "Got a remote answer description while an offer was expected";
throw std::logic_error("Got an unexpected remote answer description");
}
@ -169,16 +171,23 @@ void PeerConnection::setRemoteDescription(Description description) {
{
// Set as remote description
std::lock_guard lock(mRemoteDescriptionMutex);
std::vector<Candidate> existingCandidates;
if(mRemoteDescription)
existingCandidates = mRemoteDescription->extractCandidates();
mRemoteDescription.emplace(std::move(description));
for (const auto &candidate : existingCandidates)
mRemoteDescription->addCandidate(candidate);
}
if (description.type() == Description::Type::Offer) {
// This is an offer and we are the answerer.
// This is an offer, we need to answer
Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
processLocalDescription(localDescription);
iceTransport->gatherLocalCandidates();
} else {
// This is an answer and we are the offerer.
// This is an answer
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
// Since we assumed passive role during DataChannel creation, we need to shift the
@ -238,11 +247,6 @@ std::optional<string> PeerConnection::remoteAddress() const {
shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string protocol,
Reliability reliability) {
if (auto local = localDescription(); local && !local->hasApplication()) {
PLOG_ERROR << "The PeerConnection was negociated without DataChannel support.";
throw std::runtime_error("No DataChannel support on the PeerConnection");
}
// RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
// setup:passive. [...] Thus, setup:active is RECOMMENDED.
// See https://tools.ietf.org/html/rfc5763#section-5
@ -257,6 +261,11 @@ shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string prot
if (transport->state() == SctpTransport::State::Connected)
channel->open(transport);
// Renegociation is needed if the current local description does not have application
std::lock_guard lock(mLocalDescriptionMutex);
if (!mLocalDescription || !mLocalDescription->hasApplication())
mNegociationNeeded = true;
return channel;
}
@ -303,6 +312,10 @@ std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description)
#endif
auto track = std::make_shared<Track>(std::move(description));
mTracks.emplace(std::make_pair(track->mid(), track));
// Renegociation is needed for the new track
mNegociationNeeded = true;
return track;
}
@ -454,31 +467,31 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
auto lower = std::atomic_load(&mDtlsTransport);
auto transport = std::make_shared<SctpTransport>(
lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this, weak_this = weak_from_this()](SctpTransport::State state) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (state) {
case SctpTransport::State::Connected:
changeState(State::Connected);
mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
break;
case SctpTransport::State::Failed:
LOG_WARNING << "SCTP transport failed";
changeState(State::Failed);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
break;
case SctpTransport::State::Disconnected:
changeState(State::Disconnected);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
break;
default:
// Ignore
break;
}
});
lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this, weak_this = weak_from_this()](SctpTransport::State state) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (state) {
case SctpTransport::State::Connected:
changeState(State::Connected);
mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
break;
case SctpTransport::State::Failed:
LOG_WARNING << "SCTP transport failed";
changeState(State::Failed);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
break;
case SctpTransport::State::Disconnected:
changeState(State::Disconnected);
mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
break;
default:
// Ignore
break;
}
});
std::atomic_store(&mSctpTransport, transport);
if (mState == State::Closed) {
@ -728,7 +741,6 @@ void PeerConnection::openTracks() {
#endif
}
void PeerConnection::processLocalDescription(Description description) {
int activeMediaCount = 0;
@ -821,7 +833,14 @@ void PeerConnection::processLocalDescription(Description description) {
{
// Set as local description
std::lock_guard lock(mLocalDescriptionMutex);
std::vector<Candidate> existingCandidates;
if(mLocalDescription)
existingCandidates = mLocalDescription->extractCandidates();
mLocalDescription.emplace(std::move(description));
for (const auto &candidate : existingCandidates)
mLocalDescription->addCandidate(candidate);
}
mProcessor->enqueue([this, description = *mLocalDescription]() {