Compare commits

..

15 Commits

Author SHA1 Message Date
ee3bc9694b Bumped version to 0.10.2 2020-11-18 23:39:06 +01:00
0c0ba77de5 Merge pull request #265 from paullouisageneau/optimize-sctp-recv
Schedule SCTP recv task only if necessary
2020-11-18 23:38:49 +01:00
8729e0d2aa Scheduled SCTP recv task only if there is no pending task 2020-11-18 23:26:47 +01:00
90eb610bfe Merge pull request #263 from paullouisageneau/release-rdesc-lock
Prevent holding multiple locks
2020-11-17 23:02:15 +01:00
08ddfa1276 Release remote description lock before passing candidate to transport 2020-11-17 22:41:42 +01:00
87df64a002 Merge pull request #262 from paullouisageneau/fix-duplicate-candidates
Prevent duplicate candidates
2020-11-17 20:40:15 +01:00
5af414d0df Cosmetic fixes in Description 2020-11-17 20:23:59 +01:00
2443c72350 Refactored trimming with util functions 2020-11-17 20:10:47 +01:00
f033e4ab8f Prevent whitspaces at the end of candidates as they confuse libnice 2020-11-17 19:57:51 +01:00
1a6dcdce6f Reordered Candidate getters 2020-11-17 19:28:13 +01:00
100039eba8 Enforce candidates uniqueness in description 2020-11-17 19:23:29 +01:00
e2005c789a Refactored candidate storage and split parsing and resolution 2020-11-17 19:21:48 +01:00
819566b4c1 Merge pull request #261 from paullouisageneau/fix-remote-unordered-flag
Fix remote unordered flag
2020-11-17 00:15:35 +01:00
82caab8906 Added tests for remote protocol and reliability in C API 2020-11-16 23:59:59 +01:00
802516b2db Fixed remote DataChannel unordered flag 2020-11-16 23:59:59 +01:00
11 changed files with 252 additions and 145 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7)
project(libdatachannel
VERSION 0.10.1
VERSION 0.10.2
LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")

View File

@ -40,29 +40,38 @@ public:
enum class ResolveMode { Simple, Lookup };
bool resolve(ResolveMode mode = ResolveMode::Simple);
Type type() const;
TransportType transportType() const;
uint32_t priority() const;
string candidate() const;
string mid() const;
operator string() const;
bool operator==(const Candidate &other) const;
bool operator!=(const Candidate &other) const;
bool isResolved() const;
Family family() const;
Type type() const;
TransportType transportType() const;
std::optional<string> address() const;
std::optional<uint16_t> port() const;
std::optional<uint32_t> priority() const;
private:
string mCandidate;
void parse(string candidate);
string mFoundation;
uint32_t mComponent, mPriority;
string mTypeString, mTransportString;
Type mType;
TransportType mTransportType;
string mNode, mService;
string mTail;
std::optional<string> mMid;
// Extracted on resolution
Family mFamily;
Type mType;
TransportType mTransportType;
string mAddress;
uint16_t mPort;
uint32_t mPriority;
};
} // namespace rtc

View File

@ -53,6 +53,7 @@ public:
void hintType(Type type);
void setFingerprint(string fingerprint);
bool hasCandidate(const Candidate &candidate) const;
void addCandidate(Candidate candidate);
void addCandidates(std::vector<Candidate> candidates);
void endCandidates();

View File

