Compare commits

..

49 Commits

Author SHA1 Message Date
e99ba3c5d8 Bumped version to 0.11.5 2021-02-28 18:11:12 +01:00
65dba2c299 Merge pull request #346 from paullouisageneau/fix-mtu
Fix path MTU
2021-02-27 11:39:22 +01:00
6ef8f1e1a7 Added optional MTU setting in configuration 2021-02-27 11:17:49 +01:00
56dbcaad97 Fixed path MTU 2021-02-26 14:12:12 +01:00
d748016446 Merge pull request #344 from paullouisageneau/fix-datachannel-data-race
Fix possible data race in DataChannel
2021-02-25 19:25:40 +01:00
e543d789a4 Refactored Track to follow DataChannel 2021-02-23 22:53:04 +01:00
90e59435c0 Added synchronization to DataChannel 2021-02-23 22:52:56 +01:00
785c3b8149 Renamed "Negociated" to "Negotiated" 2021-02-23 18:34:23 +01:00
c37c88543d Bumped version to 0.11.4 2021-02-22 21:04:31 +01:00
011bfbe46f Merge pull request #342 from paullouisageneau/fix-usrsctp-data-race
Update usrsctp and mitigate possible data race
2021-02-22 19:28:06 +01:00
de2ac6c0c2 Mitigation for data race 2021-02-22 19:05:45 +01:00
75619babd7 Updated usrsctp to v0.9.5.0 2021-02-22 19:00:35 +01:00
fe9a34905b Fixed missing data channels mutex lock 2021-02-22 09:49:03 +01:00
b88f1f5e72 Bumped version to 0.11.3 2021-02-21 20:49:30 +01:00
ab7d7fefe0 Prevent lock order inversion 2021-02-21 20:46:04 +01:00
e592fcf217 Fixed compilation warnings 2021-02-21 15:46:03 +01:00
10567074c3 Bumped version to 0.11.2 2021-02-20 21:58:43 +01:00
524c56dee9 Merge pull request #339 from paullouisageneau/fix-scheduling
Proper fix for possible deadlock at exit
2021-02-17 21:32:22 +01:00
efe12f0b73 Proper fix for thread pool deadlock at exit 2021-02-17 18:21:02 +01:00
82568e3aa0 Revert "Prevent scheduling tasks while joining thread pool"
This reverts commit ab392fe0da.
2021-02-17 17:15:03 +01:00
7082129b54 Merge pull request #337 from hanseuljun/windows-examples-dll-copy
Add copy commands that copies datachannel.dll to examples when examples built
2021-02-16 22:47:09 +01:00
24f8016e4e Merge pull request #336 from hanseuljun/capi-enum-integers
Assign integers to enums rtcCodec and rtcDirection.
2021-02-16 22:45:56 +01:00
98d926a7bf Clean up examples/streamer/CMakeLists.txt a little bit. 2021-02-16 13:32:44 -08:00
ffb589d498 Add dll copy commands to examples for Windows. 2021-02-16 13:28:45 -08:00
4db6afe9e2 Assign integers to enums rtcCodec and rtcDirection. 2021-02-16 13:13:25 -08:00
bdb59905dd Merge pull request #335 from in2core/feature/remove-wrap-macro
Remove WRAP macro
2021-02-16 19:00:20 +01:00
c2b181c6da Remove WRAP macro 2021-02-16 10:28:47 +01:00
3f53365564 Merge pull request #333 from hanseuljun/cmake-add-headers
Add header files to cmake add_library() calls.
2021-02-15 22:15:38 +01:00
95dfa1015d Fixed headers path 2021-02-15 19:27:38 +01:00
1f20f8f1e7 Add header files to cmake add_library() calls. 2021-02-14 23:43:19 -08:00
b2d1a41f7e Merge pull request #332 from paullouisageneau/capi-remote-type
Add description type getters to C API
2021-02-14 16:12:32 +01:00
9e2e7a7722 Added rtcGetLocalDescriptionType() and rtcGetRemoteDescriptionType() 2021-02-14 15:53:19 +01:00
1e02fa34c3 Fixed some compilation warnings 2021-02-11 00:23:43 +01:00
882c605876 Merge pull request #318 from in2core/feature/chainable-rtcp-handlers
Chaining of multiple RTP/RTCP handlers
2021-02-10 22:23:50 +01:00
36a88c605a Merge branch 'master' into feature/chainable-rtcp-handlers
# Conflicts:
#	src/h264packetizationhandler.cpp
2021-02-08 10:19:26 +01:00
09dfc39fd9 Refactor optional pointer to pointer 2021-02-08 10:13:31 +01:00
986e5f985f Merge pull request #328 from in2core/hotfix/split-h264-by-start-sequence
Fix splitting of h264 NALU by start sequence
2021-02-06 10:32:01 +01:00
3e2b0c43ef Merge pull request #327 from in2core/feature/msid-set-trackid
Add API to set track id in msid
2021-02-05 22:29:39 +01:00
557b293934 Fix: splitting of h264 NALU by start sequence now works correctly 2021-02-05 14:37:13 +01:00
7730496bf9 Fix streamer example for safari 2021-02-05 13:58:33 +01:00
2b7dc4c529 Rename RtcpHandler to MediaHandler 2021-02-05 13:17:59 +01:00
b5699239cc Rename RtcpSRReporter to RtcpSrReporter for naming consistency 2021-02-05 12:53:26 +01:00
9f3b004756 Add API to set track id in msid 2021-02-05 12:10:51 +01:00
b347afae14 Fix unexpected deallocation of element in RtcpChainableHandler's chain 2021-02-02 15:52:02 +01:00
0482953062 Refactor capi.cpp to preffer make_shared 2021-02-02 13:59:47 +01:00
7b206899a4 Update streamer example 2021-01-28 12:24:31 +01:00
c2c57b16df Add RTCP Nack responder 2021-01-28 12:19:47 +01:00
9805b2fcb5 Rename functions in MessageHandlerElement 2021-01-28 11:56:05 +01:00
569a317bf0 Add chainable Rtcp Handler 2021-01-28 10:16:59 +01:00
65 changed files with 1777 additions and 647 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.7)
project(libdatachannel
VERSION 0.11.1
VERSION 0.11.5
LANGUAGES CXX)
set(PROJECT_DESCRIPTION "WebRTC Data Channels Library")
@ -67,13 +67,30 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizationconfig.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsenderreporter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsrreporter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/opusrtppacketizer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/opuspacketizationhandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/h264rtppacketizer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/nalunit.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/h264packetizationhandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/mediachainablehandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerelement.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/mediahandlerrootelement.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpnackresponder.cpp
)
set(LIBDATACHANNEL_PRIVATE_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/src/certificate.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/dtlssrtptransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/dtlstransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/icetransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/logcounter.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/processor.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/transport.hpp
)
set(LIBDATACHANNEL_WEBSOCKET_SOURCES
@ -85,6 +102,14 @@ set(LIBDATACHANNEL_WEBSOCKET_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/wstransport.cpp
)
set(LIBDATACHANNEL_WEBSOCKET_PRIVATE_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/src/base64.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tcptransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tlstransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/verifiedtlstransport.hpp
${CMAKE_CURRENT_SOURCE_DIR}/src/wstransport.hpp
)
set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/candidate.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/channel.hpp
@ -92,7 +117,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/configuration.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/datachannel.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/description.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcphandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediahandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpreceivingsession.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/include.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/init.hpp
@ -107,13 +132,17 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizationconfig.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsenderreporter.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsrreporter.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizer.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opusrtppacketizer.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/opuspacketizationhandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264rtppacketizer.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/nalunit.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264packetizationhandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediachainablehandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediahandlerelement.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/mediahandlerrootelement.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
)
set(TESTS_SOURCES
@ -165,18 +194,28 @@ add_library(Usrsctp::Usrsctp ALIAS usrsctp)
if (NO_WEBSOCKET)
add_library(datachannel SHARED
${LIBDATACHANNEL_SOURCES})
${LIBDATACHANNEL_SOURCES}
${LIBDATACHANNEL_PRIVATE_HEADERS}
${LIBDATACHANNEL_HEADERS})
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
${LIBDATACHANNEL_SOURCES})
${LIBDATACHANNEL_SOURCES}
${LIBDATACHANNEL_PRIVATE_HEADERS}
${LIBDATACHANNEL_HEADERS})
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=0)
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=0)
else()
add_library(datachannel SHARED
${LIBDATACHANNEL_SOURCES}
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
${LIBDATACHANNEL_PRIVATE_HEADERS}
${LIBDATACHANNEL_WEBSOCKET_SOURCES}
${LIBDATACHANNEL_WEBSOCKET_PRIVATE_HEADERS}
${LIBDATACHANNEL_HEADERS})
add_library(datachannel-static STATIC EXCLUDE_FROM_ALL
${LIBDATACHANNEL_SOURCES}
${LIBDATACHANNEL_WEBSOCKET_SOURCES})
${LIBDATACHANNEL_PRIVATE_HEADERS}
${LIBDATACHANNEL_WEBSOCKET_SOURCES}
${LIBDATACHANNEL_WEBSOCKET_PRIVATE_HEADERS}
${LIBDATACHANNEL_HEADERS})
target_compile_definitions(datachannel PUBLIC RTC_ENABLE_WEBSOCKET=1)
target_compile_definitions(datachannel-static PUBLIC RTC_ENABLE_WEBSOCKET=1)
endif()

2
deps/usrsctp vendored

View File

@ -4,10 +4,10 @@ if(POLICY CMP0079)
endif()
if(WIN32)
add_executable(datachannel-client main.cpp parse_cl.cpp parse_cl.h getopt.cpp getopt.h)
target_compile_definitions(datachannel-client PUBLIC STATIC_GETOPT)
add_executable(datachannel-client main.cpp parse_cl.cpp parse_cl.h getopt.cpp getopt.h)
target_compile_definitions(datachannel-client PUBLIC STATIC_GETOPT)
else()
add_executable(datachannel-client main.cpp parse_cl.cpp parse_cl.h)
add_executable(datachannel-client main.cpp parse_cl.cpp parse_cl.h)
endif()
set_target_properties(datachannel-client PROPERTIES
@ -15,3 +15,10 @@ set_target_properties(datachannel-client PROPERTIES
OUTPUT_NAME client)
target_link_libraries(datachannel-client datachannel nlohmann_json)
if(WIN32)
add_custom_command(TARGET datachannel-client POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-client>
)
endif()

View File

@ -13,3 +13,15 @@ set_target_properties(datachannel-copy-paste-capi-answerer PROPERTIES
OUTPUT_NAME answerer)
target_link_libraries(datachannel-copy-paste-capi-answerer datachannel)
if(WIN32)
add_custom_command(TARGET datachannel-copy-paste-capi-offerer POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-copy-paste-capi-offerer>
)
add_custom_command(TARGET datachannel-copy-paste-capi-answerer POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-copy-paste-capi-answerer>
)
endif()

View File

@ -12,3 +12,15 @@ set_target_properties(datachannel-copy-paste-answerer PROPERTIES
OUTPUT_NAME answerer)
target_link_libraries(datachannel-copy-paste-answerer datachannel)
if(WIN32)
add_custom_command(TARGET datachannel-copy-paste-offerer POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-copy-paste-offerer>
)
add_custom_command(TARGET datachannel-copy-paste-answerer POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-copy-paste-answerer>
)
endif()

View File

@ -6,3 +6,10 @@ set_target_properties(datachannel-media PROPERTIES
OUTPUT_NAME media)
target_link_libraries(datachannel-media datachannel nlohmann_json)
if(WIN32)
add_custom_command(TARGET datachannel-media POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-media>
)
endif()

View File

@ -6,3 +6,10 @@ set_target_properties(datachannel-sfu-media PROPERTIES
OUTPUT_NAME sfu-media)
target_link_libraries(datachannel-sfu-media datachannel nlohmann_json)
if(WIN32)
add_custom_command(TARGET datachannel-sfu-media POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:datachannel-sfu-media>
)
endif()

View File

@ -3,17 +3,38 @@ if(POLICY CMP0079)
cmake_policy(SET CMP0079 NEW)
endif()
add_executable(streamer
main.cpp
dispatchqueue.cpp
dispatchqueue.hpp
h264fileparser.cpp
h264fileparser.hpp
helpers.cpp
helpers.hpp
opusfileparser.cpp
opusfileparser.hpp
fileparser.cpp
fileparser.hpp
stream.cpp
stream.hpp
ArgParser.cpp
ArgParser.hpp
)
if(WIN32)
add_executable(streamer main.cpp dispatchqueue.cpp dispatchqueue.hpp h264fileparser.cpp h264fileparser.hpp helpers.cpp helpers.hpp opusfileparser.cpp opusfileparser.hpp fileparser.cpp fileparser.hpp stream.cpp stream.hpp ArgParser.hpp ArgParser.cpp)
target_compile_definitions(streamer PUBLIC STATIC_GETOPT)
else()
add_executable(streamer main.cpp dispatchqueue.cpp dispatchqueue.hpp h264fileparser.cpp h264fileparser.hpp helpers.cpp helpers.hpp opusfileparser.cpp opusfileparser.hpp fileparser.cpp fileparser.hpp stream.cpp stream.hpp ArgParser.hpp ArgParser.cpp)
target_compile_definitions(streamer PUBLIC STATIC_GETOPT)
endif()
set_target_properties(streamer PROPERTIES
CXX_STANDARD 17
OUTPUT_NAME streamer)
target_link_libraries(streamer datachannel)
target_link_libraries(streamer datachannel nlohmann_json)
if(WIN32)
add_custom_command(TARGET streamer POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
"$<TARGET_FILE_DIR:datachannel>/datachannel.dll"
$<TARGET_FILE_DIR:streamer>
)
endif()

View File

@ -54,12 +54,10 @@ function createPeerConnection() {
// connect audio / video
pc.addEventListener('track', function (evt) {
if (evt.track.kind == 'video') {
document.getElementById('media').style.display = 'block';
document.getElementById('video').srcObject = evt.streams[0];
} else {
document.getElementById('audio').srcObject = evt.streams[0];
}
const videoTag = document.getElementById('video');
videoTag.srcObject = evt.streams[0];
videoTag.play();
});
let time_start = null;

View File

@ -60,7 +60,7 @@ int gettimeofday(struct timeval *tv, struct timezone *tz)
using namespace std;
using namespace rtc;
ClientTrackData::ClientTrackData(shared_ptr<Track> track, shared_ptr<RtcpSenderReporter> sender) {
ClientTrackData::ClientTrackData(shared_ptr<Track> track, shared_ptr<RtcpSrReporter> sender) {
this->track = track;
this->sender = sender;
}

View File

@ -23,9 +23,9 @@
struct ClientTrackData {
std::shared_ptr<rtc::Track> track;
std::shared_ptr<rtc::RtcpSenderReporter> sender;
std::shared_ptr<rtc::RtcpSrReporter> sender;
ClientTrackData(std::shared_ptr<rtc::Track> track, std::shared_ptr<rtc::RtcpSenderReporter> sender);
ClientTrackData(std::shared_ptr<rtc::Track> track, std::shared_ptr<rtc::RtcpSrReporter> sender);
};
struct Client {

View File

@ -52,7 +52,6 @@
<div id="media" style="display: none">
<h2>Media</h2>
<audio id="audio" autoplay></audio>
<video id="video" autoplay playsinline></video>
</div>

View File

@ -214,36 +214,48 @@ int main(int argc, char **argv) try {
shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
auto video = Description::Video(cname);
video.addH264Codec(payloadType);
video.addSSRC(ssrc, cname, msid);
video.addSSRC(ssrc, cname, msid, cname);
auto track = pc->addTrack(video);
// create RTP configuration
auto rtpConfig = shared_ptr<RtpPacketizationConfig>(new RtpPacketizationConfig(ssrc, cname, payloadType, H264RtpPacketizer::defaultClockRate));
// create packetizer
auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig));
// create H264 and RTCP SP handler
shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer));
auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(H264RtpPacketizer::Separator::Length, rtpConfig));
// create H264 handler
shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(packetizer));
// add RTCP SR handler
auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
h264Handler->addToChain(srReporter);
// add RTCP NACK handler
auto nackResponder = make_shared<RtcpNackResponder>();
h264Handler->addToChain(nackResponder);
// set handler
track->setRtcpHandler(h264Handler);
track->onOpen(onOpen);
auto trackData = make_shared<ClientTrackData>(track, h264Handler);
auto trackData = make_shared<ClientTrackData>(track, srReporter);
return trackData;
}
shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
auto audio = Description::Audio(cname);
audio.addOpusCodec(payloadType);
audio.addSSRC(ssrc, cname, msid);
audio.addSSRC(ssrc, cname, msid, cname);
auto track = pc->addTrack(audio);
// create RTP configuration
auto rtpConfig = shared_ptr<RtpPacketizationConfig>(new RtpPacketizationConfig(ssrc, cname, payloadType, OpusRtpPacketizer::defaultClockRate));
// create packetizer
auto packetizer = make_shared<OpusRtpPacketizer>(rtpConfig);
// create opus and RTCP SP handler
// create opus handler
auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
// add RTCP SR handler
auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
opusHandler->addToChain(srReporter);
// add RTCP NACK handler
auto nackResponder = make_shared<RtcpNackResponder>();
opusHandler->addToChain(nackResponder);
// set handler
track->setRtcpHandler(opusHandler);
track->onOpen(onOpen);
auto trackData = make_shared<ClientTrackData>(track, opusHandler);
auto trackData = make_shared<ClientTrackData>(track, srReporter);
return trackData;
}

