Add example project for h264 and opus streaming

This commit is contained in:
Filip Klembara
2020-12-10 18:45:21 +01:00
parent 1d27f5b876
commit d9aa1818b2
18 changed files with 1512 additions and 0 deletions

View File

@ -310,6 +310,7 @@ if(NOT NO_EXAMPLES AND NOT CMAKE_SYSTEM_NAME STREQUAL "WindowsStore")
add_subdirectory(examples/client)
add_subdirectory(examples/media)
add_subdirectory(examples/sfu-media)
add_subdirectory(examples/streamer)
add_subdirectory(examples/copy-paste)
add_subdirectory(examples/copy-paste-capi)
endif()

View File

@ -0,0 +1,22 @@
cmake_minimum_required(VERSION 3.7)
if(POLICY CMP0079)
cmake_policy(SET CMP0079 NEW)
endif()
if(WIN32)
add_executable(streamer main.cpp dispatchqueue.cpp dispatchqueue.hpp h264fileparser.cpp h264fileparser.hpp helpers.cpp helpers.hpp opusfileparser.cpp opusfileparser.hpp fileparser.cpp fileparser.hpp stream.cpp stream.hpp)
target_compile_definitions(streamer PUBLIC STATIC_GETOPT)
else()
add_executable(streamer main.cpp dispatchqueue.cpp dispatchqueue.hpp h264fileparser.cpp h264fileparser.hpp helpers.cpp helpers.hpp opusfileparser.cpp opusfileparser.hpp fileparser.cpp fileparser.hpp stream.cpp stream.hpp)
endif()
set_target_properties(streamer PROPERTIES
CXX_STANDARD 17
OUTPUT_NAME streamer)
if(WIN32)
target_link_libraries(streamer datachannel-static) # DLL exports only the C API
else()
target_link_libraries(streamer datachannel)
endif()
target_link_libraries(streamer datachannel nlohmann_json)

View File