@ -20,6 +20,7 @@
#include <algorithm>
#include <array>
#include <cctype>
#include <sstream>
#include <unordered_map>
@ -39,39 +40,44 @@ using std::string;
namespace {
inline bool hasprefix(const string &str, const string &prefix) {
inline bool match_prefix(const string &str, const string &prefix) {
return str.size() >= prefix.size() &&
std::mismatch(prefix.begin(), prefix.end(), str.begin()).first == prefix.end();
}
inline void trim_begin(string &str) {
str.erase(str.begin(),
std::find_if(str.begin(), str.end(), [](char c) { return !std::isspace(c); }));
}
inline void trim_end(string &str) {
str.erase(
std::find_if(str.rbegin(), str.rend(), [](char c) { return !std::isspace(c); }).base(),
str.end());
}
} // namespace
namespace rtc {
Candidate::Candidate()
: mFamily(Family::Unresolved), mType(Type::Unknown), mTransportType(TransportType::Unknown),
mPort(0), mPriority(0) {}
: mFoundation("none"), mComponent(0), mPriority(0), mTypeString("unknown"),
mTransportString("unknown"), mType(Type::Unknown), mTransportType(TransportType::Unknown),
mNode("0.0.0.0"), mService("9"), mFamily(Family::Unresolved), mPort(0) {}
Candidate::Candidate(string candidate) : Candidate() {
const std::array prefixes{"a=", "candidate:"};
for (const string &prefix : prefixes)
if (hasprefix(candidate, prefix))
candidate.erase(0, prefix.size());
mCandidate = std::move(candidate);
if (!candidate.empty())
parse(std::move(candidate));
}
Candidate::Candidate(string candidate, string mid) : Candidate(std::move(candidate)) {
Candidate::Candidate(string candidate, string mid) : Candidate() {
if (!candidate.empty())
parse(std::move(candidate));
if (!mid.empty())
mMid.emplace(std::move(mid));
}
void Candidate::hintMid(string mid) {
if (!mMid)
mMid.emplace(std::move(mid));
}
bool Candidate::resolve(ResolveMode mode) {
void Candidate::parse(string candidate) {
using TypeMap_t = std::unordered_map<string, Type>;
using TcpTypeMap_t = std::unordered_map<string, TransportType>;
@ -84,99 +90,122 @@ bool Candidate::resolve(ResolveMode mode) {
{"passive", TransportType::TcpPassive},
{"so", TransportType::TcpSo}};
if (mFamily != Family::Unresolved)
return true;
const std::array prefixes{"a=", "candidate:"};
for (const string &prefix : prefixes)
if (match_prefix(candidate, prefix))
candidate.erase(0, prefix.size());
if (mCandidate.empty())
throw std::logic_error("Candidate is empty");
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mCandidate;
PLOG_VERBOSE << "Parsing candidate: " << candidate;
// See RFC 8445 for format
std::istringstream iss(mCandidate);
int component{0}, priority{0};
string foundation, transport, node, service, typ_, type;
if (iss >> foundation >> component >> transport >> priority &&
iss >> node >> service >> typ_ >> type && typ_ == "typ") {
std::istringstream iss(candidate);
string transport, typ_, type;
if (!(iss >> mFoundation >> mComponent >> mTransportString >> mPriority &&
iss >> mNode >> mService >> typ_ >> mTypeString && typ_ == "typ"))
throw std::invalid_argument("Invalid candidate format");
string left;
std::getline(iss, left);
std::getline(iss, mTail);
trim_begin(mTail);
trim_end(mTail);
if (auto it = TypeMap.find(type); it != TypeMap.end())
mType = it->second;
else
mType = Type::Unknown;
if (auto it = TypeMap.find(type); it != TypeMap.end())
mType = it->second;
else
mType = Type::Unknown;
if (transport == "UDP" || transport == "udp") {
mTransportType = TransportType::Udp;
} else if (transport == "TCP" || transport == "tcp") {
std::istringstream iss(left);
string tcptype_, tcptype;
if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") {
if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end())
mTransportType = it->second;
else
mTransportType = TransportType::TcpUnknown;
} else {
if (transport == "UDP" || transport == "udp") {
mTransportType = TransportType::Udp;
} else if (transport == "TCP" || transport == "tcp") {
// Peek tail to find TCP type
std::istringstream iss(mTail);
string tcptype_, tcptype;
if (iss >> tcptype_ >> tcptype && tcptype_ == "tcptype") {
if (auto it = TcpTypeMap.find(tcptype); it != TcpTypeMap.end())
mTransportType = it->second;
else
mTransportType = TransportType::TcpUnknown;
}
} else {
mTransportType = TransportType::Unknown;
mTransportType = TransportType::TcpUnknown;
}
} else {
mTransportType = TransportType::Unknown;
}
}
// Try to resolve the node
struct addrinfo hints = {};
hints.ai_family = AF_UNSPEC;
hints.ai_flags = AI_ADDRCONFIG;
if (mTransportType == TransportType::Udp) {
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
} else if (mTransportType != TransportType::Unknown) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
}
void Candidate::hintMid(string mid) {
if (!mMid)
mMid.emplace(std::move(mid));
}
if (mode == ResolveMode::Simple)
hints.ai_flags |= AI_NUMERICHOST;
bool Candidate::resolve(ResolveMode mode) {
PLOG_VERBOSE << "Resolving candidate (mode="
<< (mode == ResolveMode::Simple ? "simple" : "lookup") << "): " << mNode << ' '
<< mService;
struct addrinfo *result = nullptr;
if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) == 0) {
for (auto p = result; p; p = p->ai_next) {
if (p->ai_family == AF_INET || p->ai_family == AF_INET6) {
// Rewrite the candidate
char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer,
MAX_NUMERICNODE_LEN, servbuffer, MAX_NUMERICSERV_LEN,
NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
// Try to resolve the node and service
struct addrinfo hints = {};
hints.ai_family = AF_UNSPEC;
hints.ai_flags = AI_ADDRCONFIG;
if (mTransportType == TransportType::Udp) {
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
} else if (mTransportType != TransportType::Unknown) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
}
mAddress = nodebuffer;
mPort = uint16_t(std::stoul(servbuffer));
mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4;
if (mode == ResolveMode::Simple)
hints.ai_flags |= AI_NUMERICHOST;
const char sp{' '};
std::ostringstream oss;
oss << foundation << sp << component << sp << transport << sp << priority;
oss << sp << nodebuffer << sp << servbuffer << sp << "typ" << sp << type;
oss << left;
mCandidate = oss.str();
struct addrinfo *result = nullptr;
if (getaddrinfo(mNode.c_str(), mService.c_str(), &hints, &result) == 0) {
for (auto p = result; p; p = p->ai_next) {
if (p->ai_family == AF_INET || p->ai_family == AF_INET6) {
char nodebuffer[MAX_NUMERICNODE_LEN];
char servbuffer[MAX_NUMERICSERV_LEN];
if (getnameinfo(p->ai_addr, socklen_t(p->ai_addrlen), nodebuffer,
MAX_NUMERICNODE_LEN, servbuffer, MAX_NUMERICSERV_LEN,
NI_NUMERICHOST | NI_NUMERICSERV) == 0) {
PLOG_VERBOSE << "Resolved candidate: " << mCandidate;
break;
}
mAddress = nodebuffer;
mPort = uint16_t(std::stoul(servbuffer));
mFamily = p->ai_family == AF_INET6 ? Family::Ipv6 : Family::Ipv4;
PLOG_VERBOSE << "Resolved candidate: " << mAddress << ' ' << mPort;
break;
}
}
freeaddrinfo(result);
}
freeaddrinfo(result);
}
return mFamily != Family::Unresolved;
}
string Candidate::candidate() const { return "candidate:" + mCandidate; }
Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
uint32_t Candidate::priority() const { return mPriority; }
string Candidate::candidate() const {
const char sp{' '};
std::ostringstream oss;
oss << "candidate:";
oss << mFoundation << sp << mComponent << sp << mTransportString << sp << mPriority << sp;
if (isResolved())
oss << mAddress << sp << mPort;
else
oss << mNode << sp << mService;
oss << sp << "typ" << sp << mTypeString;
if (!mTail.empty())
oss << sp << mTail;
return oss.str();
}
string Candidate::mid() const { return mMid.value_or("0"); }
@ -186,14 +215,18 @@ Candidate::operator string() const {
return line.str();
}
bool Candidate::operator==(const Candidate &other) const {
return mFoundation == other.mFoundation;
}
bool Candidate::operator!=(const Candidate &other) const {
return mFoundation != other.mFoundation;
}
bool Candidate::isResolved() const { return mFamily != Family::Unresolved; }
Candidate::Family Candidate::family() const { return mFamily; }
Candidate::Type Candidate::type() const { return mType; }
Candidate::TransportType Candidate::transportType() const { return mTransportType; }
std::optional<string> Candidate::address() const {
return isResolved() ? std::make_optional(mAddress) : nullopt;
}
@ -202,10 +235,6 @@ std::optional<uint16_t> Candidate::port() const {
return isResolved() ? std::make_optional(mPort) : nullopt;
}
std::optional<uint32_t> Candidate::priority() const {
return isResolved() ? std::make_optional(mPriority) : nullopt;
}
} // namespace rtc
std::ostream &operator<<(std::ostream &out, const rtc::Candidate &candidate) {
@ -217,11 +246,11 @@ std::ostream &operator<<(std::ostream &out, const rtc::Candidate::Type &type) {
case rtc::Candidate::Type::Host:
return out << "host";
case rtc::Candidate::Type::PeerReflexive:
return out << "peer_reflexive";
return out << "prflx";
case rtc::Candidate::Type::ServerReflexive:
return out << "server_reflexive";
return out << "srflx";
case rtc::Candidate::Type::Relayed:
return out << "relayed";
return out << "relay";
default:
return out << "unknown";
}

View File

@ -306,7 +306,7 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
mLabel.assign(end, open.labelLength);
mProtocol.assign(end + open.labelLength, open.protocolLength);
mReliability->unordered = (open.reliabilityParameter & 0x80) != 0;
mReliability->unordered = (open.channelType & 0x80) != 0;
switch (open.channelType & 0x7F) {
case CHANNEL_PARTIAL_RELIABLE_REXMIT:
mReliability->type = Reliability::Type::Rexmit;

View File

@ -171,16 +171,27 @@ void Description::setFingerprint(string fingerprint) {
mFingerprint.emplace(std::move(fingerprint));
}
bool Description::hasCandidate(const Candidate &candidate) const {
for (const Candidate &other : mCandidates)
if (candidate == other)
return true;
return false;
}
void Description::addCandidate(Candidate candidate) {
candidate.hintMid(bundleMid());
for (const Candidate &other : mCandidates)
if (candidate == other)
return;
mCandidates.emplace_back(std::move(candidate));
}
void Description::addCandidates(std::vector<Candidate> candidates) {
for (Candidate candidate : candidates) {
candidate.hintMid(bundleMid());
mCandidates.emplace_back(std::move(candidate));
}
for (Candidate candidate : candidates)
addCandidate(std::move(candidate));
}
void Description::endCandidates() { mEnded = true; }

View File

@ -564,12 +564,14 @@ bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
return false;
// Warning: the candidate string must start with "a=candidate:" and it must not end with a
// newline, else libnice will reject it.
// newline or whitespace, else libnice will reject it.
string sdp(candidate);
NiceCandidate *cand =
nice_agent_parse_remote_candidate_sdp(mNiceAgent.get(), mStreamId, sdp.c_str());
if (!cand)
if (!cand) {
PLOG_WARNING << "Rejected ICE candidate: " << sdp;
return false;
}
GSList *list = g_slist_append(nullptr, cand);
int ret = nice_agent_set_remote_candidates(mNiceAgent.get(), mStreamId, 1, list);

View File

@ -266,10 +266,6 @@ void PeerConnection::setRemoteDescription(Description description) {
// Candidates will be added at the end, extract them for now
auto remoteCandidates = description.extractCandidates();
auto type = description.type();
auto iceTransport = initIceTransport();
iceTransport->setRemoteDescription(description);
processRemoteDescription(std::move(description));
changeSignalingState(newSignalingState);
@ -279,8 +275,9 @@ void PeerConnection::setRemoteDescription(Description description) {
setLocalDescription(Description::Type::Answer);
} else {
// This is an answer
auto iceTransport = std::atomic_load(&mIceTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
// Since we assumed passive role during DataChannel creation, we need to shift the
// stream numbers by one to shift them from odd to even.
std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
@ -1079,7 +1076,7 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
if (!mLocalDescription)
throw std::logic_error("Got a local candidate without local description");
candidate.resolve(Candidate::ResolveMode::Simple); // for proper SDP generation later
candidate.resolve(Candidate::ResolveMode::Simple);
mLocalDescription->addCandidate(candidate);
PLOG_VERBOSE << "Issuing local candidate: " << candidate;
@ -1095,10 +1092,13 @@ void PeerConnection::processRemoteDescription(Description description) {
if (mRemoteDescription)
existingCandidates = mRemoteDescription->extractCandidates();
mRemoteDescription.emplace(std::move(description));
mRemoteDescription.emplace(description);
mRemoteDescription->addCandidates(std::move(existingCandidates));
}
auto iceTransport = initIceTransport();
iceTransport->setRemoteDescription(std::move(description));
if (description.hasApplication()) {
auto dtlsTransport = std::atomic_load(&mDtlsTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport);
@ -1109,28 +1109,40 @@ void PeerConnection::processRemoteDescription(Description description) {
}
void PeerConnection::processRemoteCandidate(Candidate candidate) {
std::lock_guard lock(mRemoteDescriptionMutex);
auto iceTransport = std::atomic_load(&mIceTransport);
if (!mRemoteDescription || !iceTransport)
throw std::logic_error("Got a remote candidate without remote description");
{
// Set as remote candidate
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
throw std::logic_error("Got a remote candidate without remote description");
candidate.hintMid(mRemoteDescription->bundleMid());
if (!iceTransport)
throw std::logic_error("Got a remote candidate without ICE transport");
if (candidate.resolve(Candidate::ResolveMode::Simple)) {
iceTransport->addRemoteCandidate(candidate);
} else {
// OK, we might need a lookup, do it asynchronously
// We don't use the thread pool because we have no control on the timeout
weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock())
iceTransport->addRemoteCandidate(candidate);
});
t.detach();
candidate.hintMid(mRemoteDescription->bundleMid());
if (mRemoteDescription->hasCandidate(candidate))
return; // already in description, ignore
candidate.resolve(Candidate::ResolveMode::Simple);
mRemoteDescription->addCandidate(candidate);
}
mRemoteDescription->addCandidate(std::move(candidate));
if (candidate.isResolved()) {
iceTransport->addRemoteCandidate(std::move(candidate));
} else {
// We might need a lookup, do it asynchronously
// We don't use the thread pool because we have no control on the timeout
if (auto iceTransport = std::atomic_load(&mIceTransport)) {
weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock())
iceTransport->addRemoteCandidate(std::move(candidate));
});
t.detach();
}
}
}
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {

View File

@ -88,7 +88,7 @@ void SctpTransport::Cleanup() {
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
message_callback recvCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port),
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
onRecv(recvCallback);
@ -327,6 +327,7 @@ void SctpTransport::incoming(message_ptr message) {
void SctpTransport::doRecv() {
std::lock_guard lock(mRecvMutex);
--mPendingRecvCount;
try {
while (true) {
const size_t bufferSize = 65536;
@ -532,15 +533,17 @@ bool SctpTransport::safeFlush() {
}
void SctpTransport::handleUpcall() {
if(!mSock)
if (!mSock)
return;
PLOG_VERBOSE << "Handle upcall";
int events = usrsctp_get_events(mSock);
if (events & SCTP_EVENT_READ)
if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
++mPendingRecvCount;
mProcessor.enqueue(&SctpTransport::doRecv, this);
}
if (events & SCTP_EVENT_WRITE)
mProcessor.enqueue(&SctpTransport::safeFlush, this);

View File

@ -95,6 +95,7 @@ private:
struct socket *mSock;
Processor mProcessor;
std::atomic<int> mPendingRecvCount;
std::mutex mRecvMutex, mSendMutex;
Queue<message_ptr> mSendQueue;
std::map<uint16_t, size_t> mBufferedAmount;

View File

@ -100,14 +100,48 @@ static void RTC_API messageCallback(int id, const char *message, int size, void
static void RTC_API dataChannelCallback(int pc, int dc, void *ptr) {
Peer *peer = (Peer *)ptr;
peer->dc = dc;
peer->connected = true;
char label[256];
if (rtcGetDataChannelLabel(dc, label, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelLabel failed\n");
return;
}
char protocol[256];
if (rtcGetDataChannelProtocol(dc, protocol, 256) < 0) {
fprintf(stderr, "rtcGetDataChannelProtocol failed\n");
return;
}
rtcReliability reliability;
if (rtcGetDataChannelReliability(dc, &reliability) < 0) {
fprintf(stderr, "rtcGetDataChannelReliability failed\n");
return;
}
printf("DataChannel %d: Received with label \"%s\" and protocol \"%s\"\n",
peer == peer1 ? 1 : 2, label, protocol);
if (strcmp(label, "test") != 0) {
fprintf(stderr, "Wrong DataChannel label\n");
return;
}
if (strcmp(protocol, "protocol") != 0) {
fprintf(stderr, "Wrong DataChannel protocol\n");
return;
}
if (reliability.unordered == false) {
fprintf(stderr, "Wrong DataChannel reliability\n");
return;
}
rtcSetClosedCallback(dc, closedCallback);
rtcSetMessageCallback(dc, messageCallback);
char buffer[256];
if (rtcGetDataChannelLabel(dc, buffer, 256) >= 0)
printf("DataChannel %d: Received with label \"%s\"\n", peer == peer1 ? 1 : 2, buffer);
peer->dc = dc;
peer->connected = true;
const char *message = peer == peer1 ? "Hello from 1" : "Hello from 2";
rtcSendMessage(peer->dc, message, -1); // negative size indicates a null-terminated string
@ -173,7 +207,12 @@ int test_capi_connectivity_main() {
goto error;
// Peer 1: Create data channel
peer1->dc = rtcCreateDataChannel(peer1->pc, "test");
rtcDataChannelInit init;
memset(&init, 0, sizeof(init));
init.protocol = "protocol";
init.reliability.unordered = true;
peer1->dc = rtcCreateDataChannelEx(peer1->pc, "test", &init);
rtcSetOpenCallback(peer1->dc, openCallback);
rtcSetClosedCallback(peer1->dc, closedCallback);
rtcSetMessageCallback(peer1->dc, messageCallback);