View File

@ -70,6 +70,7 @@ struct RTC_CPP_EXPORT Configuration {
bool enableIceTcp = false;
uint16_t portRangeBegin = 1024;
uint16_t portRangeEnd = 65535;
std::optional<size_t> mtu;
};
} // namespace rtc

View File

@ -30,6 +30,7 @@
#include <functional>
#include <type_traits>
#include <variant>
#include <shared_mutex>
namespace rtc {
@ -79,6 +80,8 @@ protected:
string mProtocol;
std::shared_ptr<Reliability> mReliability;
mutable std::shared_mutex mMutex;
std::atomic<bool> mIsOpen = false;
std::atomic<bool> mIsClosed = false;
@ -88,13 +91,13 @@ private:
friend class PeerConnection;
};
class RTC_CPP_EXPORT NegociatedDataChannel final : public DataChannel {
class RTC_CPP_EXPORT NegotiatedDataChannel final : public DataChannel {
public:
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label,
NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream, string label,
string protocol, Reliability reliability);
NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport,
NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, std::weak_ptr<SctpTransport> transport,
uint16_t stream);
~NegociatedDataChannel();
~NegotiatedDataChannel();
private:
void open(std::shared_ptr<SctpTransport> transport) override;

View File

@ -141,9 +141,9 @@ public:
void removeFormat(const string &fmt);
void addSSRC(uint32_t ssrc, std::optional<string> name,
std::optional<string> msid = nullopt);
std::optional<string> msid = nullopt, std::optional<string> trackID = nullopt);
void replaceSSRC(uint32_t oldSSRC, uint32_t ssrc, std::optional<string> name,
std::optional<string> msid = nullopt);
std::optional<string> msid = nullopt, std::optional<string> trackID = nullopt);
bool hasSSRC(uint32_t ssrc);
std::vector<uint32_t> getSSRCs();

View File

@ -23,50 +23,16 @@
#include "h264rtppacketizer.hpp"
#include "nalunit.hpp"
#include "rtcphandler.hpp"
#include "rtcpsenderreporter.hpp"
#include "mediachainablehandler.hpp"
namespace rtc {
/// Handler for H264 packetization
class RTC_CPP_EXPORT H264PacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
/// RTP packetizer for H264
const std::shared_ptr<H264RtpPacketizer> packetizer;
const uint16_t maximumFragmentSize;
std::shared_ptr<NalUnits> splitMessage(message_ptr message);
class RTC_CPP_EXPORT H264PacketizationHandler : public MediaChainableHandler {
public:
/// Nalunit separator
enum class Separator {
LongStartSequence, // 0x00, 0x00, 0x00, 0x01
ShortStartSequence, // 0x00, 0x00, 0x01
StartSequence, // LongStartSequence or ShortStartSequence
Length // first 4 bytes is nal unit length
};
/// Construct handler for H264 packetization.
/// @param separator Nal units separator
/// @param packetizer RTP packetizer for h264
H264PacketizationHandler(Separator separator, std::shared_ptr<H264RtpPacketizer> packetizer,
uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
/// Returns message unchanged
/// @param ptr message
message_ptr incoming(message_ptr ptr) override;
/// Returns packetized message if message type is binary
/// @note NAL units in `ptr` message must be separated by `separator` given in constructor
/// @note If message generates multiple rtp packets, all but last are send using
/// `outgoingCallback`. It is your responsibility to send last packet.
/// @param ptr message containing all NAL units for current timestamp (one sample)
/// @return last packet
message_ptr outgoing(message_ptr ptr) override;
private:
/// Separator
const Separator separator;
H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer);
};
} // namespace rtc

View File

@ -21,22 +21,43 @@
#if RTC_ENABLE_MEDIA
#include "nalunit.hpp"
#include "rtppacketizer.hpp"
#include "mediahandlerrootelement.hpp"
namespace rtc {
/// RTP packetization of h264 payload
class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer {
class RTC_CPP_EXPORT H264RtpPacketizer : public RtpPacketizer, public MediaHandlerRootElement {
std::shared_ptr<NalUnits> splitMessage(binary_ptr message);
const uint16_t maximumFragmentSize;
public:
/// Default clock rate for H264 in RTP
static const auto defaultClockRate = 90 * 1000;
/// Nalunit separator
enum class Separator {
LongStartSequence, // 0x00, 0x00, 0x00, 0x01
ShortStartSequence, // 0x00, 0x00, 0x01
StartSequence, // LongStartSequence or ShortStartSequence
Length // first 4 bytes is nal unit length
};
H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
/// Constructs h264 payload packetizer with given RTP configuration.
/// @note RTP configuration is used in packetization process which may change some configuration
/// properties such as sequence number.
/// @param rtpConfig RTP configuration
H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
/// @param maximumFragmentSize maximum size of one NALU fragment
H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
private:
const Separator separator;
};
} // namespace rtc

View File

@ -56,6 +56,7 @@ using std::byte;
using std::string;
using std::string_view;
using binary = std::vector<byte>;
using binary_ptr = std::shared_ptr<binary>;
using std::nullopt;
@ -76,6 +77,8 @@ const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
const size_t DEFAULT_IPV4_MTU = 1200; // IPv4 safe MTU value recommended by RFC 8261
// overloaded helper
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

View File

@ -0,0 +1,55 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RTC_MEDIA_CHAINABLE_HANDLER_H
#define RTC_MEDIA_CHAINABLE_HANDLER_H
#if RTC_ENABLE_MEDIA
#include "mediahandler.hpp"
#include "mediahandlerrootelement.hpp"
namespace rtc {
class RTC_CPP_EXPORT MediaChainableHandler : public MediaHandler {
const std::shared_ptr<MediaHandlerRootElement> root;
std::shared_ptr<MediaHandlerElement> leaf;
std::mutex inoutMutex;
message_ptr handleIncomingBinary(message_ptr);
message_ptr handleIncomingControl(message_ptr);
message_ptr handleOutgoingBinary(message_ptr);
message_ptr handleOutgoingControl(message_ptr);
bool sendProduct(ChainedOutgoingProduct product);
public:
MediaChainableHandler(std::shared_ptr<MediaHandlerRootElement> root);
~MediaChainableHandler();
message_ptr incoming(message_ptr ptr) override;
message_ptr outgoing(message_ptr ptr) override;
bool send(message_ptr msg);
/// Adds element to chain
/// @param chainable Chainable element
void addToChain(std::shared_ptr<MediaHandlerElement> chainable);
};
} // namespace rtc
#endif // RTC_ENABLE_MEDIA
#endif // RTC_MEDIA_CHAINABLE_HANDLER_H

View File

@ -17,15 +17,15 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef RTC_RTCP_HANDLER_H
#define RTC_RTCP_HANDLER_H
#ifndef RTC_MEDIA_HANDLER_H
#define RTC_MEDIA_HANDLER_H
#include "include.hpp"
#include "message.hpp"
namespace rtc {
class RTC_CPP_EXPORT RtcpHandler {
class RTC_CPP_EXPORT MediaHandler {
protected:
// Use this callback when trying to send custom data (such as RTCP) to the client.
synchronized_callback<message_ptr> outgoingCallback;
@ -47,4 +47,4 @@ public:
} // namespace rtc
#endif // RTC_RTCP_HANDLER_H
#endif // RTC_MEDIA_HANDLER_H

View File

@ -0,0 +1,111 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RTC_MEDIA_HANDLER_ELEMENT_H
#define RTC_MEDIA_HANDLER_ELEMENT_H
#if RTC_ENABLE_MEDIA
#include "include.hpp"
#include "message.hpp"
#include "rtp.hpp"
namespace rtc {
using ChainedMessagesProduct = std::shared_ptr<std::vector<binary_ptr>>;
RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product();
RTC_CPP_EXPORT ChainedMessagesProduct make_chained_messages_product(message_ptr msg);
/// Ougoing messages
struct RTC_CPP_EXPORT ChainedOutgoingProduct {
ChainedOutgoingProduct(ChainedMessagesProduct messages = nullptr, message_ptr control = nullptr);
const ChainedMessagesProduct messages;
const message_ptr control;
};
/// Incoming messages with response
struct RTC_CPP_EXPORT ChainedIncomingProduct {
ChainedIncomingProduct(ChainedMessagesProduct incoming = nullptr, ChainedMessagesProduct outgoing = nullptr);
const ChainedMessagesProduct incoming;
const ChainedOutgoingProduct outgoing;
};
/// Incoming control messages with response
struct RTC_CPP_EXPORT ChainedIncomingControlProduct {
ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedOutgoingProduct> outgoing = nullopt);
const message_ptr incoming;
const std::optional<ChainedOutgoingProduct> outgoing;
};
/// Chainable handler
class RTC_CPP_EXPORT MediaHandlerElement: public std::enable_shared_from_this<MediaHandlerElement> {
std::shared_ptr<MediaHandlerElement> upstream = nullptr;
std::shared_ptr<MediaHandlerElement> downstream = nullptr;
void prepareAndSendResponse(std::optional<ChainedOutgoingProduct> outgoing, std::function<bool (ChainedOutgoingProduct)> send);
void removeFromChain();
public:
MediaHandlerElement();
/// Creates response to incoming message
/// @param messages Current repsonse
/// @returns New response
std::optional<ChainedOutgoingProduct> processOutgoingResponse(ChainedOutgoingProduct messages);
// Process incoming and ougoing messages
message_ptr formIncomingControlMessage(message_ptr message, std::function<bool (ChainedOutgoingProduct)> send);
ChainedMessagesProduct formIncomingBinaryMessage(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingProduct)> send);
message_ptr formOutgoingControlMessage(message_ptr message);
std::optional<ChainedOutgoingProduct> formOutgoingBinaryMessage(ChainedOutgoingProduct product);
/// Process current control message
/// @param messages current message
/// @returns Modified message and response
virtual ChainedIncomingControlProduct processIncomingControlMessage(message_ptr messages);
/// Process current control message
/// @param messages current message
/// @returns Modified message
virtual message_ptr processOutgoingControlMessage(message_ptr messages);
/// Process current binary message
/// @param messages current message
/// @returns Modified message and response
virtual ChainedIncomingProduct processIncomingBinaryMessage(ChainedMessagesProduct messages);
/// Process current binary message
/// @param messages current message
/// @param control current control message
/// @returns Modified binary message and control message
virtual ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control);
/// Set given element as upstream to this
/// @param upstream Upstream element
/// @returns Upstream element
std::shared_ptr<MediaHandlerElement> chainWith(std::shared_ptr<MediaHandlerElement> upstream);
/// Remove all downstream elements from chain
void recursiveRemoveChain();
};
} // namespace rtc
#endif // RTC_ENABLE_MEDIA
#endif // RTC_MEDIA_HANDLER_ELEMENT_H

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H
#define RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H
#if RTC_ENABLE_MEDIA
#include "mediahandlerelement.hpp"
namespace rtc {
/// Chainable message handler
class RTC_CPP_EXPORT MediaHandlerRootElement : public MediaHandlerElement {
public:
MediaHandlerRootElement() { }
/// Reduce multiple messages into one message
/// @param messages Messages to reduce
virtual message_ptr reduce(ChainedMessagesProduct messages);
/// Splits message into multiple messages
/// @param message Message to split
virtual ChainedMessagesProduct split(message_ptr message);
};
} // namespace rtc
#endif // RTC_ENABLE_MEDIA
#endif // RTCP_MEDIA_HANDLER_ROOT_ELEMENT_H

View File

@ -103,7 +103,7 @@ struct RTC_CPP_EXPORT NalUnitFragmentA : NalUnit {
NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri, uint8_t unitType,
binary data);
static std::vector<NalUnitFragmentA> fragmentsFrom(NalUnit nalu, uint16_t maximumFragmentSize);
static std::vector<std::shared_ptr<NalUnitFragmentA>> fragmentsFrom(std::shared_ptr<NalUnit> nalu, uint16_t maximumFragmentSize);
uint8_t unitType() { return fragmentHeader()->unitType(); }
@ -142,11 +142,10 @@ protected:
const uint8_t nal_type_fu_A = 28;
};
class RTC_CPP_EXPORT NalUnits : public std::vector<NalUnit> {
class RTC_CPP_EXPORT NalUnits : public std::vector<std::shared_ptr<NalUnit>> {
public:
static const uint16_t defaultMaximumFragmentSize = 1400;
std::vector<binary>
generateFragments(uint16_t maximumFragmentSize = NalUnits::defaultMaximumFragmentSize);
std::vector<std::shared_ptr<binary>> generateFragments(uint16_t maximumFragmentSize);
};
} // namespace rtc

View File

@ -22,27 +22,17 @@
#if RTC_ENABLE_MEDIA
#include "opusrtppacketizer.hpp"
#include "rtcphandler.hpp"
#include "rtcpsenderreporter.hpp"
#include "mediachainablehandler.hpp"
namespace rtc {
/// Handler for opus packetization
class RTC_CPP_EXPORT OpusPacketizationHandler : public RtcpHandler, public RtcpSenderReporter {
/// RTP packetizer for opus
const std::shared_ptr<OpusRtpPacketizer> packetizer;
class RTC_CPP_EXPORT OpusPacketizationHandler : public MediaChainableHandler {
public:
/// Construct handler for opus packetization.
/// @param packetizer RTP packetizer for opus
OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer);
/// Returns message unchanged
/// @param ptr message
message_ptr incoming(message_ptr ptr) override;
/// Returns packetized message if message type is binary
/// @param ptr message
message_ptr outgoing(message_ptr ptr) override;
};
} // namespace rtc

View File