@ -0,0 +1,32 @@
# Streaming H264 and opus
This example streams H264 and opus<sup id="a1">[1](#f1)</sup> samples to the connected browser client.
## Starting signaling server
```sh
$ python3 ../signaling-server-python/signaling-server.py
```
## Starting php
```sh
$ php -S 127.0.0.1:8080
```
Now you can open demo at [127.0.0.1:8080](127.0.0.1:8080).
## Arguments
- `-a` Directory with OPUS samples (default: *../../../../examples/streamer/samples/opus/*).
- `-b` Directory with H264 samples (default: *../../../../examples/streamer/samples/h264/*).
- `-d` Signaling server IP address (default: 127.0.0.1).
- `-p` Signaling server port (default: 8000).
- `-v` Enable debug logs.
- `-h` Print this help and exit.
## Generating H264 and Opus samples
You can generate H264 and Opus sample with *samples/generate_h264.py* and *samples/generate_opus.py* respectively. This require ffmpeg, python3 and kaitaistruct library to be installed. Use `-h`/`--help` to learn more about arguments.
<b id="f1">1</b> Opus samples are generated from music downloaded at [bensound](https://www.bensound.com). [](#a1)

209
examples/streamer/client.js Normal file
View File

@ -0,0 +1,209 @@
/** @type {RTCPeerConnection} */
let rtc;
const iceConnectionLog = document.getElementById('ice-connection-state'),
iceGatheringLog = document.getElementById('ice-gathering-state'),
signalingLog = document.getElementById('signaling-state'),
dataChannelLog = document.getElementById('data-channel');
function randomString(len) {
const charSet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let randomString = '';
for (let i = 0; i < len; i++) {
const randomPoz = Math.floor(Math.random() * charSet.length);
randomString += charSet.substring(randomPoz, randomPoz + 1);
}
return randomString;
}
const receiveID = randomString(10);
const websocket = new WebSocket('ws://127.0.0.1:8000/' + receiveID);
websocket.onopen = function () {
document.getElementById('start').disabled = false;
}
// data channel
let dc = null, dcTimeout = null;
function createPeerConnection() {
const config = {
sdpSemantics: 'unified-plan',
bundlePolicy: "max-bundle",
};
if (document.getElementById('use-stun').checked) {
config.iceServers = [{urls: ['stun:stun.l.google.com:19302']}];
}
let pc = new RTCPeerConnection(config);
// register some listeners to help debugging
pc.addEventListener('icegatheringstatechange', function () {
iceGatheringLog.textContent += ' -> ' + pc.iceGatheringState;
}, false);
iceGatheringLog.textContent = pc.iceGatheringState;
pc.addEventListener('iceconnectionstatechange', function () {
iceConnectionLog.textContent += ' -> ' + pc.iceConnectionState;
}, false);
iceConnectionLog.textContent = pc.iceConnectionState;
pc.addEventListener('signalingstatechange', function () {
signalingLog.textContent += ' -> ' + pc.signalingState;
}, false);
signalingLog.textContent = pc.signalingState;
// connect audio / video
pc.addEventListener('track', function (evt) {
if (evt.track.kind == 'video') {
document.getElementById('media').style.display = 'block';
document.getElementById('video').srcObject = evt.streams[0];
} else {
document.getElementById('audio').srcObject = evt.streams[0];
}
});
let time_start = null;
function current_stamp() {
if (time_start === null) {
time_start = new Date().getTime();
return 0;
} else {
return new Date().getTime() - time_start;
}
}
pc.ondatachannel = function (event) {
dc = event.channel;
dc.onopen = function () {
dataChannelLog.textContent += '- open\n';
dataChannelLog.scrollTop = dataChannelLog.scrollHeight;
};
dc.onmessage = function (evt) {
dataChannelLog.textContent += '< ' + evt.data + '\n';
dataChannelLog.scrollTop = dataChannelLog.scrollHeight;
dcTimeout = setTimeout(function () {
if (dc == null && dcTimeout != null) {
dcTimeout = null;
return
}
const message = 'Pong ' + current_stamp();
dataChannelLog.textContent += '> ' + message + '\n';
dataChannelLog.scrollTop = dataChannelLog.scrollHeight;
dc.send(message);
}, 1000);
}
dc.onclose = function () {
clearTimeout(dcTimeout);
dcTimeout = null;
dataChannelLog.textContent += '- close\n';
dataChannelLog.scrollTop = dataChannelLog.scrollHeight;
};
}
return pc;
}
function sendAnswer(pc) {
return pc.createAnswer()
.then((answer) => rtc.setLocalDescription(answer))
.then(function () {
// wait for ICE gathering to complete
return new Promise(function (resolve) {
if (pc.iceGatheringState === 'complete') {
resolve();
} else {
function checkState() {
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
}
pc.addEventListener('icegatheringstatechange', checkState);
}
});
}).then(function () {
const answer = pc.localDescription;
document.getElementById('answer-sdp').textContent = answer.sdp;
return websocket.send(JSON.stringify(
{
id: "server",
type: answer.type,
sdp: answer.sdp,
}));
}).catch(function (e) {
alert(e);
});
}
function handleOffer(offer) {
rtc = createPeerConnection();
return rtc.setRemoteDescription(offer)
.then(() => sendAnswer(rtc));
}
function sendStreamRequest() {
websocket.send(JSON.stringify(
{
id: "server",
type: "streamRequest",
receiver: receiveID,
}));
}
async function start() {
document.getElementById('start').style.display = 'none';
document.getElementById('stop').style.display = 'inline-block';
document.getElementById('media').style.display = 'block';
sendStreamRequest();
}
function stop() {
document.getElementById('stop').style.display = 'none';
document.getElementById('media').style.display = 'none';
document.getElementById('start').style.display = 'inline-block';
// close data channel
if (dc) {
dc.close();
dc = null;
}
// close transceivers
if (rtc.getTransceivers) {
rtc.getTransceivers().forEach(function (transceiver) {
if (transceiver.stop) {
transceiver.stop();
}
});
}
// close local audio / video
rtc.getSenders().forEach(function (sender) {
const track = sender.track;
if (track !== null) {
sender.track.stop();
}
});
// close peer connection
setTimeout(function () {
rtc.close();
rtc = null;
}, 500);
}
websocket.onmessage = async function (evt) {
const received_msg = evt.data;
const object = JSON.parse(received_msg);
if (object.type == "offer") {
document.getElementById('offer-sdp').textContent = object.sdp;
await handleOffer(object)
}
}

View File

@ -0,0 +1,94 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "dispatchqueue.hpp"
DispatchQueue::DispatchQueue(std::string name, size_t threadCount) :
name{std::move(name)}, threads(threadCount) {
for(size_t i = 0; i < threads.size(); i++)
{
threads[i] = std::thread(&DispatchQueue::dispatchThreadHandler, this);
}
}
DispatchQueue::~DispatchQueue() {
// Signal to dispatch threads that it's time to wrap up
std::unique_lock<std::mutex> lock(lockMutex);
quit = true;
lock.unlock();
condition.notify_all();
// Wait for threads to finish before we exit
for(size_t i = 0; i < threads.size(); i++)
{
if(threads[i].joinable())
{
threads[i].join();
}
}
}
void DispatchQueue::removePending() {
std::unique_lock<std::mutex> lock(lockMutex);
queue = {};
}
void DispatchQueue::dispatch(const fp_t& op) {
std::unique_lock<std::mutex> lock(lockMutex);
queue.push(op);
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lock.unlock();
condition.notify_one();
}
void DispatchQueue::dispatch(fp_t&& op) {
std::unique_lock<std::mutex> lock(lockMutex);
queue.push(std::move(op));
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lock.unlock();
condition.notify_one();
}
void DispatchQueue::dispatchThreadHandler(void) {
std::unique_lock<std::mutex> lock(lockMutex);
do {
//Wait until we have data or a quit signal
condition.wait(lock, [this]{
return (queue.size() || quit);
});
//after wait, we own the lock
if(!quit && queue.size())
{
auto op = std::move(queue.front());
queue.pop();
//unlock now that we're done messing with the queue
lock.unlock();
op();
lock.lock();
}
} while (!quit);
}

View File

@ -0,0 +1,56 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef dispatchqueue_hpp
#define dispatchqueue_hpp
#include <thread>
#include <queue>
class DispatchQueue {
typedef std::function<void(void)> fp_t;
public:
DispatchQueue(std::string name, size_t threadCount = 1);
~DispatchQueue();
// dispatch and copy
void dispatch(const fp_t& op);
// dispatch and move
void dispatch(fp_t&& op);
void removePending();
// Deleted operations
DispatchQueue(const DispatchQueue& rhs) = delete;
DispatchQueue& operator=(const DispatchQueue& rhs) = delete;
DispatchQueue(DispatchQueue&& rhs) = delete;
DispatchQueue& operator=(DispatchQueue&& rhs) = delete;
private:
std::string name;
std::mutex lockMutex;
std::vector<std::thread> threads;
std::queue<fp_t> queue;
std::condition_variable condition;
bool quit = false;
void dispatchThreadHandler(void);
};
#endif /* dispatchqueue_hpp */

View File

@ -0,0 +1,59 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "fileparser.hpp"
#include <fstream>
using namespace std;
FileParser::FileParser(string directory, string extension, uint32_t samplesPerSecond, bool loop): sampleDuration_us(1000 * 1000 / samplesPerSecond), StreamSource() {
this->directory = directory;
this->extension = extension;
this->loop = loop;
}
void FileParser::start() {
sampleTime_us = -sampleDuration_us;
loadNextSample();
}
void FileParser::stop() {
StreamSource::stop();
counter = -1;
}
void FileParser::loadNextSample() {
string frame_id = to_string(++counter);
string url = directory + "/sample-" + frame_id + extension;
ifstream source(url, ios_base::binary);
if (!source) {
if (loop && counter > 0) {
loopTimestampOffset = sampleTime_us;
counter = -1;
loadNextSample();
return;
}
sample = {};
return;
}
vector<uint8_t> fileContents((std::istreambuf_iterator<char>(source)), std::istreambuf_iterator<char>());
sample = *reinterpret_cast<vector<byte> *>(&fileContents);
sampleTime_us += sampleDuration_us;
}

View File

@ -0,0 +1,40 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef fileparser_hpp
#define fileparser_hpp
#include <string>
#include <vector>
#include "stream.hpp"
class FileParser: public StreamSource {
std::string directory;
std::string extension;
uint32_t counter = -1;
bool loop;
uint64_t loopTimestampOffset = 0;
public:
const uint64_t sampleDuration_us;
virtual void start();
virtual void stop();
FileParser(std::string directory, std::string extension, uint32_t samplesPerSecond, bool loop);
virtual void loadNextSample();
};
#endif /* fileparser_hpp */

View File

@ -0,0 +1,70 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "h264fileparser.hpp"
#include <fstream>
#include "rtc/rtc.hpp"
using namespace std;
H264FileParser::H264FileParser(string directory, uint32_t fps, bool loop): FileParser(directory, ".h264", fps, loop) { }
void H264FileParser::loadNextSample() {
FileParser::loadNextSample();
unsigned long long i = 0;
while (i < sample.size()) {
assert(i + 4 < sample.size());
auto lengthPtr = (uint32_t *) (sample.data() + i);
uint32_t length = ntohl(*lengthPtr);
auto naluStartIndex = i + 4;
auto naluEndIndex = naluStartIndex + length;
assert(naluEndIndex <= sample.size());
auto header = reinterpret_cast<rtc::NalUnitHeader *>(sample.data() + naluStartIndex);
auto type = header->unitType();
switch (type) {
case 7:
previousUnitType7 = {sample.begin() + i, sample.begin() + naluEndIndex};
break;
case 8:
previousUnitType8 = {sample.begin() + i, sample.begin() + naluEndIndex};;
break;
case 5:
previousUnitType5 = {sample.begin() + i, sample.begin() + naluEndIndex};;
break;
}
i = naluEndIndex;
}
}
vector<byte> H264FileParser::initialNALUS() {
vector<byte> units{};
if (previousUnitType7.has_value()) {
auto nalu = previousUnitType7.value();
units.insert(units.end(), nalu.begin(), nalu.end());
}
if (previousUnitType8.has_value()) {
auto nalu = previousUnitType8.value();
units.insert(units.end(), nalu.begin(), nalu.end());
}
if (previousUnitType5.has_value()) {
auto nalu = previousUnitType5.value();
units.insert(units.end(), nalu.begin(), nalu.end());
}
return units;
}

View File

@ -0,0 +1,36 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef h264fileparser_hpp
#define h264fileparser_hpp
#include "fileparser.hpp"
#include <optional>
class H264FileParser: public FileParser {
std::optional<std::vector<std::byte>> previousUnitType5 = std::nullopt;
std::optional<std::vector<std::byte>> previousUnitType7 = std::nullopt;
std::optional<std::vector<std::byte>> previousUnitType8 = std::nullopt;
public:
H264FileParser(std::string directory, uint32_t fps, bool loop);
void loadNextSample() override;
std::vector<std::byte> initialNALUS();
};
#endif /* h264fileparser_hpp */

View File

@ -0,0 +1,49 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "helpers.hpp"
#include <sys/time.h>
using namespace std;
using namespace rtc;
ClientTrackData::ClientTrackData(shared_ptr<Track> track, shared_ptr<RTCPSenderReportable> sender) {
this->track = track;
this->sender = sender;
}
void Client::setState(State state) {
std::unique_lock lock(_mutex);
this->state = state;
}
Client::State Client::getState() {
std::shared_lock lock(_mutex);
return state;
}
ClientTrack::ClientTrack(string id, shared_ptr<ClientTrackData> trackData) {
this->id = id;
this->trackData = trackData;
}
uint64_t currentTimeInMicroSeconds() {
struct timeval time;
gettimeofday(&time, NULL);
return uint64_t(time.tv_sec) * 1000 * 1000 + time.tv_usec;
}

View File

@ -0,0 +1,63 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef helpers_hpp
#define helpers_hpp
#include "rtc/rtc.hpp"
struct ClientTrackData {
std::shared_ptr<rtc::Track> track;
std::shared_ptr<rtc::RTCPSenderReportable> sender;
ClientTrackData(std::shared_ptr<rtc::Track> track, std::shared_ptr<rtc::RTCPSenderReportable> sender);
};
struct Client {
enum class State {
Waiting,
WaitingForVideo,
WaitingForAudio,
Ready
};
const std::shared_ptr<rtc::PeerConnection> & peerConnection = _peerConnection;
Client(std::shared_ptr<rtc::PeerConnection> pc) {
_peerConnection = pc;
}
std::optional<std::shared_ptr<ClientTrackData>> video;
std::optional<std::shared_ptr<ClientTrackData>> audio;
std::optional<std::shared_ptr<rtc::DataChannel>> dataChannel{};
void setState(State state);
State getState();
private:
std::shared_mutex _mutex;
State state = State::Waiting;
std::string id;
std::shared_ptr<rtc::PeerConnection> _peerConnection;
};
struct ClientTrack {
std::string id;
std::shared_ptr<ClientTrackData> trackData;
ClientTrack(std::string id, std::shared_ptr<ClientTrackData> trackData);
};
uint64_t currentTimeInMicroSeconds();
#endif /* helpers_hpp */

View File

@ -0,0 +1,73 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>libdatachannel media example</title>
<style>
button {
padding: 8px 16px;
}
pre {
overflow-x: hidden;
overflow-y: auto;
}
video {
width: 100%;
}
.option {
margin-bottom: 8px;
}
#media {
max-width: 1280px;
}
</style>
</head>
<body>
<script src="https://webrtc.github.io/adapter/adapter-latest.js"></script>
<h2>Options</h2>
<div class="option">
<input id="use-stun" type="checkbox"/>
<label for="use-stun">Use STUN server</label>
</div>
<button id="start" onclick="start()" disabled>Start</button>
<button id="stop" style="display: none" onclick="stop()">Stop</button>
<h2>State</h2>
<p>
ICE gathering state: <span id="ice-gathering-state"></span>
</p>
<p>
ICE connection state: <span id="ice-connection-state"></span>
</p>
<p>
Signaling state: <span id="signaling-state"></span>
</p>
<div id="media" style="display: none">
<h2>Media</h2>
<audio id="audio" autoplay></audio>
<video id="video" autoplay playsinline></video>
</div>
<h2>Data channel</h2>
<pre id="data-channel" style="height: 200px;"></pre>
<h2>SDP</h2>
<h3>Offer</h3>
<pre id="offer-sdp"></pre>
<h3>Answer</h3>
<pre id="answer-sdp"></pre>
<script src="client.js"></script>
</body>
</html>

473
examples/streamer/main.cpp Normal file
View File

@ -0,0 +1,473 @@
/*
* libdatachannel client example
* Copyright (c) 2019-2020 Paul-Louis Ageneau
* Copyright (c) 2019 Murat Dogan
* Copyright (c) 2020 Will Munn
* Copyright (c) 2020 Nico Chatzi
* Copyright (c) 2020 Lara Mackey
* Copyright (c) 2020 Erik Cota-Robles
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "nlohmann/json.hpp"
#include "h264fileparser.hpp"
#include "opusfileparser.hpp"
#include "helpers.hpp"
using namespace rtc;
using namespace std;
using namespace std::chrono_literals;
using json = nlohmann::json;
template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
/// all connected clients
unordered_map<string, shared_ptr<Client>> clients{};
/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr<Client> createPeerConnection(const Configuration &config,
weak_ptr<WebSocket> wws,
string id);
/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples);
/// Add client to stream
/// @param client Client
/// @param adding_video True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo);
/// Start stream
void startStream();
/// Main dispatch queue
DispatchQueue MainThread("Main");
/// Audio and video stream
optional<shared_ptr<Stream>> avStream = nullopt;
const string defaultRootDirectory = "../../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
const string defaultIPAddress = "127.0.0.1";
const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;
/// Incomming message handler for websocket
/// @param message Incommint message
/// @param config Configuration
/// @param ws Websocket
void wsOnMessage(json message, Configuration config, shared_ptr<WebSocket> ws) {
auto it = message.find("id");
if (it == message.end())
return;
string id = it->get<string>();
it = message.find("type");
if (it == message.end())
return;
string type = it->get<string>();
if (type == "streamRequest") {
shared_ptr<Client> c = createPeerConnection(config, make_weak_ptr(ws), id);
clients.emplace(id, c);
} else if (type == "answer") {
shared_ptr<Client> c;
if (auto jt = clients.find(id); jt != clients.end()) {
auto pc = clients.at(id)->peerConnection;
auto sdp = message["sdp"].get<string>();
auto description = Description(sdp, type);
pc->setRemoteDescription(description);
}
}
}
int main(int argc, char **argv) try {
bool enableDebugLogs = false;
bool printHelp = false;
int c = 0;
// parse arguments
while ((c = getopt (argc, argv, "a:b:d:p:vh")) != -1) {
switch (c) {
case 'a':
opusSamplesDirectory = string(optarg) + "/";
break;
case 'b':
h264SamplesDirectory = string(optarg) + "/";
break;
case 'd':
ip_address = string(optarg);
break;
case 'p':
port = atoi(optarg);
break;
case 'v':
enableDebugLogs = true;
break;
case 'h':
printHelp = true;
break;
case '?':
if (optopt == 'a' || optopt == 'b' || optopt == 'd' || optopt == 'p') {
string s(1, optopt);
cerr << "Option -" << s <<" requires an argument." << endl;
} else if (isprint(optopt)) {
string s(1, optopt);
cerr << "Unknown option `-" << s << "'." << endl;
} else {
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (printHelp) {
cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
<< "Arguments:" << endl
<< "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
<< "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
<< "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
<< "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
<< "\t -v " << "Enable debug logs." << endl
<< "\t -h " << "Print this help and exit." << endl;
return 0;
}
if (enableDebugLogs) {
InitLogger(LogLevel::Debug);
}
Configuration config;
string stunServer = "stun:stun.l.google.com:19302";
cout << "Stun server is " << stunServer << endl;
config.iceServers.emplace_back(stunServer);
string localId = "server";
cout << "The local ID is: " << localId << endl;
auto ws = make_shared<WebSocket>();
ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });
ws->onClosed([]() { cout << "WebSocket closed" << endl; });
ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });
ws->onMessage([&](variant<binary, string> data) {
if (!holds_alternative<string>(data))
return;
json message = json::parse(get<string>(data));
MainThread.dispatch([message, config, ws]() {
wsOnMessage(message, config, ws);
});
});
const string url = "ws://" + ip_address + ":" + to_string(port) + "/" + localId;
cout << "Url is " << url << endl;
ws->open(url);
cout << "Waiting for signaling to be connected..." << endl;
while (!ws->isOpen()) {
if (ws->isClosed())
return 1;
this_thread::sleep_for(100ms);
}
while (true) {
string id;
cout << "Enter to exit" << endl;
cin >> id;
cin.ignore();
cout << "exiting" << endl;
break;
}
cout << "Cleaning up..." << endl;
return 0;
} catch (const std::exception &e) {
std::cout << "Error: " << e.what() << std::endl;
return -1;
}
shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
auto video = Description::Video(cname);
video.addH264Codec(payloadType);
video.addSSRC(ssrc, cname, msid);
auto track = pc->addTrack(video);
// create RTP configuration
auto rtpConfig = shared_ptr<RTPPacketizationConfig>(new RTPPacketizationConfig(ssrc, cname, payloadType, H264RTPPacketizer::defaultClockRate));
// create packetizer
auto packetizer = shared_ptr<H264RTPPacketizer>(new H264RTPPacketizer(rtpConfig));
// create H264 and RTCP SP handler
shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer));
// set handler
track->setRtcpHandler(h264Handler);
track->onOpen(onOpen);
auto trackData = make_shared<ClientTrackData>(track, h264Handler);
return trackData;
}
shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
auto audio = Description::Audio(cname);
audio.addOpusCodec(payloadType);
audio.addSSRC(ssrc, cname, msid);
auto track = pc->addTrack(audio);
// create RTP configuration
auto rtpConfig = shared_ptr<RTPPacketizationConfig>(new RTPPacketizationConfig(ssrc, cname, payloadType, OpusRTPPacketizer::defaultClockRate));
// create packetizer
auto packetizer = make_shared<OpusRTPPacketizer>(rtpConfig);
// create opus and RTCP SP handler
auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
// set handler
track->setRtcpHandler(opusHandler);
track->onOpen(onOpen);
auto trackData = make_shared<ClientTrackData>(track, opusHandler);
return trackData;
}
// Create and setup a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
weak_ptr<WebSocket> wws,
string id) {
auto pc = make_shared<PeerConnection>(config);
shared_ptr<Client> client(new Client(pc));
pc->onStateChange([id](PeerConnection::State state) {
cout << "State: " << state << endl;
if (state == PeerConnection::State::Disconnected ||
state == PeerConnection::State::Failed ||
state == PeerConnection::State::Closed) {
// remove disconnected client
MainThread.dispatch([id]() {
clients.erase(id);
});
}
});
pc->onGatheringStateChange(
[wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
cout << "Gathering State: " << state << endl;
if (state == PeerConnection::GatheringState::Complete) {
if(auto pc = wpc.lock()) {
auto description = pc->localDescription();
json message = {
{"id", id},
{"type", description->typeString()},
{"sdp", string(description.value())}
};
// Gathering complete, send answer
if (auto ws = wws.lock()) {
ws->send(message.dump());
}
}
}
});
client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
MainThread.dispatch([wc]() {
if (auto c = wc.lock()) {
addToStream(c, true);
}
});
cout << "Video from " << id << " opened" << endl;
});
client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
MainThread.dispatch([wc]() {
if (auto c = wc.lock()) {
addToStream(c, false);
}
});
cout << "Audio from " << id << " opened" << endl;
});
auto dc = pc->addDataChannel("ping-pong");
dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
if (auto dc = wdc.lock()) {
dc->send("Ping");
}
});
dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](string msg) {
cout << "Message from " << id << " received: " << msg << endl;
if (auto dc = wdc.lock()) {
dc->send("Ping");
}
});
client->dataChannel = dc;
pc->setLocalDescription();
return client;
};
/// Create stream
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
// video source
auto video = make_shared<H264FileParser>(h264Samples, fps, true);
// audio source
auto audio = make_shared<OPUSFileParser>(opusSamples, true);
auto stream = make_shared<Stream>(video, audio);
// set callback responsible for sample sending
stream->onSample([ws = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
vector<ClientTrack> tracks{};
string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
// get track for given type
function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
return type == Stream::StreamSourceType::Video ? client->video : client->audio;
};
// get all clients with Ready state
for(auto id_client: clients) {
auto id = id_client.first;
auto client = id_client.second;
auto optTrackData = getTrackData(client);
if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
auto trackData = optTrackData.value();
tracks.push_back(ClientTrack(id, trackData));
}
}
if (!tracks.empty()) {
auto message = make_message(move(sample));
for (auto clientTrack: tracks) {
auto client = clientTrack.id;
auto trackData = clientTrack.trackData;
// sample time is in us, we need to convert it to seconds
auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
auto rtpConfig = trackData->sender->rtpConfig;
// get elapsed time in clock rate
uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);
// set new timestamp
rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;
// get elapsed time in clock rate from last RTCP sender report
auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->previousReportedTimestamp;
// check if last report was at least 1 second ago
if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
trackData->sender->setNeedsToReport();
}
cout << "Sending " << streamType << " sample with size: " << to_string(message->size()) << " to " << client << endl;
bool send = false;
try {
// send sample
send = trackData->track->send(*message);
} catch (...) {
send = false;
}
if (!send) {
cerr << "Unable to send "<< streamType << " packet" << endl;
break;
}
}
}
MainThread.dispatch([ws]() {
if (clients.empty()) {
// we have no clients, stop the stream
if (auto stream = ws.lock()) {
stream->stop();
}
}
});
});
return stream;
}
/// Start stream
void startStream() {
shared_ptr<Stream> stream;
if (avStream.has_value()) {
stream = avStream.value();
if (stream->isRunning) {
// stream is already running
return;
}
} else {
stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
avStream = stream;
}
stream->start();
}
/// Send previous key frame so browser can show something to user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr<Stream> stream, shared_ptr<ClientTrackData> video) {
auto h264 = dynamic_cast<H264FileParser *>(stream->video.get());
auto initialNalus = h264->initialNALUS();
// send previous NALU key frame so users don't have to wait to see stream works
if (!initialNalus.empty()) {
const double frameDuration_s = double(h264->sampleDuration_us) / (1000 * 1000);
const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
video->track->send(initialNalus);
video->sender->rtpConfig->timestamp += frameTimestampDuration;
// Send initial NAL units again to start stream in firefox browser
video->track->send(initialNalus);
}
}
/// Add client to stream
/// @param client Client
/// @param adding_video True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo) {
if (client->getState() == Client::State::Waiting) {
client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
} else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
|| (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {
// Audio and video tracks are collected now
assert(client->video.has_value() && client->audio.has_value());
auto video = client->video.value();
auto audio = client->audio.value();
auto currentTime_us = double(currentTimeInMicroSeconds());
auto currentTime_s = currentTime_us / (1000 * 1000);
// set start time of stream
video->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);
audio->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);
// start stat recording of RTCP SR
video->sender->startRecording();
audio->sender->startRecording();
if (avStream.has_value()) {
sendInitialNalus(avStream.value(), video);
}
client->setState(Client::State::Ready);
}
if (client->getState() == Client::State::Ready) {
startStream();
}
}

View File

@ -0,0 +1,23 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "opusfileparser.hpp"
using namespace std;
OPUSFileParser::OPUSFileParser(string directory, bool loop, uint32_t samplesPerSecond): FileParser(directory, ".opus", samplesPerSecond, loop) { }

View File

@ -0,0 +1,32 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef opusfileparser_hpp
#define opusfileparser_hpp
#include "fileparser.hpp"
class OPUSFileParser: public FileParser {
static const uint32_t defaultSamplesPerSecond = 50;
public:
OPUSFileParser(std::string directory, bool loop, uint32_t samplesPerSecond = OPUSFileParser::defaultSamplesPerSecond);
};
#endif /* opusfileparser_hpp */

View File

@ -0,0 +1,107 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#include "stream.hpp"
#include "helpers.hpp"
void StreamSource::stop() {
sampleTime_us = 0;
sample = {};
}
StreamSource::~StreamSource() {
stop();
}
Stream::Stream(std::shared_ptr<StreamSource> video, std::shared_ptr<StreamSource> audio): std::enable_shared_from_this<Stream>(), video(video), audio(audio) { }
Stream::~Stream() {
stop();
}
std::pair<std::shared_ptr<StreamSource>, Stream::StreamSourceType> Stream::unsafePrepareForSample() {
std::shared_ptr<StreamSource> ss;
StreamSourceType sst;
uint64_t nextTime;
if (audio->getSampleTime_us() < video->getSampleTime_us()) {
ss = audio;
sst = StreamSourceType::Audio;
nextTime = audio->getSampleTime_us();
} else {
ss = video;
sst = StreamSourceType::Video;
nextTime = video->getSampleTime_us();
}
auto currentTime = currentTimeInMicroSeconds();
auto elapsed = currentTime - startTime;
if (nextTime > elapsed) {
auto waitTime = nextTime - elapsed;
mutex.unlock();
usleep(waitTime);
mutex.lock();
}
return {ss, sst};
}
void Stream::sendSample() {
std::lock_guard lock(mutex);
if (!isRunning) {
return;
}
auto ssSST = unsafePrepareForSample();
auto ss = ssSST.first;
auto sst = ssSST.second;
auto sample = ss->getSample();
sampleHandler(sst, ss->getSampleTime_us(), sample);
ss->loadNextSample();
dispatchQueue.dispatch([this]() {
this->sendSample();
});
}
void Stream::onSample(std::function<void (StreamSourceType, uint64_t, rtc::binary)> handler) {
sampleHandler = handler;
}
void Stream::start() {
std::lock_guard lock(mutex);
if (isRunning) {
return;
}
_isRunning = true;
startTime = currentTimeInMicroSeconds();
audio->start();
video->start();
dispatchQueue.dispatch([this]() {
this->sendSample();
});
}
void Stream::stop() {
std::lock_guard lock(mutex);
if (!isRunning) {
return;
}
_isRunning = false;
dispatchQueue.removePending();
audio->stop();
video->stop();
};

View File

@ -0,0 +1,73 @@
/*
* libdatachannel streamer example
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef stream_hpp
#define stream_hpp
#include "dispatchqueue.hpp"
#include "rtc/rtc.hpp"
class StreamSource {
protected:
uint64_t sampleTime_us = 0;
rtc::binary sample = {};
public:
StreamSource() { }
virtual void start() = 0;
virtual void stop();
virtual void loadNextSample() = 0;
inline uint64_t getSampleTime_us() { return sampleTime_us; }
inline rtc::binary getSample() { return sample; }
~StreamSource();
};
class Stream: std::enable_shared_from_this<Stream> {
uint64_t startTime = 0;
std::mutex mutex;
DispatchQueue dispatchQueue = DispatchQueue("StreamQueue");
bool _isRunning = false;
public:
const std::shared_ptr<StreamSource> audio;
const std::shared_ptr<StreamSource> video;
Stream(std::shared_ptr<StreamSource> video, std::shared_ptr<StreamSource> audio);
enum class StreamSourceType {
Audio,
Video
};
~Stream();
private:
rtc::synchronized_callback<StreamSourceType, uint64_t, rtc::binary> sampleHandler;
std::pair<std::shared_ptr<StreamSource>, StreamSourceType> unsafePrepareForSample();
void sendSample();
public:
void onSample(std::function<void (StreamSourceType, uint64_t, rtc::binary)> handler);
void start();
void stop();
const bool & isRunning = _isRunning;
};
#endif /* stream_hpp */