@ -22,12 +22,12 @@
#if RTC_ENABLE_MEDIA
#include "rtppacketizer.hpp"
#include "mediahandlerrootelement.hpp"
namespace rtc {
/// RTP packetizer for opus
class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer {
class RTC_CPP_EXPORT OpusRtpPacketizer : public RtpPacketizer, public MediaHandlerRootElement {
public:
/// default clock rate used in opus RTP communication
static const uint32_t defaultClockRate = 48 * 1000;
@ -42,7 +42,13 @@ public:
/// @note This function increase sequence number after packetization.
/// @param payload RTP payload
/// @param setMark This needs to be `false` for all RTP packets with opus payload
message_ptr packetize(binary payload, bool setMark) override;
binary_ptr packetize(binary_ptr payload, bool setMark) override;
/// Creates RTP packet for given samples (all samples share same RTP timesamp)
/// @param messages opus samples
/// @param control RTCP
/// @returns RTP packets and unchanged `control`
ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
};
} // namespace rtc

View File

@ -41,6 +41,7 @@ extern "C" {
#if RTC_ENABLE_MEDIA
#define RTC_DEFAULT_MAXIMUM_FRAGMENT_SIZE ((uint16_t)1400)
#define RTC_DEFAULT_MAXIMUM_PACKET_COUNT_FOR_NACK_CACHE ((unsigned)512)
#endif
#include <stdbool.h>
@ -85,20 +86,20 @@ typedef enum { // Don't change, it must match plog severity
typedef enum {
// video
RTC_CODEC_H264,
RTC_CODEC_VP8,
RTC_CODEC_VP9,
RTC_CODEC_H264 = 0,
RTC_CODEC_VP8 = 1,
RTC_CODEC_VP9 = 2,
// audio
RTC_CODEC_OPUS
RTC_CODEC_OPUS = 128
} rtcCodec;
typedef enum {
RTC_DIRECTION_UNKNOWN,
RTC_DIRECTION_SENDONLY,
RTC_DIRECTION_RECVONLY,
RTC_DIRECTION_SENDRECV,
RTC_DIRECTION_INACTIVE
RTC_DIRECTION_UNKNOWN = 0,
RTC_DIRECTION_SENDONLY = 1,
RTC_DIRECTION_RECVONLY = 2,
RTC_DIRECTION_SENDRECV = 3,
RTC_DIRECTION_INACTIVE = 4
} rtcDirection;
#endif // RTC_ENABLE_MEDIA
@ -174,6 +175,9 @@ RTC_EXPORT int rtcAddRemoteCandidate(int pc, const char *cand, const char *mid);
RTC_EXPORT int rtcGetLocalDescription(int pc, char *buffer, int size);
RTC_EXPORT int rtcGetRemoteDescription(int pc, char *buffer, int size);
RTC_EXPORT int rtcGetLocalDescriptionType(int pc, char *buffer, int size);
RTC_EXPORT int rtcGetRemoteDescriptionType(int pc, char *buffer, int size);
RTC_EXPORT int rtcGetLocalAddress(int pc, char *buffer, int size);
RTC_EXPORT int rtcGetRemoteAddress(int pc, char *buffer, int size);
@ -215,8 +219,9 @@ RTC_EXPORT int rtcGetTrackDescription(int tr, char *buffer, int size);
/// @param _direction Direction
/// @param _name Name (optional)
/// @param _msid MSID (optional)
/// @param _trackID Track ID used in MSID (optional)
/// @returns Track id
RTC_EXPORT int rtcAddTrackEx(int pc, rtcCodec codec, int payloadType, uint32_t ssrc, const char *_mid, rtcDirection direction, const char *_name, const char *_msid);
RTC_EXPORT int rtcAddTrackEx(int pc, rtcCodec codec, int payloadType, uint32_t ssrc, const char *_mid, rtcDirection direction, const char *_name, const char *_msid, const char *_trackID);
/// Set H264PacketizationHandler for track
/// @param tr Track id
@ -239,6 +244,15 @@ RTC_EXPORT int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char
/// @param _timestamp Timestamp
RTC_EXPORT int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char * cname, uint8_t payloadType, uint32_t clockRate, uint16_t _sequenceNumber, uint32_t _timestamp);
/// Chain RtcpSrReporter to handler chain for given track
/// @param tr Track id
int rtcChainRtcpSrReporter(int tr);
/// Chain RtcpNackResponder to handler chain for given track
/// @param tr Track id
/// @param maxStoredPacketsCount Maximum stored packet count
int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount);
/// Set start time for RTP stream
/// @param startTime_s Start time in seconds
/// @param timeIntervalSince1970 Set true if `startTime_s` is time interval since 1970, false if `startTime_s` is time interval since 1900
@ -249,7 +263,6 @@ int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeInterva
/// @param id Track identifier
int rtcStartRtcpSenderReporterRecording(int id);
/// Transform seconds to timestamp using track's clock rate
/// @param id Track id
/// @param seconds Seconds
@ -282,9 +295,9 @@ int rtcSetTrackRTPTimestamp(int id, uint32_t timestamp);
/// @param timestamp Pointer for result
int rtcGetPreviousTrackSenderReportTimestamp(int id, uint32_t * timestamp);
/// Set `NeedsToReport` flag in RtcpSenderReporter handler identified by given track id
/// Set `NeedsToReport` flag in RtcpSrReporter handler identified by given track id
/// @param id Track id
int rtcSetNeedsToSendRTCPSR(int id);
int rtcSetNeedsToSendRtcpSr(int id);
#endif // RTC_ENABLE_MEDIA

View File

@ -27,8 +27,11 @@
#if RTC_ENABLE_MEDIA
// RTCP handling
// Media handling
#include "rtcpreceivingsession.hpp"
#include "mediachainablehandler.hpp"
#include "rtcpsrreporter.hpp"
#include "rtcpnackresponder.hpp"
// Opus/h264 streaming
#include "h264packetizationhandler.hpp"

View File

@ -0,0 +1,94 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RTC_RTCP_NACK_RESPONDER_H
#define RTC_RTCP_NACK_RESPONDER_H
#if RTC_ENABLE_MEDIA
#include "mediahandlerelement.hpp"
#include <unordered_map>
#include <queue>
namespace rtc {
class RTC_CPP_EXPORT RtcpNackResponder: public MediaHandlerElement {
/// Packet storage
class RTC_CPP_EXPORT Storage {
/// Packet storage element
struct RTC_CPP_EXPORT Element {
Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next = nullptr);
const binary_ptr packet;
const uint16_t sequenceNumber;
/// Pointer to newer element
std::shared_ptr<Element> next = nullptr;
};
private:
/// Oldest packet in storage
std::shared_ptr<Element> oldest = nullptr;
/// Newest packet in storage
std::shared_ptr<Element> newest = nullptr;
/// Inner storage
std::unordered_map<uint16_t, std::shared_ptr<Element>> storage{};
/// Maximum storage size
const unsigned maximumSize;
/// Returnst current size
unsigned size();
public:
static const unsigned defaultMaximumSize = 512;
Storage(unsigned _maximumSize);
/// Returns packet with given sequence number
std::optional<binary_ptr> get(uint16_t sequenceNumber);
/// Stores packet
/// @param packet Packet
void store(binary_ptr packet);
};
const std::shared_ptr<Storage> storage;
std::mutex reportMutex;
public:
RtcpNackResponder(unsigned maxStoredPacketCount = Storage::defaultMaximumSize);
/// Checks for RTCP NACK and handles it,
/// @param message RTCP message
/// @returns unchanged RTCP message and requested RTP packets
ChainedIncomingControlProduct processIncomingControlMessage(message_ptr message) override;
/// Stores RTP packets in internal storage
/// @param messages RTP packets
/// @param control RTCP
/// @returns Unchanged RTP and RTCP
ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
};
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */
#endif /* RTC_RTCP_NACK_RESPONDER_H */

View File

@ -23,14 +23,14 @@
#if RTC_ENABLE_MEDIA
#include "include.hpp"
#include "rtcphandler.hpp"
#include "mediahandler.hpp"
#include "message.hpp"
#include "rtp.hpp"
namespace rtc {
// An RtcpSession can be plugged into a Track to handle the whole RTCP session
class RTC_CPP_EXPORT RtcpReceivingSession : public RtcpHandler {
class RTC_CPP_EXPORT RtcpReceivingSession : public MediaHandler {
public:
message_ptr incoming(message_ptr ptr) override;
message_ptr outgoing(message_ptr ptr) override;

View File

@ -1,5 +1,4 @@
/*
* libdatachannel streamer example
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
@ -23,11 +22,12 @@
#include "message.hpp"
#include "rtppacketizationconfig.hpp"
#include "mediahandlerelement.hpp"
namespace rtc {
/// Class for sending RTCP SR
class RTC_CPP_EXPORT RtcpSenderReporter {
class RTC_CPP_EXPORT RtcpSrReporter: public MediaHandlerElement {
bool needsToReport = false;
uint32_t packetCount = 0;
@ -39,10 +39,6 @@ class RTC_CPP_EXPORT RtcpSenderReporter {
void addToReport(RTP *rtp, uint32_t rtpSize);
message_ptr getSenderReport(uint32_t timestamp);
protected:
/// Outgoing callback for sender reports
synchronized_callback<message_ptr> senderReportOutgoingCallback;
public:
static uint64_t secondsToNTP(double seconds);
@ -52,7 +48,9 @@ public:
/// RTP configuration
const std::shared_ptr<RtpPacketizationConfig> rtpConfig;
RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
RtcpSrReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig);
ChainedOutgoingProduct processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) override;
/// Set `needsToReport` flag. Sender report will be sent before next RTP packet with same
/// timestamp.
@ -63,27 +61,6 @@ public:
/// @note `time_offset = rtpConfig->startTime_s -
/// rtpConfig->timestampToSeconds(rtpConfig->timestamp)`
void startRecording();
/// Send RTCP SR with given timestamp
/// @param timestamp timestamp of the RTCP SR
void sendReport(uint32_t timestamp);
protected:
/// Calls given block with function for statistics. Sends RTCP SR packet with current timestamp
/// before `block` call if `needs_to_report` flag is true.
/// @param block Block of code to run. This block has function for rtp stats recording.
template <typename T>
T withStatsRecording(std::function<T(std::function<void(message_ptr)>)> block) {
if (needsToReport) {
sendReport(rtpConfig->timestamp);
needsToReport = false;
}
auto result = block([this](message_ptr _rtp) {
auto rtp = reinterpret_cast<RTP *>(_rtp->data());
this->addToReport(rtp, _rtp->size());
});
return result;
}
};
} // namespace rtc

View File

@ -298,13 +298,16 @@ public:
private:
uint8_t _length;
char _text;
char _text[1];
public:
inline std::string text() const { return std::string(&_text, _length); }
inline std::string text() const { return std::string(_text, _length); }
inline void setText(std::string text) {
_length = text.length();
memcpy(&_text, text.data(), _length);
if(text.size() > 0xFF)
throw std::invalid_argument("text is too long");
_length = uint8_t(text.size());
memcpy(_text, text.data(), text.size());
}
inline uint8_t length() { return _length; }
@ -334,12 +337,12 @@ public:
return reinterpret_cast<RTCP_SDES_ITEM *>(base);
}
long safelyCountChunkSize(unsigned int maxChunkSize) {
long safelyCountChunkSize(size_t maxChunkSize) {
if (maxChunkSize < RTCP_SDES_CHUNK::size({})) {
// chunk is truncated
return -1;
} else {
unsigned int size = sizeof(SSRC);
size_t size = sizeof(SSRC);
unsigned int i = 0;
// We can always access first 4 bytes of first item (in case of no items there will be 4
// null bytes)
@ -407,7 +410,7 @@ public:
auto chunk = getChunk(i);
chunkSize += chunk->getSize();
}
uint16_t length = (sizeof(header) + chunkSize) / 4 - 1;
uint16_t length = uint16_t((sizeof(header) + chunkSize) / 4 - 1);
header.prepareHeader(202, chunkCount, length);
}
@ -617,8 +620,31 @@ struct RTCP_FIR {
};
struct RTCP_NACK_PART {
uint16_t pid;
uint16_t blp;
uint16_t _pid;
uint16_t _blp;
uint16_t getPID() { return ntohs(_pid); }
uint16_t getBLP() { return ntohs(_blp); }
void setPID(uint16_t pid) { _pid = htons(pid); }
void setBLP(uint16_t blp) { _blp = htons(blp); }
std::vector<uint16_t> getSequenceNumbers() {
std::vector<uint16_t> result{};
result.reserve(17);
uint16_t pid = getPID();
result.push_back(pid);
uint16_t bitmask = getBLP();
uint16_t i = pid + 1;
while (bitmask > 0) {
if (bitmask & 0x1) {
result.push_back(i);
}
i += 1;
bitmask >>= 1;
}
return result;
}
};
class RTCP_NACK {
@ -644,16 +670,16 @@ public:
*/
bool addMissingPacket(unsigned int *fciCount, uint16_t *fciPID, uint16_t missingPacket) {
if (*fciCount == 0 || missingPacket < *fciPID || missingPacket > (*fciPID + 16)) {
parts[*fciCount].pid = htons(missingPacket);
parts[*fciCount].blp = 0;
parts[*fciCount].setPID(missingPacket);
parts[*fciCount].setBLP(0);
*fciPID = missingPacket;
(*fciCount)++;
return true;
} else {
// TODO SPEEED!
auto blp = ntohs(parts[(*fciCount) - 1].blp);
auto newBit = 1u << (unsigned int)(missingPacket - (1 + *fciPID));
parts[(*fciCount) - 1].blp = htons(blp | newBit);
// TODO SPEED!
uint16_t blp = parts[(*fciCount) - 1].getBLP();
uint16_t newBit = uint16_t(1u << (missingPacket - (1 + *fciPID)));
parts[(*fciCount) - 1].setBLP(blp | newBit);
return false;
}
}

View File

@ -44,7 +44,7 @@ public:
/// @note This function increase sequence number after packetization.
/// @param payload RTP payload
/// @param setMark Set marker flag in RTP packet if true
virtual message_ptr packetize(binary payload, bool setMark);
virtual std::shared_ptr<binary> packetize(std::shared_ptr<binary> payload, bool setMark);
};
} // namespace rtc

View File

@ -24,7 +24,7 @@
#include "include.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "rtcphandler.hpp"
#include "mediahandler.hpp"
#include <atomic>
#include <variant>
@ -43,6 +43,7 @@ public:
string mid() const;
Description::Media description() const;
Description::Direction direction() const;
void setDescription(Description::Media description);
@ -62,8 +63,8 @@ public:
bool requestKeyframe();
// RTCP handler
void setRtcpHandler(std::shared_ptr<RtcpHandler> handler);
std::shared_ptr<RtcpHandler> getRtcpHandler();
void setRtcpHandler(std::shared_ptr<MediaHandler> handler);
std::shared_ptr<MediaHandler> getRtcpHandler();
private:
#if RTC_ENABLE_MEDIA
@ -75,13 +76,14 @@ private:
bool outgoing(message_ptr message);
Description::Media mMediaDescription;
std::shared_ptr<MediaHandler> mRtcpHandler;
mutable std::shared_mutex mMutex;
std::atomic<bool> mIsClosed = false;
Queue<message_ptr> mRecvQueue;
std::shared_mutex mRtcpHandlerMutex;
std::shared_ptr<RtcpHandler> mRtcpHandler;
friend class PeerConnection;
};

View File

@ -53,7 +53,8 @@ std::unordered_map<int, shared_ptr<PeerConnection>> peerConnectionMap;
std::unordered_map<int, shared_ptr<DataChannel>> dataChannelMap;
std::unordered_map<int, shared_ptr<Track>> trackMap;
#if RTC_ENABLE_MEDIA
std::unordered_map<int, shared_ptr<RtcpSenderReporter>> rtcpSenderMap;
std::unordered_map<int, shared_ptr<MediaChainableHandler>> rtcpChainableHandlerMap;
std::unordered_map<int, shared_ptr<RtcpSrReporter>> rtcpSrReporterMap;
std::unordered_map<int, shared_ptr<RtpPacketizationConfig>> rtpConfigMap;
#endif
#if RTC_ENABLE_WEBSOCKET
@ -141,7 +142,8 @@ void eraseTrack(int tr) {
if (trackMap.erase(tr) == 0)
throw std::invalid_argument("Track ID does not exist");
#if RTC_ENABLE_MEDIA
rtcpSenderMap.erase(tr);
rtcpSrReporterMap.erase(tr);
rtcpChainableHandlerMap.erase(tr);
rtpConfigMap.erase(tr);
#endif
userPointerMap.erase(tr);
@ -149,25 +151,41 @@ void eraseTrack(int tr) {
#if RTC_ENABLE_MEDIA
shared_ptr<RtcpSenderReporter> getRTCPSender(int id) {
shared_ptr<RtcpSrReporter> getRtcpSrReporter(int id) {
std::lock_guard lock(mutex);
if (auto it = rtcpSenderMap.find(id); it != rtcpSenderMap.end())
if (auto it = rtcpSrReporterMap.find(id); it != rtcpSrReporterMap.end()) {
return it->second;
else
throw std::invalid_argument("RtcpSenderReporter ID does not exist");
} else {
throw std::invalid_argument("RtcpSRReporter ID does not exist");
}
}
void emplaceRTCPSender(shared_ptr<RtcpSenderReporter> ptr, int tr) {
void emplaceRtcpSrReporter(shared_ptr<RtcpSrReporter> ptr, int tr) {
std::lock_guard lock(mutex);
rtcpSenderMap.emplace(std::make_pair(tr, ptr));
rtcpSrReporterMap.emplace(std::make_pair(tr, ptr));
}
shared_ptr<MediaChainableHandler> getMediaChainableHandler(int id) {
std::lock_guard lock(mutex);
if (auto it = rtcpChainableHandlerMap.find(id); it != rtcpChainableHandlerMap.end()) {
return it->second;
} else {
throw std::invalid_argument("RtcpChainableHandler ID does not exist");
}
}
void emplaceMediaChainableHandler(shared_ptr<MediaChainableHandler> ptr, int tr) {
std::lock_guard lock(mutex);
rtcpChainableHandlerMap.emplace(std::make_pair(tr, ptr));
}
shared_ptr<RtpPacketizationConfig> getRTPConfig(int id) {
std::lock_guard lock(mutex);
if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end())
if (auto it = rtpConfigMap.find(id); it != rtpConfigMap.end()) {
return it->second;
else
} else {
throw std::invalid_argument("RTPConfiguration ID does not exist");
}
}
void emplaceRTPConfig(shared_ptr<RtpPacketizationConfig> ptr, int tr) {
@ -254,12 +272,6 @@ template <typename F> int wrap(F func) {
}
}
#define WRAP(statement) \
wrap([&]() { \
statement; \
return RTC_ERR_SUCCESS; \
})
int copyAndReturn(string s, char *buffer, int size) {
if (!buffer)
return int(s.size() + 1);
@ -344,7 +356,7 @@ void rtcSetUserPointer(int i, void *ptr) { setUserPointer(i, ptr); }
void *rtcGetUserPointer(int i) { return getUserPointer(i).value_or(nullptr); }
int rtcCreatePeerConnection(const rtcConfiguration *config) {
return WRAP({
return wrap([config] {
Configuration c;
for (int i = 0; i < config->iceServersCount; ++i)
c.iceServers.emplace_back(string(config->iceServers[i]));
@ -360,7 +372,7 @@ int rtcCreatePeerConnection(const rtcConfiguration *config) {
}
int rtcDeletePeerConnection(int pc) {
return WRAP({
return wrap([pc] {
auto peerConnection = getPeerConnection(pc);
peerConnection->onDataChannel(nullptr);
peerConnection->onTrack(nullptr);
@ -370,13 +382,14 @@ int rtcDeletePeerConnection(int pc) {
peerConnection->onGatheringStateChange(nullptr);
erasePeerConnection(pc);
return RTC_ERR_SUCCESS;
});
}
int rtcAddDataChannel(int pc, const char *label) { return rtcAddDataChannelEx(pc, label, nullptr); }
int rtcAddDataChannelEx(int pc, const char *label, const rtcDataChannelInit *init) {
return WRAP({
return wrap([&] {
DataChannelInit dci = {};
if (init) {
auto *reliability = &init->reliability;
@ -420,7 +433,7 @@ int rtcCreateDataChannelEx(int pc, const char *label, const rtcDataChannelInit *
}
int rtcDeleteDataChannel(int dc) {
return WRAP({
return wrap([dc] {
auto dataChannel = getDataChannel(dc);
dataChannel->onOpen(nullptr);
dataChannel->onClosed(nullptr);
@ -430,12 +443,13 @@ int rtcDeleteDataChannel(int dc) {
dataChannel->onAvailable(nullptr);
eraseDataChannel(dc);
return RTC_ERR_SUCCESS;
});
}
#if RTC_ENABLE_MEDIA
void setSSRC(Description::Media *description, uint32_t ssrc, const char *_name, const char *_msid) {
void setSSRC(Description::Media *description, uint32_t ssrc, const char *_name, const char *_msid, const char *_trackID) {
optional<string> name = nullopt;
if (_name) {
@ -447,12 +461,17 @@ void setSSRC(Description::Media *description, uint32_t ssrc, const char *_name,
msid = string(_msid);
}
description->addSSRC(ssrc, name, msid);
optional<string> trackID = nullopt;
if (_trackID) {
trackID = string(_trackID);
}
description->addSSRC(ssrc, name, msid, trackID);
}
int rtcAddTrackEx(int pc, rtcCodec codec, int payloadType, uint32_t ssrc, const char *_mid,
rtcDirection _direction, const char *_name, const char *_msid) {
return WRAP({
rtcDirection _direction, const char *_name, const char *_msid, const char *_trackID) {
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
auto direction = rtcDirectionToDirection(_direction);
@ -516,7 +535,7 @@ int rtcAddTrackEx(int pc, rtcCodec codec, int payloadType, uint32_t ssrc, const
throw std::invalid_argument("Unexpected codec");
} else {
auto description = optDescription.value();
setSSRC(&description, ssrc, _name, _msid);
setSSRC(&description, ssrc, _name, _msid, _trackID);
int tr = emplaceTrack(peerConnection->addTrack(std::move(description)));
if (auto ptr = getUserPointer(pc)) {
@ -530,114 +549,144 @@ int rtcAddTrackEx(int pc, rtcCodec codec, int payloadType, uint32_t ssrc, const
int rtcSetH264PacketizationHandler(int tr, uint32_t ssrc, const char *cname, uint8_t payloadType,
uint32_t clockRate, uint16_t maxFragmentSize, uint16_t sequenceNumber,
uint32_t timestamp) {
return WRAP({
return wrap([&] {
auto track = getTrack(tr);
// create RTP configuration
auto rtpConfig = getNewRtpPacketizationConfig(ssrc, cname, payloadType, clockRate,
sequenceNumber, timestamp);
// create packetizer
auto packetizer = shared_ptr<H264RtpPacketizer>(new H264RtpPacketizer(rtpConfig));
// create H264 and RTCP SP handler
shared_ptr<H264PacketizationHandler> h264Handler(
new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer, maxFragmentSize));
emplaceRTCPSender(h264Handler, tr);
auto packetizer = std::make_shared<H264RtpPacketizer>(rtpConfig, maxFragmentSize);
// create H264 handler
auto h264Handler = std::make_shared<H264PacketizationHandler>(packetizer);
emplaceMediaChainableHandler(h264Handler, tr);
emplaceRTPConfig(rtpConfig, tr);
// set handler
track->setRtcpHandler(h264Handler);
return RTC_ERR_SUCCESS;
});
}
int rtcSetOpusPacketizationHandler(int tr, uint32_t ssrc, const char *cname, uint8_t payloadType,
uint32_t clockRate, uint16_t sequenceNumber,
uint32_t timestamp) {
return WRAP({
return wrap([&] {
auto track = getTrack(tr);
// create RTP configuration
auto rtpConfig = getNewRtpPacketizationConfig(ssrc, cname, payloadType, clockRate,
sequenceNumber, timestamp);
// create packetizer
auto packetizer = shared_ptr<OpusRtpPacketizer>(new OpusRtpPacketizer(rtpConfig));
// create Opus and RTCP SP handler
shared_ptr<OpusPacketizationHandler> opusHandler(new OpusPacketizationHandler(packetizer));
emplaceRTCPSender(opusHandler, tr);
auto packetizer = std::make_shared<OpusRtpPacketizer>(rtpConfig);
// create Opus handler
auto opusHandler = std::make_shared<OpusPacketizationHandler>(packetizer);
emplaceMediaChainableHandler(opusHandler, tr);
emplaceRTPConfig(rtpConfig, tr);
// set handler
track->setRtcpHandler(opusHandler);
return RTC_ERR_SUCCESS;
});
}
int rtcChainRtcpSrReporter(int tr) {
return wrap([tr] {
auto config = getRTPConfig(tr);
auto reporter = std::make_shared<RtcpSrReporter>(config);
emplaceRtcpSrReporter(reporter, tr);
auto chainableHandler = getMediaChainableHandler(tr);
chainableHandler->addToChain(reporter);
return RTC_ERR_SUCCESS;
});
}
int rtcChainRtcpNackResponder(int tr, unsigned maxStoredPacketsCount) {
return wrap([tr, maxStoredPacketsCount] {
auto responder = std::make_shared<RtcpNackResponder>(maxStoredPacketsCount);
auto chainableHandler = getMediaChainableHandler(tr);
chainableHandler->addToChain(responder);
return RTC_ERR_SUCCESS;
});
}
int rtcSetRtpConfigurationStartTime(int id, double startTime_s, bool timeIntervalSince1970,
uint32_t timestamp) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
auto epoch = RtpPacketizationConfig::EpochStart::T1900;
if (timeIntervalSince1970) {
epoch = RtpPacketizationConfig::EpochStart::T1970;
}
config->setStartTime(startTime_s, epoch, timestamp);
return RTC_ERR_SUCCESS;
});
}
int rtcStartRtcpSenderReporterRecording(int id) {
return WRAP({
auto sender = getRTCPSender(id);
return wrap([id] {
auto sender = getRtcpSrReporter(id);
sender->startRecording();
return RTC_ERR_SUCCESS;
});
}
int rtcTransformSecondsToTimestamp(int id, double seconds, uint32_t *timestamp) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
*timestamp = config->secondsToTimestamp(seconds);
return RTC_ERR_SUCCESS;
});
}
int rtcTransformTimestampToSeconds(int id, uint32_t timestamp, double *seconds) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
*seconds = config->timestampToSeconds(timestamp);
return RTC_ERR_SUCCESS;
});
}
int rtcGetCurrentTrackTimestamp(int id, uint32_t *timestamp) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
*timestamp = config->timestamp;
return RTC_ERR_SUCCESS;
});
}
int rtcGetTrackStartTimestamp(int id, uint32_t *timestamp) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
*timestamp = config->startTimestamp;
return RTC_ERR_SUCCESS;
});
}
int rtcSetTrackRTPTimestamp(int id, uint32_t timestamp) {
return WRAP({
return wrap([&] {
auto config = getRTPConfig(id);
config->timestamp = timestamp;
return RTC_ERR_SUCCESS;
});
}
int rtcGetPreviousTrackSenderReportTimestamp(int id, uint32_t *timestamp) {
return WRAP({
auto sender = getRTCPSender(id);
return wrap([&] {
auto sender = getRtcpSrReporter(id);
*timestamp = sender->previousReportedTimestamp;
return RTC_ERR_SUCCESS;
});
}
int rtcSetNeedsToSendRTCPSR(int id) {
return WRAP({
auto sender = getRTCPSender(id);
int rtcSetNeedsToSendRtcpSr(int id) {
return wrap([id]{
auto sender = getRtcpSrReporter(id);
sender->setNeedsToReport();
return RTC_ERR_SUCCESS;
});
}
#endif // RTC_ENABLE_MEDIA
int rtcAddTrack(int pc, const char *mediaDescriptionSdp) {
return WRAP({
return wrap([&] {
if (!mediaDescriptionSdp)
throw std::invalid_argument("Unexpected null pointer for track media description");
@ -652,7 +701,7 @@ int rtcAddTrack(int pc, const char *mediaDescriptionSdp) {
}
int rtcDeleteTrack(int tr) {
return WRAP({
return wrap([&] {
auto track = getTrack(tr);
track->onOpen(nullptr);
track->onClosed(nullptr);
@ -662,11 +711,12 @@ int rtcDeleteTrack(int tr) {
track->onAvailable(nullptr);
eraseTrack(tr);
return RTC_ERR_SUCCESS;
});
}
int rtcGetTrackDescription(int tr, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto track = getTrack(tr);
return copyAndReturn(track->description(), buffer, size);
});
@ -674,7 +724,7 @@ int rtcGetTrackDescription(int tr, char *buffer, int size) {
#if RTC_ENABLE_WEBSOCKET
int rtcCreateWebSocket(const char *url) {
return WRAP({
return wrap([&] {
auto ws = std::make_shared<WebSocket>();
ws->open(url);
return emplaceWebSocket(ws);
@ -682,7 +732,7 @@ int rtcCreateWebSocket(const char *url) {
}
int rtcCreateWebSocketEx(const char *url, const rtcWsConfiguration *config) {
return WRAP({
return wrap([&] {
WebSocket::Configuration c;
c.disableTlsVerification = config->disableTlsVerification;
auto ws = std::make_shared<WebSocket>(c);
@ -692,7 +742,7 @@ int rtcCreateWebSocketEx(const char *url, const rtcWsConfiguration *config) {
}
int rtcDeleteWebsocket(int ws) {
return WRAP({
return wrap([&] {
auto webSocket = getWebSocket(ws);
webSocket->onOpen(nullptr);
webSocket->onClosed(nullptr);
@ -702,12 +752,13 @@ int rtcDeleteWebsocket(int ws) {
webSocket->onAvailable(nullptr);
eraseWebSocket(ws);
return RTC_ERR_SUCCESS;
});
}
#endif
int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onLocalDescription([pc, cb](Description desc) {
@ -716,11 +767,12 @@ int rtcSetLocalDescriptionCallback(int pc, rtcDescriptionCallbackFunc cb) {
});
else
peerConnection->onLocalDescription(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onLocalCandidate([pc, cb](Candidate cand) {
@ -729,11 +781,12 @@ int rtcSetLocalCandidateCallback(int pc, rtcCandidateCallbackFunc cb) {
});
else
peerConnection->onLocalCandidate(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetStateChangeCallback(int pc, rtcStateChangeCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onStateChange([pc, cb](PeerConnection::State state) {
@ -742,11 +795,12 @@ int rtcSetStateChangeCallback(int pc, rtcStateChangeCallbackFunc cb) {
});
else
peerConnection->onStateChange(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetGatheringStateChangeCallback(int pc, rtcGatheringStateCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onGatheringStateChange([pc, cb](PeerConnection::GatheringState state) {
@ -755,11 +809,12 @@ int rtcSetGatheringStateChangeCallback(int pc, rtcGatheringStateCallbackFunc cb)
});
else
peerConnection->onGatheringStateChange(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetSignalingStateChangeCallback(int pc, rtcSignalingStateCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onSignalingStateChange([pc, cb](PeerConnection::SignalingState state) {
@ -768,11 +823,12 @@ int rtcSetSignalingStateChangeCallback(int pc, rtcSignalingStateCallbackFunc cb)
});
else
peerConnection->onGatheringStateChange(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetDataChannelCallback(int pc, rtcDataChannelCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onDataChannel([pc, cb](std::shared_ptr<DataChannel> dataChannel) {
@ -784,11 +840,12 @@ int rtcSetDataChannelCallback(int pc, rtcDataChannelCallbackFunc cb) {
});
else
peerConnection->onDataChannel(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetTrackCallback(int pc, rtcTrackCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (cb)
peerConnection->onTrack([pc, cb](std::shared_ptr<Track> track) {
@ -800,41 +857,45 @@ int rtcSetTrackCallback(int pc, rtcTrackCallbackFunc cb) {
});
else
peerConnection->onTrack(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetLocalDescription(int pc, const char *type) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
peerConnection->setLocalDescription(type ? Description::stringToType(type)
: Description::Type::Unspec);
return RTC_ERR_SUCCESS;
});
}
int rtcSetRemoteDescription(int pc, const char *sdp, const char *type) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (!sdp)
throw std::invalid_argument("Unexpected null pointer for remote description");
peerConnection->setRemoteDescription({string(sdp), type ? string(type) : ""});
return RTC_ERR_SUCCESS;
});
}
int rtcAddRemoteCandidate(int pc, const char *cand, const char *mid) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (!cand)
throw std::invalid_argument("Unexpected null pointer for remote candidate");
peerConnection->addRemoteCandidate({string(cand), mid ? string(mid) : ""});
return RTC_ERR_SUCCESS;
});
}
int rtcGetLocalDescription(int pc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto desc = peerConnection->localDescription())
@ -845,7 +906,7 @@ int rtcGetLocalDescription(int pc, char *buffer, int size) {
}
int rtcGetRemoteDescription(int pc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto desc = peerConnection->remoteDescription())
@ -855,8 +916,30 @@ int rtcGetRemoteDescription(int pc, char *buffer, int size) {
});
}
int rtcGetLocalDescriptionType(int pc, char *buffer, int size) {
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto desc = peerConnection->localDescription())
return copyAndReturn(desc->typeString(), buffer, size);
else
return RTC_ERR_NOT_AVAIL;
});
}
int rtcGetRemoteDescriptionType(int pc, char *buffer, int size) {
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto desc = peerConnection->remoteDescription())
return copyAndReturn(desc->typeString(), buffer, size);
else
return RTC_ERR_NOT_AVAIL;
});
}
int rtcGetLocalAddress(int pc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto addr = peerConnection->localAddress())
@ -867,7 +950,7 @@ int rtcGetLocalAddress(int pc, char *buffer, int size) {
}
int rtcGetRemoteAddress(int pc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
if (auto addr = peerConnection->remoteAddress())
@ -878,7 +961,7 @@ int rtcGetRemoteAddress(int pc, char *buffer, int size) {
}
int rtcGetSelectedCandidatePair(int pc, char *local, int localSize, char *remote, int remoteSize) {
return WRAP({
return wrap([&] {
auto peerConnection = getPeerConnection(pc);
Candidate localCand;
@ -899,28 +982,28 @@ int rtcGetSelectedCandidatePair(int pc, char *local, int localSize, char *remote
}
int rtcGetDataChannelStream(int dc) {
return WRAP({
return wrap([dc] {
auto dataChannel = getDataChannel(dc);
return int(dataChannel->id());
});
}
int rtcGetDataChannelLabel(int dc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto dataChannel = getDataChannel(dc);
return copyAndReturn(dataChannel->label(), buffer, size);
});
}
int rtcGetDataChannelProtocol(int dc, char *buffer, int size) {
return WRAP({
return wrap([&] {
auto dataChannel = getDataChannel(dc);
return copyAndReturn(dataChannel->protocol(), buffer, size);
});
}
int rtcGetDataChannelReliability(int dc, rtcReliability *reliability) {
return WRAP({
return wrap([&] {
auto dataChannel = getDataChannel(dc);
if (!reliability)
@ -943,7 +1026,7 @@ int rtcGetDataChannelReliability(int dc, rtcReliability *reliability) {
}
int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onOpen([id, cb]() {
@ -952,11 +1035,12 @@ int rtcSetOpenCallback(int id, rtcOpenCallbackFunc cb) {
});
else
channel->onOpen(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onClosed([id, cb]() {
@ -965,11 +1049,12 @@ int rtcSetClosedCallback(int id, rtcClosedCallbackFunc cb) {
});
else
channel->onClosed(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onError([id, cb](string error) {
@ -978,11 +1063,12 @@ int rtcSetErrorCallback(int id, rtcErrorCallbackFunc cb) {
});
else
channel->onError(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onMessage(
@ -996,11 +1082,12 @@ int rtcSetMessageCallback(int id, rtcMessageCallbackFunc cb) {
});
else
channel->onMessage(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcSendMessage(int id, const char *data, int size) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (!data && size != 0)
@ -1020,21 +1107,22 @@ int rtcSendMessage(int id, const char *data, int size) {
}
int rtcGetBufferedAmount(int id) {
return WRAP({
return wrap([id] {
auto channel = getChannel(id);
return int(channel->bufferedAmount());
});
}
int rtcSetBufferedAmountLowThreshold(int id, int amount) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
channel->setBufferedAmountLowThreshold(size_t(amount));
return RTC_ERR_SUCCESS;
});
}
int rtcSetBufferedAmountLowCallback(int id, rtcBufferedAmountLowCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onBufferedAmountLow([id, cb]() {
@ -1043,15 +1131,16 @@ int rtcSetBufferedAmountLowCallback(int id, rtcBufferedAmountLowCallbackFunc cb)
});
else
channel->onBufferedAmountLow(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcGetAvailableAmount(int id) {
return WRAP({ return int(getChannel(id)->availableAmount()); });
return wrap([id] { return int(getChannel(id)->availableAmount()); });
}
int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (cb)
channel->onAvailable([id, cb]() {
@ -1060,11 +1149,12 @@ int rtcSetAvailableCallback(int id, rtcAvailableCallbackFunc cb) {
});
else
channel->onAvailable(nullptr);
return RTC_ERR_SUCCESS;
});
}
int rtcReceiveMessage(int id, char *buffer, int *size) {
return WRAP({
return wrap([&] {
auto channel = getChannel(id);
if (!size)

View File

@ -87,21 +87,34 @@ DataChannel::~DataChannel() { close(); }
uint16_t DataChannel::stream() const { return mStream; }
uint16_t DataChannel::id() const { return uint16_t(mStream); }
uint16_t DataChannel::id() const { return mStream; }
string DataChannel::label() const { return mLabel; }
string DataChannel::label() const {
std::shared_lock lock(mMutex);
return mLabel;
}
string DataChannel::protocol() const { return mProtocol; }
string DataChannel::protocol() const {
std::shared_lock lock(mMutex);
return mProtocol;
}
Reliability DataChannel::reliability() const { return *mReliability; }
Reliability DataChannel::reliability() const {
std::shared_lock lock(mMutex);
return *mReliability;
}
void DataChannel::close() {
std::shared_ptr<SctpTransport> transport;
{
std::shared_lock lock(mMutex);
transport = mSctpTransport.lock();
}
mIsClosed = true;
if (mIsOpen.exchange(false))
if (auto transport = mSctpTransport.lock())
if (mIsOpen.exchange(false) && transport)
transport->closeStream(mStream);
mSctpTransport.reset();
resetCallbacks();
}
@ -110,7 +123,6 @@ void DataChannel::remoteClose() {
triggerClosed();
mIsOpen = false;
mSctpTransport.reset();
}
bool DataChannel::send(message_variant data) { return outgoing(make_message(std::move(data))); }
@ -167,7 +179,10 @@ size_t DataChannel::maxMessageSize() const {
size_t DataChannel::availableAmount() const { return mRecvQueue.amount(); }
void DataChannel::open(shared_ptr<SctpTransport> transport) {
{
std::unique_lock lock(mMutex);
mSctpTransport = transport;
}
if (!mIsOpen.exchange(true))
triggerOpen();
@ -179,19 +194,22 @@ void DataChannel::processOpenMessage(message_ptr) {
}
bool DataChannel::outgoing(message_ptr message) {
if (mIsClosed)
std::shared_ptr<SctpTransport> transport;
{
std::shared_lock lock(mMutex);
transport = mSctpTransport.lock();
if (!transport || mIsClosed)
throw std::runtime_error("DataChannel is closed");
if (message->size() > maxMessageSize())
throw std::runtime_error("Message size exceeds limit");
auto transport = mSctpTransport.lock();
if (!transport)
throw std::runtime_error("DataChannel transport is not open");
// Before the ACK has been received on a DataChannel, all messages must be sent ordered
message->reliability = mIsOpen ? mReliability : nullptr;
message->stream = mStream;
}
return transport->send(message);
}
@ -235,20 +253,21 @@ void DataChannel::incoming(message_ptr message) {
}
}
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream,
NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc, uint16_t stream,
string label, string protocol, Reliability reliability)
: DataChannel(pc, stream, std::move(label), std::move(protocol), std::move(reliability)) {}
NegociatedDataChannel::NegociatedDataChannel(std::weak_ptr<PeerConnection> pc,
NegotiatedDataChannel::NegotiatedDataChannel(std::weak_ptr<PeerConnection> pc,
std::weak_ptr<SctpTransport> transport,
uint16_t stream)
: DataChannel(pc, stream, "", "", {}) {
mSctpTransport = transport;
}
NegociatedDataChannel::~NegociatedDataChannel() {}
NegotiatedDataChannel::~NegotiatedDataChannel() {}
void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) {
void NegotiatedDataChannel::open(shared_ptr<SctpTransport> transport) {
std::unique_lock lock(mMutex);
mSctpTransport = transport;
uint8_t channelType;
@ -287,10 +306,13 @@ void NegociatedDataChannel::open(shared_ptr<SctpTransport> transport) {
std::copy(mLabel.begin(), mLabel.end(), end);
std::copy(mProtocol.begin(), mProtocol.end(), end + mLabel.size());
lock.unlock();
transport->send(make_message(buffer.begin(), buffer.end(), Message::Control, mStream));
}
void NegociatedDataChannel::processOpenMessage(message_ptr message) {
void NegotiatedDataChannel::processOpenMessage(message_ptr message) {
std::unique_lock lock(mMutex);
auto transport = mSctpTransport.lock();
if (!transport)
throw std::runtime_error("DataChannel has no transport");
@ -326,6 +348,8 @@ void NegociatedDataChannel::processOpenMessage(message_ptr message) {
mReliability->rexmit = int(0);
}
lock.unlock();
binary buffer(sizeof(AckMessage), byte(0));
auto &ack = *reinterpret_cast<AckMessage *>(buffer.data());
ack.type = MESSAGE_ACK;

View File

@ -525,20 +525,20 @@ Description::Entry::removeAttribute(std::vector<string>::iterator it) {
}
void Description::Media::addSSRC(uint32_t ssrc, std::optional<string> name,
std::optional<string> msid) {
std::optional<string> msid, std::optional<string> trackID) {
if (name)
mAttributes.emplace_back("ssrc:" + std::to_string(ssrc) + " cname:" + *name);
else
mAttributes.emplace_back("ssrc:" + std::to_string(ssrc));
if (msid)
mAttributes.emplace_back("ssrc:" + std::to_string(ssrc) + " msid:" + *msid + " " + *msid);
mAttributes.emplace_back("ssrc:" + std::to_string(ssrc) + " msid:" + *msid + " " + trackID.value_or(*msid));
mSsrcs.emplace_back(ssrc);
}
void Description::Media::replaceSSRC(uint32_t oldSSRC, uint32_t ssrc, std::optional<string> name,
std::optional<string> msid) {
std::optional<string> msid, std::optional<string> trackID) {
auto it = mAttributes.begin();
while (it != mAttributes.end()) {
if (it->find("ssrc:" + std::to_string(oldSSRC)) == 0) {
@ -546,7 +546,7 @@ void Description::Media::replaceSSRC(uint32_t oldSSRC, uint32_t ssrc, std::optio
} else
it++;
}
addSSRC(ssrc, std::move(name), std::move(msid));
addSSRC(ssrc, std::move(name), std::move(msid), std::move(trackID));
}
void Description::Media::removeSSRC(uint32_t oldSSRC) {

View File

@ -58,10 +58,11 @@ void DtlsSrtpTransport::Cleanup() { srtp_shutdown(); }
DtlsSrtpTransport::DtlsSrtpTransport(std::shared_ptr<IceTransport> lower,
shared_ptr<Certificate> certificate,
std::optional<size_t> mtu,
verifier_callback verifierCallback,
message_callback srtpRecvCallback,
state_callback stateChangeCallback)
: DtlsTransport(lower, certificate, std::move(verifierCallback),
: DtlsTransport(lower, certificate, mtu, std::move(verifierCallback),
std::move(stateChangeCallback)),
mSrtpRecvCallback(std::move(srtpRecvCallback)) { // distinct from Transport recv callback

View File

@ -39,9 +39,9 @@ public:
static void Init();
static void Cleanup();
DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, std::shared_ptr<Certificate> certificate,
verifier_callback verifierCallback, message_callback srtpRecvCallback,
state_callback stateChangeCallback);
DtlsSrtpTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
std::optional<size_t> mtu, verifier_callback verifierCallback,
message_callback srtpRecvCallback, state_callback stateChangeCallback);
~DtlsSrtpTransport();
bool sendMedia(message_ptr message);

View File

@ -50,8 +50,9 @@ void DtlsTransport::Init() {
void DtlsTransport::Cleanup() { gnutls_global_deinit(); }
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
std::optional<size_t> mtu, verifier_callback verifierCallback,
state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
@ -156,11 +157,15 @@ void DtlsTransport::postHandshake() {
}
void DtlsTransport::runRecvLoop() {
const size_t maxMtu = 4096;
const size_t bufferSize = 4096;
// Handshake loop
try {
changeState(State::Connecting);
gnutls_dtls_set_mtu(mSession, 1280 - 40 - 8); // min MTU over UDP/IPv6
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
gnutls_dtls_set_mtu(mSession, static_cast<unsigned int>(mtu));
PLOG_VERBOSE << "SSL MTU set to " << mtu;
int ret;
do {
@ -174,7 +179,7 @@ void DtlsTransport::runRecvLoop() {
// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
// See https://tools.ietf.org/html/rfc8261#section-5
gnutls_dtls_set_mtu(mSession, maxMtu + 1);
gnutls_dtls_set_mtu(mSession, bufferSize + 1);
} catch (const std::exception &e) {
PLOG_ERROR << "DTLS handshake: " << e.what();
@ -188,7 +193,6 @@ void DtlsTransport::runRecvLoop() {
postHandshake();
changeState(State::Connected);
const size_t bufferSize = maxMtu;
char buffer[bufferSize];
while (true) {
@ -314,8 +318,9 @@ void DtlsTransport::Cleanup() {
}
DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, shared_ptr<Certificate> certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mCertificate(certificate),
std::optional<size_t> mtu, verifier_callback verifierCallback,
state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mMtu(mtu), mCertificate(certificate),
mVerifierCallback(std::move(verifierCallback)),
mIsClient(lower->role() == Description::Role::Active), mCurrentDscp(0) {
PLOG_DEBUG << "Initializing DTLS transport (OpenSSL)";
@ -440,16 +445,18 @@ void DtlsTransport::postHandshake() {
}
void DtlsTransport::runRecvLoop() {
const size_t maxMtu = 4096;
const size_t bufferSize = 4096;
try {
changeState(State::Connecting);
SSL_set_mtu(mSsl, 1280 - 40 - 8); // min MTU over UDP/IPv6
size_t mtu = mMtu.value_or(DEFAULT_IPV4_MTU + 20) - 8 - 40; // UDP/IPv6
SSL_set_mtu(mSsl, static_cast<unsigned int>(mtu));
PLOG_VERBOSE << "SSL MTU set to " << mtu;
// Initiate the handshake
int ret = SSL_do_handshake(mSsl);
openssl::check(mSsl, ret, "Handshake failed");
const size_t bufferSize = maxMtu;
byte buffer[bufferSize];
while (mIncomingQueue.running()) {
// Process pending messages
@ -466,7 +473,7 @@ void DtlsTransport::runRecvLoop() {
if (SSL_is_init_finished(mSsl)) {
// RFC 8261: DTLS MUST support sending messages larger than the current path
// MTU See https://tools.ietf.org/html/rfc8261#section-5
SSL_set_mtu(mSsl, maxMtu + 1);
SSL_set_mtu(mSsl, bufferSize + 1);
PLOG_INFO << "DTLS handshake finished";
postHandshake();

View File

@ -44,7 +44,8 @@ public:
using verifier_callback = std::function<bool(const std::string &fingerprint)>;
DtlsTransport(std::shared_ptr<IceTransport> lower, certificate_ptr certificate,
verifier_callback verifierCallback, state_callback stateChangeCallback);
std::optional<size_t> mtu, verifier_callback verifierCallback,
state_callback stateChangeCallback);
~DtlsTransport();
virtual void start() override;
@ -57,6 +58,7 @@ protected:
virtual void postHandshake();
void runRecvLoop();
const std::optional<size_t> mMtu;
const certificate_ptr mCertificate;
const verifier_callback mVerifierCallback;
const bool mIsClient;

View File

@ -22,146 +22,7 @@
namespace rtc {
using std::function;
using std::make_shared;
using std::shared_ptr;
typedef enum {
NUSM_noMatch,
NUSM_firstZero,
NUSM_secondZero,
NUSM_thirdZero,
NUSM_shortMatch,
NUSM_longMatch
} NalUnitStartSequenceMatch;
NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
H264PacketizationHandler::Separator separator) {
assert(separator != H264PacketizationHandler::Separator::Length);
auto byte = (uint8_t)_byte;
auto detectShort = separator == H264PacketizationHandler::Separator::ShortStartSequence ||
separator == H264PacketizationHandler::Separator::StartSequence;
auto detectLong = separator == H264PacketizationHandler::Separator::LongStartSequence ||
separator == H264PacketizationHandler::Separator::StartSequence;
switch (match) {
case NUSM_noMatch:
if (byte == 0x00) {
return NUSM_firstZero;
}
break;
case NUSM_firstZero:
if (byte == 0x00) {
return NUSM_secondZero;
}
break;
case NUSM_secondZero:
if (byte == 0x00 && detectLong) {
return NUSM_thirdZero;
} else if (byte == 0x01 && detectShort) {
return NUSM_shortMatch;
}
break;
case NUSM_thirdZero:
if (byte == 0x01 && detectLong) {
return NUSM_longMatch;
}
break;
case NUSM_shortMatch:
return NUSM_shortMatch;
case NUSM_longMatch:
return NUSM_longMatch;
}
return NUSM_noMatch;
}
message_ptr H264PacketizationHandler::incoming(message_ptr ptr) { return ptr; }
shared_ptr<NalUnits> H264PacketizationHandler::splitMessage(message_ptr message) {
auto nalus = make_shared<NalUnits>();
if (separator == Separator::Length) {
unsigned long long index = 0;
while (index < message->size()) {
assert(index + 4 < message->size());
if (index + 4 >= message->size()) {
LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
break;
}
auto lengthPtr = (uint32_t *)(message->data() + index);
uint32_t length = ntohl(*lengthPtr);
auto naluStartIndex = index + 4;
auto naluEndIndex = naluStartIndex + length;
assert(naluEndIndex <= message->size());
if (naluEndIndex > message->size()) {
LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
break;
}
nalus->push_back(
NalUnit(message->begin() + naluStartIndex, message->begin() + naluEndIndex));
index = naluEndIndex;
}
} else {
NalUnitStartSequenceMatch match = NUSM_noMatch;
unsigned long long index = 0;
while (index < message->size()) {
match = StartSequenceMatchSucc(match, (*message)[index++], separator);
if (match == NUSM_longMatch || match == NUSM_shortMatch) {
match = NUSM_noMatch;
break;
}
}
index++;
unsigned long long naluStartIndex = index;
while (index < message->size()) {
match = StartSequenceMatchSucc(match, (*message)[index], separator);
if (match == NUSM_longMatch || match == NUSM_shortMatch) {
auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
unsigned long long naluEndIndex = index - sequenceLength;
match = NUSM_noMatch;
nalus->push_back(NalUnit(message->begin() + naluStartIndex,
message->begin() + naluEndIndex + 1));
naluStartIndex = index + 1;
}
index++;
}
nalus->push_back(NalUnit(message->begin() + naluStartIndex, message->end()));
}
return nalus;
}
message_ptr H264PacketizationHandler::outgoing(message_ptr ptr) {
if (ptr->type == Message::Binary) {
auto nalus = splitMessage(ptr);
auto fragments = nalus->generateFragments(maximumFragmentSize);
auto lastPacket = withStatsRecording<message_ptr>(
[fragments, this](function<void(message_ptr)> addToReport) {
for (unsigned long long index = 0; index < fragments.size() - 1; index++) {
auto packet = packetizer->packetize(fragments[index], false);
addToReport(packet);
outgoingCallback(std::move(packet));
}
// packet is last, marker must be set
auto lastPacket = packetizer->packetize(fragments[fragments.size() - 1], true);
addToReport(lastPacket);
return lastPacket;
});
return lastPacket;
}
return ptr;
}
H264PacketizationHandler::H264PacketizationHandler(Separator separator,
std::shared_ptr<H264RtpPacketizer> packetizer,
uint16_t maximumFragmentSize)
: RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer),
maximumFragmentSize(maximumFragmentSize), separator(separator) {
senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
}
H264PacketizationHandler::H264PacketizationHandler(std::shared_ptr<H264RtpPacketizer> packetizer): MediaChainableHandler(packetizer) { }
} // namespace rtc

View File

@ -22,8 +22,139 @@
namespace rtc {
H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
: RtpPacketizer(rtpConfig) {}
using std::make_shared;
using std::shared_ptr;
typedef enum {
NUSM_noMatch,
NUSM_firstZero,
NUSM_secondZero,
NUSM_thirdZero,
NUSM_shortMatch,
NUSM_longMatch
} NalUnitStartSequenceMatch;
NalUnitStartSequenceMatch StartSequenceMatchSucc(NalUnitStartSequenceMatch match, byte _byte,
H264RtpPacketizer::Separator separator) {
assert(separator != H264RtpPacketizer::Separator::Length);
auto byte = (uint8_t)_byte;
auto detectShort = separator == H264RtpPacketizer::Separator::ShortStartSequence ||
separator == H264RtpPacketizer::Separator::StartSequence;
auto detectLong = separator == H264RtpPacketizer::Separator::LongStartSequence ||
separator == H264RtpPacketizer::Separator::StartSequence;
switch (match) {
case NUSM_noMatch:
if (byte == 0x00) {
return NUSM_firstZero;
}
break;
case NUSM_firstZero:
if (byte == 0x00) {
return NUSM_secondZero;
}
break;
case NUSM_secondZero:
if (byte == 0x00 && detectLong) {
return NUSM_thirdZero;
} else if (byte == 0x01 && detectShort) {
return NUSM_shortMatch;
}
break;
case NUSM_thirdZero:
if (byte == 0x01 && detectLong) {
return NUSM_longMatch;
}
break;
case NUSM_shortMatch:
return NUSM_shortMatch;
case NUSM_longMatch:
return NUSM_longMatch;
}
return NUSM_noMatch;
}
shared_ptr<NalUnits> H264RtpPacketizer::splitMessage(binary_ptr message) {
auto nalus = make_shared<NalUnits>();
if (separator == Separator::Length) {
unsigned long long index = 0;
while (index < message->size()) {
assert(index + 4 < message->size());
if (index + 4 >= message->size()) {
LOG_WARNING << "Invalid NAL Unit data (incomplete length), ignoring!";
break;
}
auto lengthPtr = (uint32_t *)(message->data() + index);
uint32_t length = ntohl(*lengthPtr);
auto naluStartIndex = index + 4;
auto naluEndIndex = naluStartIndex + length;
assert(naluEndIndex <= message->size());
if (naluEndIndex > message->size()) {
LOG_WARNING << "Invalid NAL Unit data (incomplete unit), ignoring!";
break;
}
auto begin = message->begin() + naluStartIndex;
auto end = message->begin() + naluEndIndex;
nalus->push_back(std::make_shared<NalUnit>(begin, end));
index = naluEndIndex;
}
} else {
NalUnitStartSequenceMatch match = NUSM_noMatch;
unsigned long long index = 0;
while (index < message->size()) {
match = StartSequenceMatchSucc(match, (*message)[index++], separator);
if (match == NUSM_longMatch || match == NUSM_shortMatch) {
match = NUSM_noMatch;
break;
}
}
unsigned long long naluStartIndex = index;
while (index < message->size()) {
match = StartSequenceMatchSucc(match, (*message)[index], separator);
if (match == NUSM_longMatch || match == NUSM_shortMatch) {
auto sequenceLength = match == NUSM_longMatch ? 4 : 3;
unsigned long long naluEndIndex = index - sequenceLength;
match = NUSM_noMatch;
auto begin = message->begin() + naluStartIndex;
auto end = message->begin() + naluEndIndex + 1;
nalus->push_back(std::make_shared<NalUnit>(begin, end));
naluStartIndex = index + 1;
}
index++;
}
auto begin = message->begin() + naluStartIndex;
auto end = message->end();
nalus->push_back(std::make_shared<NalUnit>(begin, end));
}
return nalus;
}
H264RtpPacketizer::H264RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig,
uint16_t maximumFragmentSize)
: RtpPacketizer(rtpConfig), MediaHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(Separator::Length) {}
H264RtpPacketizer::H264RtpPacketizer(H264RtpPacketizer::Separator separator, std::shared_ptr<RtpPacketizationConfig> rtpConfig,
uint16_t maximumFragmentSize)
: RtpPacketizer(rtpConfig), MediaHandlerRootElement(), maximumFragmentSize(maximumFragmentSize), separator(separator) {}
ChainedOutgoingProduct H264RtpPacketizer::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
ChainedMessagesProduct packets = std::make_shared<std::vector<binary_ptr>>();
for (auto message: *messages) {
auto nalus = splitMessage(message);
auto fragments = nalus->generateFragments(maximumFragmentSize);
if (fragments.size() == 0) {
return ChainedOutgoingProduct();
}
unsigned i = 0;
for (; i < fragments.size() - 1; i++) {
packets->push_back(packetize(fragments[i], false));
}
packets->push_back(packetize(fragments[i], true));
}
return {packets, control};
}
} // namespace rtc

View File

@ -0,0 +1,167 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#if RTC_ENABLE_MEDIA
#include "mediachainablehandler.hpp"
namespace rtc {
MediaChainableHandler::MediaChainableHandler(std::shared_ptr<MediaHandlerRootElement> root): MediaHandler(), root(root), leaf(root) { }
MediaChainableHandler::~MediaChainableHandler() {
leaf->recursiveRemoveChain();
}
bool MediaChainableHandler::sendProduct(ChainedOutgoingProduct product) {
bool result = true;
if (product.control) {
assert(product.control->type == Message::Control);
auto sendResult = send(product.control);
if(!sendResult) {
LOG_DEBUG << "Failed to send control message";
}
result = result && sendResult;
}
if (product.messages) {
auto messages = product.messages;
for (unsigned i = 0; i < messages->size(); i++) {
auto message = messages->at(i);
if (!message) {
LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << messages->size();
}
auto sendResult = send(make_message(*message));
if(!sendResult) {
LOG_DEBUG << "Failed to send message " << i + 1 << "/" << messages->size();
}
result = result && sendResult;
}
}
return result;
}
message_ptr MediaChainableHandler::handleIncomingBinary(message_ptr msg) {
assert(msg->type == Message::Binary);
auto messages = root->split(msg);
auto incoming = leaf->formIncomingBinaryMessage(messages, [this](ChainedOutgoingProduct outgoing) {
return sendProduct(outgoing);
});
if (incoming) {
return root->reduce(incoming);
} else {
return nullptr;
}
}
message_ptr MediaChainableHandler::handleIncomingControl(message_ptr msg) {
assert(msg->type == Message::Control);
auto incoming = leaf->formIncomingControlMessage(msg, [this](ChainedOutgoingProduct outgoing) {
return sendProduct(outgoing);
});
assert(!incoming || incoming->type == Message::Control);
return incoming;
}
message_ptr MediaChainableHandler::handleOutgoingBinary(message_ptr msg) {
assert(msg->type == Message::Binary);
auto messages = make_chained_messages_product(msg);
auto optOutgoing = root->formOutgoingBinaryMessage(ChainedOutgoingProduct(messages));
if (!optOutgoing.has_value()) {
LOG_ERROR << "Generating outgoing message failed";
return nullptr;
}
auto outgoing = optOutgoing.value();
if (outgoing.control) {
if(!send(outgoing.control)) {
LOG_DEBUG << "Failed to send control message";
}
}
auto lastMessage = outgoing.messages->back();
if (!lastMessage) {
LOG_DEBUG << "Invalid message to send";
return nullptr;
}
for (unsigned i = 0; i < outgoing.messages->size() - 1; i++) {
auto message = outgoing.messages->at(i);
if (!message) {
LOG_DEBUG << "Invalid message to send " << i + 1 << "/" << outgoing.messages->size();
}
if(!send(make_message(*message))) {
LOG_DEBUG << "Failed to send message " << i + 1 << "/" << outgoing.messages->size();
}
}
return make_message(*lastMessage);
}
message_ptr MediaChainableHandler::handleOutgoingControl(message_ptr msg) {
assert(msg->type == Message::Control);
auto outgoing = root->formOutgoingControlMessage(msg);
assert(!outgoing || outgoing->type == Message::Control);
if (!outgoing) {
LOG_ERROR << "Generating outgoing control message failed";
return nullptr;
}
return outgoing;
}
message_ptr MediaChainableHandler::outgoing(message_ptr ptr) {
assert(ptr);
if (!ptr) {
LOG_ERROR << "Outgoing message is nullptr, ignoring";
return nullptr;
}
std::lock_guard<std::mutex> guard(inoutMutex);
if (ptr->type == Message::Binary) {
return handleOutgoingBinary(ptr);
} else if (ptr->type == Message::Control) {
return handleOutgoingControl(ptr);
}
return ptr;
}
message_ptr MediaChainableHandler::incoming(message_ptr ptr) {
if (!ptr) {
LOG_ERROR << "Incoming message is nullptr, ignoring";
return nullptr;
}
std::lock_guard<std::mutex> guard(inoutMutex);
if (ptr->type == Message::Binary) {
return handleIncomingBinary(ptr);
} else if (ptr->type == Message::Control) {
return handleIncomingControl(ptr);
}
return ptr;
}
bool MediaChainableHandler::send(message_ptr msg) {
try {
outgoingCallback(std::move(msg));
return true;
} catch (const std::exception &e) {
LOG_DEBUG << "Send in RTCP chain handler failed: " << e.what();
}
return false;
}
void MediaChainableHandler::addToChain(std::shared_ptr<MediaHandlerElement> chainable) {
assert(leaf);
leaf = leaf->chainWith(chainable);
}
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */

201
src/mediahandlerelement.cpp Normal file
View File

@ -0,0 +1,201 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#if RTC_ENABLE_MEDIA
#include "mediahandlerelement.hpp"
namespace rtc {
ChainedMessagesProduct make_chained_messages_product() {
return std::make_shared<std::vector<binary_ptr>>();
}
ChainedMessagesProduct make_chained_messages_product(message_ptr msg) {
std::vector<binary_ptr> msgs = {msg};
return std::make_shared<std::vector<binary_ptr>>(msgs);
}
ChainedOutgoingProduct::ChainedOutgoingProduct(ChainedMessagesProduct messages, message_ptr control)
: messages(messages), control(control) { }
ChainedIncomingProduct::ChainedIncomingProduct(ChainedMessagesProduct incoming, ChainedMessagesProduct outgoing)
: incoming(incoming), outgoing(outgoing) { }
ChainedIncomingControlProduct::ChainedIncomingControlProduct(message_ptr incoming, std::optional<ChainedOutgoingProduct> outgoing)
: incoming(incoming), outgoing(outgoing) { }
MediaHandlerElement::MediaHandlerElement() { }
void MediaHandlerElement::removeFromChain() {
if (upstream) {
upstream->downstream = downstream;
}
if (downstream) {
downstream->upstream = upstream;
}
upstream = nullptr;
downstream = nullptr;
}
void MediaHandlerElement::recursiveRemoveChain() {
if (downstream) {
// `recursiveRemoveChain` removes last strong reference to downstream element
// we need to keep strong reference to prevent deallocation of downstream element
// during `recursiveRemoveChain`
auto strongDownstreamPtr = downstream;
downstream->recursiveRemoveChain();
}
removeFromChain();
}
std::optional<ChainedOutgoingProduct> MediaHandlerElement::processOutgoingResponse(ChainedOutgoingProduct messages) {
if (messages.messages) {
if (upstream) {
auto msgs = upstream->formOutgoingBinaryMessage(ChainedOutgoingProduct(messages.messages, messages.control));
if (msgs.has_value()) {
return msgs.value();
} else {
LOG_ERROR << "Generating outgoing message failed";
return nullopt;
}
} else {
return messages;
}
} else if (messages.control) {
if (upstream) {
auto control = upstream->formOutgoingControlMessage(messages.control);
if (control) {
return ChainedOutgoingProduct(nullptr, control);
} else {
LOG_ERROR << "Generating outgoing control message failed";
return nullopt;
}
} else {
return messages;
}
} else {
return ChainedOutgoingProduct();
}
}
void MediaHandlerElement::prepareAndSendResponse(std::optional<ChainedOutgoingProduct> outgoing, std::function<bool (ChainedOutgoingProduct)> send) {
if (outgoing.has_value()) {
auto message = outgoing.value();
auto response = processOutgoingResponse(message);
if (response.has_value()) {
if(!send(response.value())) {
LOG_DEBUG << "Send failed";
}
} else {
LOG_DEBUG << "No response to send";
}
}
}
message_ptr MediaHandlerElement::formIncomingControlMessage(message_ptr message, std::function<bool (ChainedOutgoingProduct)> send) {
assert(message);
auto product = processIncomingControlMessage(message);
prepareAndSendResponse(product.outgoing, send);
if (product.incoming) {
if (downstream) {
return downstream->formIncomingControlMessage(product.incoming, send);
} else {
return product.incoming;
}
} else {
return nullptr;
}
}
ChainedMessagesProduct MediaHandlerElement::formIncomingBinaryMessage(ChainedMessagesProduct messages, std::function<bool (ChainedOutgoingProduct)> send) {
assert(messages && !messages->empty());
auto product = processIncomingBinaryMessage(messages);
prepareAndSendResponse(product.outgoing, send);
if (product.incoming) {
if (downstream) {
return downstream->formIncomingBinaryMessage(product.incoming, send);
} else {
return product.incoming;
}
} else {
return nullptr;
}
}
message_ptr MediaHandlerElement::formOutgoingControlMessage(message_ptr message) {
assert(message);
auto newMessage = processOutgoingControlMessage(message);
assert(newMessage);
if(!newMessage) {
LOG_ERROR << "Failed to generate outgoing message";
return nullptr;
}
if (upstream) {
return upstream->formOutgoingControlMessage(newMessage);
} else {
return newMessage;
}
}
std::optional<ChainedOutgoingProduct> MediaHandlerElement::formOutgoingBinaryMessage(ChainedOutgoingProduct product) {
assert(product.messages && !product.messages->empty());
auto newProduct = processOutgoingBinaryMessage(product.messages, product.control);
assert(!product.control || newProduct.control);
assert(newProduct.messages && !newProduct.messages->empty());
if (product.control && !newProduct.control) {
LOG_ERROR << "Outgoing message must not remove control message";
return nullopt;
}
if (!newProduct.messages || newProduct.messages->empty()) {
LOG_ERROR << "Failed to generate message";
return nullopt;
}
if (upstream) {
return upstream->formOutgoingBinaryMessage(newProduct);
} else {
return newProduct;
}
}
ChainedIncomingControlProduct MediaHandlerElement::processIncomingControlMessage(message_ptr messages) {
return {messages};
}
message_ptr MediaHandlerElement::processOutgoingControlMessage(message_ptr messages) {
return messages;
}
ChainedIncomingProduct MediaHandlerElement::processIncomingBinaryMessage(ChainedMessagesProduct messages) {
return {messages};
}
ChainedOutgoingProduct MediaHandlerElement::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
return {messages, control};
}
std::shared_ptr<MediaHandlerElement> MediaHandlerElement::chainWith(std::shared_ptr<MediaHandlerElement> upstream) {
assert(this->upstream == nullptr);
assert(upstream->downstream == nullptr);
this->upstream = upstream;
upstream->downstream = shared_from_this();
return upstream;
}
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */

View File

@ -0,0 +1,43 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#if RTC_ENABLE_MEDIA
#include "mediahandlerrootelement.hpp"
namespace rtc {
message_ptr MediaHandlerRootElement::reduce(ChainedMessagesProduct messages) {
if (messages && !messages->empty()) {
auto msg_ptr = messages->front();
if (msg_ptr) {
return make_message(*msg_ptr);
} else {
return nullptr;
}
} else {
return nullptr;
}
}
ChainedMessagesProduct MediaHandlerRootElement::split(message_ptr message) {
return make_chained_messages_product(message);
}
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */

View File

@ -25,7 +25,7 @@ namespace rtc {
NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t nri,
uint8_t unitType, binary data)
: NalUnit(data.size() + 2) {
: NalUnit(data.size() + 2) {
setForbiddenBit(forbiddenBit);
setNRI(nri);
fragmentIndicator()->setUnitType(NalUnitFragmentA::nal_type_fu_A);
@ -34,23 +34,23 @@ NalUnitFragmentA::NalUnitFragmentA(FragmentType type, bool forbiddenBit, uint8_t
copy(data.begin(), data.end(), begin() + 2);
}
std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
std::vector<std::shared_ptr<NalUnitFragmentA>> NalUnitFragmentA::fragmentsFrom(std::shared_ptr<NalUnit> nalu,
uint16_t maximumFragmentSize) {
assert(nalu.size() > maximumFragmentSize);
if (nalu.size() <= maximumFragmentSize) {
assert(nalu->size() > maximumFragmentSize);
if (nalu->size() <= maximumFragmentSize) {
// we need to change `maximum_fragment_size` to have at least two fragments
maximumFragmentSize = nalu.size() / 2;
maximumFragmentSize = nalu->size() / 2;
}
auto fragments_count = ceil(double(nalu.size()) / maximumFragmentSize);
maximumFragmentSize = ceil(nalu.size() / fragments_count);
auto fragments_count = ceil(double(nalu->size()) / maximumFragmentSize);
maximumFragmentSize = ceil(nalu->size() / fragments_count);
// 2 bytes for FU indicator and FU header
maximumFragmentSize -= 2;
auto f = nalu.forbiddenBit();
uint8_t nri = nalu.nri() & 0x03;
uint8_t naluType = nalu.unitType() & 0x1F;
auto payload = nalu.payload();
vector<NalUnitFragmentA> result{};
auto f = nalu->forbiddenBit();
uint8_t nri = nalu->nri() & 0x03;
uint8_t naluType = nalu->unitType() & 0x1F;
auto payload = nalu->payload();
vector<std::shared_ptr<NalUnitFragmentA>> result{};
uint64_t offset = 0;
while (offset < payload.size()) {
vector<byte> fragmentData;
@ -66,7 +66,7 @@ std::vector<NalUnitFragmentA> NalUnitFragmentA::fragmentsFrom(NalUnit nalu,
fragmentType = FragmentType::End;
}
fragmentData = {payload.begin() + offset, payload.begin() + offset + maximumFragmentSize};
NalUnitFragmentA fragment{fragmentType, f, nri, naluType, fragmentData};
auto fragment = std::make_shared<NalUnitFragmentA>(fragmentType, f, nri, naluType, fragmentData);
result.push_back(fragment);
offset += maximumFragmentSize;
}
@ -90,11 +90,11 @@ void NalUnitFragmentA::setFragmentType(FragmentType type) {
}
}
std::vector<binary> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
vector<binary> result{};
std::vector<std::shared_ptr<binary>> NalUnits::generateFragments(uint16_t maximumFragmentSize) {
vector<std::shared_ptr<binary>> result{};
for (auto nalu : *this) {
if (nalu.size() > maximumFragmentSize) {
std::vector<NalUnitFragmentA> fragments =
if (nalu->size() > maximumFragmentSize) {
std::vector<std::shared_ptr<NalUnitFragmentA>> fragments =
NalUnitFragmentA::fragmentsFrom(nalu, maximumFragmentSize);
result.insert(result.end(), fragments.begin(), fragments.end());
} else {

View File

@ -23,23 +23,7 @@
namespace rtc {
OpusPacketizationHandler::OpusPacketizationHandler(std::shared_ptr<OpusRtpPacketizer> packetizer)
: RtcpHandler(), RtcpSenderReporter(packetizer->rtpConfig), packetizer(packetizer) {
senderReportOutgoingCallback = [this](message_ptr msg) { outgoingCallback(msg); };
}
message_ptr OpusPacketizationHandler::incoming(message_ptr ptr) { return ptr; }
message_ptr OpusPacketizationHandler::outgoing(message_ptr ptr) {
if (ptr->type == Message::Binary) {
return withStatsRecording<message_ptr>(
[this, ptr](std::function<void(message_ptr)> addToReport) {
auto rtp = packetizer->packetize(*ptr, false);
addToReport(rtp);
return rtp;
});
}
return ptr;
}
: MediaChainableHandler(packetizer) { }
} // namespace rtc

View File

@ -23,13 +23,22 @@
namespace rtc {
OpusRtpPacketizer::OpusRtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
: RtpPacketizer(rtpConfig) {}
: RtpPacketizer(rtpConfig), MediaHandlerRootElement() {}
message_ptr OpusRtpPacketizer::packetize(binary payload, bool setMark) {
binary_ptr OpusRtpPacketizer::packetize(binary_ptr payload, [[maybe_unused]] bool setMark) {
assert(!setMark);
return RtpPacketizer::packetize(payload, false);
}
ChainedOutgoingProduct OpusRtpPacketizer::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
ChainedMessagesProduct packets = make_chained_messages_product();
packets->reserve(messages->size());
for (auto message: *messages) {
packets->push_back(packetize(message, false));
}
return {packets, control};
}
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */

View File

@ -22,8 +22,8 @@
#include "include.hpp"
#include "logcounter.hpp"
#include "processor.hpp"
#include "threadpool.hpp"
#include "rtp.hpp"
#include "threadpool.hpp"
#include "dtlstransport.hpp"
#include "icetransport.hpp"
@ -75,6 +75,17 @@ PeerConnection::PeerConnection(const Configuration &config)
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
throw std::invalid_argument("Invalid port range");
if (config.mtu) {
if (*config.mtu < 576) // Min MTU for IPv4
throw std::invalid_argument("Invalid MTU value");
if (*config.mtu > 1500) { // Standard Ethernet
PLOG_WARNING << "MTU set to " << *config.mtu;
} else {
PLOG_VERBOSE << "MTU set to " << *config.mtu;
}
}
}
PeerConnection::~PeerConnection() {
@ -515,7 +526,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
// DTLS-SRTP
transport = std::make_shared<DtlsSrtpTransport>(
lower, certificate, verifierCallback,
lower, certificate, mConfig.mtu, verifierCallback,
weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
#else
PLOG_WARNING << "Ignoring media support (not compiled with media support)";
@ -524,8 +535,8 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
if (!transport) {
// DTLS only
transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback,
stateChangeCallback);
transport = std::make_shared<DtlsTransport>(lower, certificate, mConfig.mtu,
verifierCallback, stateChangeCallback);
}
std::atomic_store(&mDtlsTransport, transport);
@ -557,7 +568,7 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
auto lower = std::atomic_load(&mDtlsTransport);
auto transport = std::make_shared<SctpTransport>(
lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
lower, sctpPort, mConfig.mtu, weak_bind(&PeerConnection::forwardMessage, this, _1),
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this, weak_this = weak_from_this()](SctpTransport::State state) {
auto shared_this = weak_this.lock();
@ -663,10 +674,12 @@ void PeerConnection::forwardMessage(message_ptr message) {
if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
stream % 2 == remoteParity) {
channel = std::make_shared<NegociatedDataChannel>(shared_from_this(), sctpTransport,
stream);
channel =
std::make_shared<NegotiatedDataChannel>(shared_from_this(), sctpTransport, stream);
channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
weak_ptr<DataChannel>{channel}));
std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
mDataChannels.emplace(stream, channel);
} else {
// Invalid, close the DataChannel
@ -833,7 +846,7 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role rol
init.negotiated
? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
std::move(init.protocol), std::move(init.reliability))
: std::make_shared<NegociatedDataChannel>(shared_from_this(), stream, std::move(label),
: std::make_shared<NegotiatedDataChannel>(shared_from_this(), stream, std::move(label),
std::move(init.protocol),
std::move(init.reliability));
mDataChannels.emplace(std::make_pair(stream, channel));

View File

@ -25,39 +25,18 @@ Processor::Processor(size_t limit) : mTasks(limit) {}
Processor::~Processor() { join(); }
void Processor::join() {
// We need to detect situations where the thread pool does not execute a pending task at exit
std::optional<unsigned int> counter;
while (true) {
std::shared_future<void> pending;
{
std::unique_lock lock(mMutex);
if (!mPending // no pending task
|| (counter && *counter == mCounter)) { // or no scheduled task after the last one
// Processing is stopped, clear everything and return
mPending.reset();
while (!mTasks.empty())
mTasks.pop();
return;
}
pending = *mPending;
counter = mCounter;
}
// Wait for the pending task
pending.wait();
}
mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
}
void Processor::schedule() {
std::unique_lock lock(mMutex);
if (auto next = mTasks.tryPop()) {
mPending = ThreadPool::Instance().enqueue(std::move(*next)).share();
++mCounter;
ThreadPool::Instance().enqueue(std::move(*next));
} else {
mPending.reset(); // No more tasks
// No more tasks
mPending = false;
mCondition.notify_all();
}
}

View File

@ -54,10 +54,10 @@ protected:
const init_token mInitToken = Init::Token();
Queue<std::function<void()>> mTasks;
std::optional<std::shared_future<void>> mPending; // future of the pending task
unsigned int mCounter = 0; // Number of scheduled tasks
bool mPending = false; // true iff a task is pending in the thread pool
mutable std::mutex mMutex;
std::condition_variable mCondition;
};
template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) {
@ -65,12 +65,12 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args)
auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task = [this, bound = std::move(bound)]() mutable {
scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task
bound();
return bound();
};
if (!mPending) {
mPending = ThreadPool::Instance().enqueue(std::move(task)).share();
++mCounter;
ThreadPool::Instance().enqueue(std::move(task));
mPending = true;
} else {
mTasks.push(std::move(task));
}

118
src/rtcpnackresponder.cpp Normal file
View File

@ -0,0 +1,118 @@
/**
* libdatachannel streamer example
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#if RTC_ENABLE_MEDIA
#include "rtcpnackresponder.hpp"
namespace rtc {
RtcpNackResponder::Storage::Element::Element(binary_ptr packet, uint16_t sequenceNumber, std::shared_ptr<Element> next)
: packet(packet), sequenceNumber(sequenceNumber), next(next) { }
unsigned RtcpNackResponder::Storage::size() { return storage.size(); }
RtcpNackResponder::Storage::Storage(unsigned _maximumSize): maximumSize(_maximumSize) {
assert(maximumSize > 0);
storage.reserve(maximumSize);
}
std::optional<binary_ptr> RtcpNackResponder::Storage::get(uint16_t sequenceNumber) {
auto position = storage.find(sequenceNumber);
return position != storage.end() ? std::make_optional(storage.at(sequenceNumber)->packet) : nullopt;
}
void RtcpNackResponder::Storage::store(binary_ptr packet) {
if (!packet || packet->size() < 12) {
return;
}
auto rtp = reinterpret_cast<RTP *>(packet->data());
auto sequenceNumber = rtp->seqNumber();
assert((storage.empty() && !oldest && !newest) || (!storage.empty() && oldest && newest));
if (size() == 0) {
newest = std::make_shared<Element>(packet, sequenceNumber);
oldest = newest;
} else {
auto current = std::make_shared<Element>(packet, sequenceNumber);
newest->next = current;
newest = current;
}
storage.emplace(sequenceNumber, newest);
if (size() > maximumSize) {
assert(oldest);
if (oldest) {
storage.erase(oldest->sequenceNumber);
oldest = oldest->next;
}
}
}
RtcpNackResponder::RtcpNackResponder(unsigned maxStoredPacketCount)
: MediaHandlerElement(), storage(std::make_shared<Storage>(maxStoredPacketCount)) { }
ChainedIncomingControlProduct RtcpNackResponder::processIncomingControlMessage(message_ptr message) {
std::optional<ChainedOutgoingProduct> optPackets = ChainedOutgoingProduct(nullptr);
auto packets = make_chained_messages_product();
unsigned int i = 0;
while (i < message->size()) {
auto nack = reinterpret_cast<RTCP_NACK *>(message->data() + i);
i += nack->header.header.lengthInBytes();
// check if rtcp is nack
if (nack->header.header.payloadType() != 205 || nack->header.header.reportCount() != 1) {
continue;
}
auto fieldsCount = nack->getSeqNoCount();
std::vector<uint16_t> missingSequenceNumbers{};
for(unsigned int i = 0; i < fieldsCount; i++) {
auto field = nack->parts[i];
auto newMissingSeqenceNumbers = field.getSequenceNumbers();
missingSequenceNumbers.insert(missingSequenceNumbers.end(), newMissingSeqenceNumbers.begin(), newMissingSeqenceNumbers.end());
}
packets->reserve(packets->size() + missingSequenceNumbers.size());
for (auto sequenceNumber: missingSequenceNumbers) {
auto optPacket = storage->get(sequenceNumber);
if (optPacket.has_value()) {
auto packet = optPacket.value();
packets->push_back(packet);
}
}
}
if (!packets->empty()) {
return {message, ChainedOutgoingProduct(packets)};
} else {
return {message, nullopt};
}
}
ChainedOutgoingProduct RtcpNackResponder::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
for (auto message: *messages) {
storage->store(message);
}
return {messages, control};
}
} // namespace rtc
#endif /* RTC_ENABLE_MEDIA */

View File

@ -1,5 +1,4 @@
/*
* libdatachannel streamer example
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
@ -18,37 +17,49 @@
#if RTC_ENABLE_MEDIA
#include "rtcpsenderreporter.hpp"
#include "rtcpsrreporter.hpp"
namespace rtc {
void RtcpSenderReporter::startRecording() {
ChainedOutgoingProduct RtcpSrReporter::processOutgoingBinaryMessage(ChainedMessagesProduct messages, message_ptr control) {
if (needsToReport) {
auto timestamp = rtpConfig->timestamp;
auto sr = getSenderReport(timestamp);
if (control) {
control->insert(control->end(), sr->begin(), sr->end());
} else {
control = sr;
}
needsToReport = false;
}
for (auto message: *messages) {
auto rtp = reinterpret_cast<RTP *>(message->data());
addToReport(rtp, message->size());
}
return {messages, control};
}
void RtcpSrReporter::startRecording() {
_previousReportedTimestamp = rtpConfig->timestamp;
timeOffset = rtpConfig->startTime_s - rtpConfig->timestampToSeconds(rtpConfig->timestamp);
}
void RtcpSenderReporter::sendReport(uint32_t timestamp) {
auto sr = getSenderReport(timestamp);
_previousReportedTimestamp = timestamp;
senderReportOutgoingCallback(move(sr));
}
void RtcpSenderReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
void RtcpSrReporter::addToReport(RTP *rtp, uint32_t rtpSize) {
packetCount += 1;
assert(!rtp->padding());
payloadOctets += rtpSize - rtp->getSize();
}
RtcpSenderReporter::RtcpSenderReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
: rtpConfig(rtpConfig) {}
RtcpSrReporter::RtcpSrReporter(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
: MediaHandlerElement(), rtpConfig(rtpConfig) {}
uint64_t RtcpSenderReporter::secondsToNTP(double seconds) {
uint64_t RtcpSrReporter::secondsToNTP(double seconds) {
return std::round(seconds * double(uint64_t(1) << 32));
}
void RtcpSenderReporter::setNeedsToReport() { needsToReport = true; }
void RtcpSrReporter::setNeedsToReport() { needsToReport = true; }
message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
message_ptr RtcpSrReporter::getSenderReport(uint32_t timestamp) {
auto srSize = RTCP_SR::size(0);
auto msg = make_message(srSize + RTCP_SDES::size({{uint8_t(rtpConfig->cname.size())}}),
Message::Type::Control);
@ -68,6 +79,9 @@ message_ptr RtcpSenderReporter::getSenderReport(uint32_t timestamp) {
item->type = 1;
item->setText(rtpConfig->cname);
sdes->preparePacket(1);
_previousReportedTimestamp = timestamp;
return msg;
}

View File

@ -25,8 +25,8 @@ namespace rtc {
RtpPacketizer::RtpPacketizer(std::shared_ptr<RtpPacketizationConfig> rtpConfig)
: rtpConfig(rtpConfig) {}
message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
auto msg = make_message(rtpHeaderSize + payload.size());
binary_ptr RtpPacketizer::packetize(std::shared_ptr<binary> payload, bool setMark) {
auto msg = std::make_shared<binary>(rtpHeaderSize + payload->size());
auto *rtp = (RTP *)msg->data();
rtp->setPayloadType(rtpConfig->payloadType);
// increase sequence number
@ -37,7 +37,7 @@ message_ptr RtpPacketizer::packetize(binary payload, bool setMark) {
rtp->setMarker(true);
}
rtp->preparePacket();
copy(payload.begin(), payload.end(), msg->begin() + rtpHeaderSize);
memcpy(msg->data() + rtpHeaderSize, payload->data(), payload->size());
return msg;
}

View File

@ -17,6 +17,7 @@
*/
#include "sctptransport.hpp"
#include "dtlstransport.hpp"
#include "logcounter.hpp"
#include <chrono>
@ -27,8 +28,7 @@
// The IETF draft says:
// SCTP MUST support performing Path MTU discovery without relying on ICMP or ICMPv6 as specified in
// [RFC4821] using probing messages specified in [RFC4820]. The initial Path MTU at the IP layer
// SHOULD NOT exceed 1200 bytes for IPv4 and 1280 for IPv6.
// [RFC4821] using probing messages specified in [RFC4820].
// See https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-5
//
// However, usrsctp does not implement Path MTU discovery, so we need to disable it for now.
@ -54,8 +54,6 @@
using namespace std::chrono_literals;
using namespace std::chrono;
using std::shared_ptr;
namespace rtc {
static LogCounter COUNTER_UNKNOWN_PPID(plog::warning,
@ -102,7 +100,8 @@ void SctpTransport::Cleanup() {
}
SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
message_callback recvCallback, amount_callback bufferedAmountCallback,
std::optional<size_t> mtu, message_callback recvCallback,
amount_callback bufferedAmountCallback,
state_callback stateChangeCallback)
: Transport(lower, std::move(stateChangeCallback)), mPort(port), mPendingRecvCount(0),
mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
@ -180,13 +179,24 @@ SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
// 1200 bytes.
// See https://tools.ietf.org/html/rfc8261#section-5
#if USE_PMTUD
if (!mtu.has_value()) {
#else
if (false) {
#endif
// Enable SCTP path MTU discovery
spp.spp_flags |= SPP_PMTUD_ENABLE;
#else
PLOG_VERBOSE << "Path MTU discovery enabled";
} else {
// Fall back to a safe MTU value.
spp.spp_flags |= SPP_PMTUD_DISABLE;
spp.spp_pathmtu = 1200;
#endif
// The MTU value provided specifies the space available for chunks in the
// packet, so we also subtract the SCTP header size.
size_t pmtu = mtu.value_or(DEFAULT_IPV4_MTU + 20) - 12 - 37 - 8 - 40; // SCTP/DTLS/UDP/IPv6
spp.spp_pathmtu = uint32_t(pmtu);
PLOG_VERBOSE << "Path MTU discovery disabled, SCTP MTU set to " << pmtu;
}
if (usrsctp_setsockopt(mSock, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &spp, sizeof(spp)))
throw std::runtime_error("Could not set socket option SCTP_PEER_ADDR_PARAMS, errno=" +
std::to_string(errno));
@ -349,6 +359,10 @@ void SctpTransport::incoming(message_ptr message) {
}
PLOG_VERBOSE << "Incoming size=" << message->size();
// TODO: There seems to be a possible data race between usrsctp_sendv() and usrsctp_conninput()
// As a mitigation, lock the send mutex before calling usrsctp_conninput()
std::lock_guard lock(mSendMutex);
usrsctp_conninput(this, message->data(), message->size(), 0);
}

View File

@ -43,8 +43,9 @@ public:
using amount_callback = std::function<void(uint16_t streamId, size_t amount)>;
SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, message_callback recvCallback,
amount_callback bufferedAmountCallback, state_callback stateChangeCallback);
SctpTransport(std::shared_ptr<Transport> lower, uint16_t port, std::optional<size_t> mtu,
message_callback recvCallback, amount_callback bufferedAmountCallback,
state_callback stateChangeCallback);
~SctpTransport();
void start() override;

View File

@ -63,7 +63,7 @@ int SelectInterrupter::prepare(fd_set &readfds, [[maybe_unused]] fd_set &writefd
return SOCKET_TO_INT(mDummySock) + 1;
#else
char dummy;
::read(mPipeIn, &dummy, 1);
(void)::read(mPipeIn, &dummy, 1);
FD_SET(mPipeIn, &readfds);
return mPipeIn + 1;
#endif
@ -78,7 +78,7 @@ void SelectInterrupter::interrupt() {
}
#else
char dummy = 0;
::write(mPipeOut, &dummy, 1);
(void)::write(mPipeOut, &dummy, 1);
#endif
}

View File

@ -21,10 +21,10 @@
#include <cstdlib>
namespace {
void joinThreadPoolInstance() {
rtc::ThreadPool::Instance().join();
}
}
void joinThreadPoolInstance() { rtc::ThreadPool::Instance().join(); }
} // namespace
namespace rtc {
@ -33,9 +33,7 @@ ThreadPool &ThreadPool::Instance() {
return *instance;
}
ThreadPool::ThreadPool() {
std::atexit(joinThreadPoolInstance);
}
ThreadPool::ThreadPool() { std::atexit(joinThreadPoolInstance); }
ThreadPool::~ThreadPool() {}
@ -46,20 +44,25 @@ int ThreadPool::count() const {
void ThreadPool::spawn(int count) {
std::unique_lock lock(mWorkersMutex);
mJoining = false;
while (count-- > 0)
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
}
void ThreadPool::join() {
std::unique_lock lock(mWorkersMutex);
{
std::unique_lock lock(mMutex);
mWaitingCondition.wait(lock, [&]() { return mWaitingWorkers == int(mWorkers.size()); });
mJoining = true;
mCondition.notify_all();
mTasksCondition.notify_all();
}
std::unique_lock lock(mWorkersMutex);
for (auto &w : mWorkers)
w.join();
mWorkers.clear();
mJoining = false;
}
void ThreadPool::run() {
@ -77,7 +80,7 @@ bool ThreadPool::runOne() {
std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex);
while (true) {
while (!mJoining) {
if (!mTasks.empty()) {
if (mTasks.top().time <= clock::now()) {
auto func = std::move(mTasks.top().func);
@ -85,21 +88,18 @@ std::function<void()> ThreadPool::dequeue() {
return func;
}
if (mJoining)
break;
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait_until(lock, mTasks.top().time);
mCondition.wait_until(lock, mTasks.top().time);
} else {
if (mJoining)
break;
mCondition.wait(lock);
}
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait(lock);
}
while (!mTasks.empty())
mTasks.pop();
--mWaitingWorkers;
}
return nullptr;
}

View File

@ -72,6 +72,7 @@ protected:
std::function<void()> dequeue(); // returns null function if joining
std::vector<std::thread> mWorkers;
int mWaitingWorkers = 0;
std::atomic<bool> mJoining = false;
struct Task {
@ -82,8 +83,8 @@ protected:
};
std::priority_queue<Task, std::deque<Task>, std::greater<Task>> mTasks;
std::condition_variable mTasksCondition, mWaitingCondition;
mutable std::mutex mMutex, mWorkersMutex;
std::condition_variable mCondition;
};
template <class F, class... Args>
@ -100,16 +101,8 @@ auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
template <class F, class... Args>
auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
-> invoke_future_t<F, Args...> {
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
std::unique_lock lock(mMutex);
if (mJoining) {
std::promise<R> promise;
std::future<R> result = promise.get_future();
promise.set_exception(std::make_exception_ptr(
std::runtime_error("Scheduled a task while joining the thread pool")));
return result;
}
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<R()>>([bound = std::move(bound)]() mutable {
try {
@ -122,7 +115,7 @@ auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
std::future<R> result = task->get_future();
mTasks.push({time, [task = std::move(task), token = Init::Token()]() { return (*task)(); }});
mCondition.notify_one();
mTasksCondition.notify_one();
return result;
}

View File

@ -35,11 +35,23 @@ using std::weak_ptr;
Track::Track(Description::Media description)
: mMediaDescription(std::move(description)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {}
string Track::mid() const { return mMediaDescription.mid(); }
string Track::mid() const {
std::shared_lock lock(mMutex);
return mMediaDescription.mid();
}
Description::Media Track::description() const { return mMediaDescription; }
Description::Media Track::description() const {
std::shared_lock lock(mMutex);
return mMediaDescription;
}
Description::Direction Track::direction() const {
std::shared_lock lock(mMutex);
return mMediaDescription.direction();
}
void Track::setDescription(Description::Media description) {
std::unique_lock lock(mMutex);
if (description.mid() != mMediaDescription.mid())
throw std::logic_error("Media description mid does not match track mid");
@ -48,17 +60,17 @@ void Track::setDescription(Description::Media description) {
void Track::close() {
mIsClosed = true;
resetCallbacks();
setRtcpHandler(nullptr);
resetCallbacks();
}
bool Track::send(message_variant data) {
if (mIsClosed)
throw std::runtime_error("Track is closed");
auto direction = mMediaDescription.direction();
if ((direction == Description::Direction::RecvOnly ||
direction == Description::Direction::Inactive)) {
auto dir = direction();
if ((dir == Description::Direction::RecvOnly || dir == Description::Direction::Inactive)) {
COUNTER_MEDIA_BAD_DIRECTION++;
return false;
}
@ -92,6 +104,7 @@ std::optional<message_variant> Track::peek() {
bool Track::isOpen(void) const {
#if RTC_ENABLE_MEDIA
std::shared_lock lock(mMutex);
return !mIsClosed && mDtlsSrtpTransport.lock();
#else
return !mIsClosed;
@ -108,7 +121,11 @@ size_t Track::availableAmount() const { return mRecvQueue.amount(); }
#if RTC_ENABLE_MEDIA
void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
{
std::lock_guard lock(mMutex);
mDtlsSrtpTransport = transport;
}
triggerOpen();
}
#endif
@ -117,9 +134,8 @@ void Track::incoming(message_ptr message) {
if (!message)
return;
auto direction = mMediaDescription.direction();
if ((direction == Description::Direction::SendOnly ||
direction == Description::Direction::Inactive) &&
auto dir = direction();
if ((dir == Description::Direction::SendOnly || dir == Description::Direction::Inactive) &&
message->type != Message::Control) {
COUNTER_MEDIA_BAD_DIRECTION++;
return;
@ -142,10 +158,13 @@ void Track::incoming(message_ptr message) {
}
bool Track::outgoing([[maybe_unused]] message_ptr message) {
#if RTC_ENABLE_MEDIA
auto transport = mDtlsSrtpTransport.lock();
#if RTC_ENABLfiE_MEDIA
std::shared_ptr<DtlsSrtpTransport> transport;
{
std::shared_lock lock(mMutex);
transport = mDtlsSrtpTransport.lock();
if (!transport)
throw std::runtime_error("Track transport is not open");
throw std::runtime_error("Track is closed");
// Set recommended medium-priority DSCP value
// See https://tools.ietf.org/html/draft-ietf-tsvwg-rtcweb-qos-18
@ -153,6 +172,7 @@ bool Track::outgoing([[maybe_unused]] message_ptr message) {
message->dscp = 46; // EF: Expedited Forwarding
else
message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
}
return transport->sendMedia(message);
#else
@ -161,25 +181,24 @@ bool Track::outgoing([[maybe_unused]] message_ptr message) {
#endif
}
void Track::setRtcpHandler(std::shared_ptr<RtcpHandler> handler) {
std::unique_lock lock(mRtcpHandlerMutex);
mRtcpHandler = std::move(handler);
if (mRtcpHandler) {
auto copy = mRtcpHandler;
lock.unlock();
copy->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
void Track::setRtcpHandler(std::shared_ptr<MediaHandler> handler) {
{
std::unique_lock lock(mMutex);
mRtcpHandler = handler;
}
handler->onOutgoing(std::bind(&Track::outgoing, this, std::placeholders::_1));
}
bool Track::requestKeyframe() {
if (auto handler = getRtcpHandler()) {
if (auto handler = getRtcpHandler())
return handler->requestKeyframe();
}
return false;
}
std::shared_ptr<RtcpHandler> Track::getRtcpHandler() {
std::shared_lock lock(mRtcpHandlerMutex);
std::shared_ptr<MediaHandler> Track::getRtcpHandler() {
std::shared_lock lock(mMutex);
return mRtcpHandler;
}

View File

@ -40,11 +40,13 @@ size_t benchmark(milliseconds duration) {
Configuration config1;
// config1.iceServers.emplace_back("stun:stun.l.google.com:19302");
// config1.mtu = 1500;
auto pc1 = std::make_shared<PeerConnection>(config1);
Configuration config2;
// config2.iceServers.emplace_back("stun:stun.l.google.com:19302");
// config2.mtu = 1500;
auto pc2 = std::make_shared<PeerConnection>(config2);

View File

@ -235,24 +235,48 @@ int test_capi_connectivity_main() {
char buffer[BUFFER_SIZE];
char buffer2[BUFFER_SIZE];
if (rtcGetLocalDescriptionType(peer1->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetLocalDescriptionType failed\n");
goto error;
}
printf("Local description type 1: %s\n", buffer);
if (rtcGetLocalDescription(peer1->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetLocalDescription failed\n");
goto error;
}
printf("Local description 1: %s\n", buffer);
if (rtcGetRemoteDescriptionType(peer1->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetRemoteDescriptionType failed\n");
goto error;
}
printf("Remote description type 1: %s\n", buffer);
if (rtcGetRemoteDescription(peer1->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetRemoteDescription failed\n");
goto error;
}
printf("Remote description 1: %s\n", buffer);
if (rtcGetLocalDescriptionType(peer2->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetLocalDescriptionType failed\n");
goto error;
}
printf("Local description type 2: %s\n", buffer);
if (rtcGetLocalDescription(peer2->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetLocalDescription failed\n");
goto error;
}
printf("Local description 2: %s\n", buffer);
if (rtcGetRemoteDescriptionType(peer2->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetRemoteDescriptionType failed\n");
goto error;
}
printf("Remote description type 2: %s\n", buffer);
if (rtcGetRemoteDescription(peer2->pc, buffer, BUFFER_SIZE) < 0) {
fprintf(stderr, "rtcGetRemoteDescription failed\n");
goto error;

View File

@ -36,6 +36,8 @@ void test_connectivity() {
// STUN server example (not necessary to connect locally)
// Please do not use outside of libdatachannel tests
config1.iceServers.emplace_back("stun:stun.ageneau.net:3478");
// Custom MTU example
config1.mtu = 1500;
auto pc1 = std::make_shared<PeerConnection>(config1);
@ -43,6 +45,8 @@ void test_connectivity() {
// STUN server example (not necessary to connect locally)
// Please do not use outside of libdatachannel tests
config2.iceServers.emplace_back("stun:stun.ageneau.net:3478");
// Custom MTU example
config2.mtu = 1500;
// Port range example
config2.portRangeBegin = 5000;
config2.portRangeEnd = 6000;
@ -221,7 +225,7 @@ void test_connectivity() {
auto negotiated2 = pc2->createDataChannel("negoctated", init);
if (!negotiated1->isOpen() || !negotiated2->isOpen())
throw runtime_error("Negociated DataChannel is not open");
throw runtime_error("Negotiated DataChannel is not open");
std::atomic<bool> received = false;
negotiated2->onMessage([&received](const variant<binary, string> &message) {
@ -239,7 +243,7 @@ void test_connectivity() {
this_thread::sleep_for(1s);
if (!received)
throw runtime_error("Negociated DataChannel failed");
throw runtime_error("Negotiated DataChannel failed");
// Delay close of peer 2 to check closing works properly
pc1->close();

View File

@ -232,7 +232,7 @@ void test_turn_connectivity() {
auto negotiated2 = pc2->createDataChannel("negoctated", init);
if (!negotiated1->isOpen() || !negotiated2->isOpen())
throw runtime_error("Negociated DataChannel is not open");
throw runtime_error("Negotiated DataChannel is not open");
std::atomic<bool> received = false;
negotiated2->onMessage([&received](const variant<binary, string> &message) {
@ -250,7 +250,7 @@ void test_turn_connectivity() {
this_thread::sleep_for(1s);
if (!received)
throw runtime_error("Negociated DataChannel failed");
throw runtime_error("Negotiated DataChannel failed");
// Delay close of peer 2 to check closing works properly
pc1->close();