constant throughput functionality

This commit is contained in:
Murat Dogan
2021-03-19 09:27:01 +03:00
parent 84e114ebfc
commit 31f384f4e0

View File

@ -43,6 +43,7 @@ using namespace std;
using namespace std::chrono_literals;
using chrono::duration_cast;
using chrono::microseconds;
using chrono::milliseconds;
using chrono::steady_clock;
@ -65,6 +66,13 @@ binary messageData(messageSize);
atomic<size_t> receivedSize = 0, sentSize = 0;
bool noSend = false;
// Benchmark - enableThroughputSet params
bool enableThroughputSet;
int throughtputSetAsKB;
int bufferSize;
const float STEP_COUNT_FOR_1_SEC = 100.0;
const int stepDurationInMs = 1000 / STEP_COUNT_FOR_1_SEC;
int main(int argc, char **argv) try {
Cmdline params(argc, argv);
@ -73,6 +81,11 @@ int main(int argc, char **argv) try {
// Benchmark - construct message to send
fill(messageData.begin(), messageData.end(), std::byte(0xFF));
// Benchmark - enableThroughputSet params
enableThroughputSet = params.enableThroughputSet();
throughtputSetAsKB = params.throughtputSetAsKB();
bufferSize = params.bufferSize();
// No Send option
noSend = params.noSend();
if (noSend)
@ -181,14 +194,20 @@ int main(int argc, char **argv) try {
cout << "Creating DataChannel with label \"" << label << "\"" << endl;
auto dc = pc->createDataChannel(label);
// Set Buffer Size
dc->setBufferedAmountLowThreshold(bufferSize);
dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
cout << "DataChannel from " << id << " open" << endl;
if (noSend)
return;
if (enableThroughputSet)
return;
if (auto dcLocked = wdc.lock()) {
cout << "Starting benchmark test. Sending data..." << endl;
try {
while (dcLocked->bufferedAmount() == 0) {
while (dcLocked->bufferedAmount() <= bufferSize) {
dcLocked->send(messageData);
sentSize += messageData.size();
}
@ -202,13 +221,16 @@ int main(int argc, char **argv) try {
if (noSend)
return;
if (enableThroughputSet)
return;
auto dcLocked = wdc.lock();
if (!dcLocked)
return;
// Continue sending
try {
while (dcLocked->isOpen() && dcLocked->bufferedAmount() == 0) {
while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
dcLocked->send(messageData);
sentSize += messageData.size();
}
@ -227,28 +249,61 @@ int main(int argc, char **argv) try {
dataChannelMap.emplace(id, dc);
const int duration = params.durationInSec() > 0 ? params.durationInSec() : INT32_MAX;
cout << "Becnhmark will run for " << duration << " seconds" << endl;
cout << "Benchmark will run for " << duration << " seconds" << endl;
int printCounter = 0;
int printStatCounter = 0;
steady_clock::time_point startTime = steady_clock::now();
for (int i = 1; i <= duration; ++i) {
this_thread::sleep_for(1000ms);
unsigned long _receivedSize = receivedSize.exchange(0);
unsigned long _sentSize = sentSize.exchange(0);
float timePassedAsSec = ((steady_clock::now() - startTime).count()) / (1000.0 * 1000.0); // sec
startTime = steady_clock::now();
steady_clock::time_point printTime = steady_clock::now();
steady_clock::time_point stepTime = steady_clock::now();
// Byte count to send for every loop
int byteToSendOnEveryLoop = throughtputSetAsKB * stepDurationInMs;
for (int i = 1; i <= duration * STEP_COUNT_FOR_1_SEC; ++i) {
this_thread::sleep_for(milliseconds(stepDurationInMs));
printCounter++;
cout << "#" << i << " Received: " << static_cast<int>(_receivedSize / timePassedAsSec) << " KB/s"
<< " Sent: " << static_cast<int>(_sentSize / timePassedAsSec) << " KB/s"
<< " BufferSize: " << dc->bufferedAmount() << endl;
printStatCounter++;
if (enableThroughputSet) {
float dataCountMultiplier =
duration_cast<microseconds>((steady_clock::now() - stepTime)).count() /
(1000.0 * stepDurationInMs);
if (printStatCounter % 5 == 0) {
stepTime = steady_clock::now();
int byteToSendThisLoop = static_cast<int>(byteToSendOnEveryLoop * dataCountMultiplier);
binary tempMessageData(byteToSendThisLoop);
fill(tempMessageData.begin(), tempMessageData.end(), std::byte(0xFF));
if (dc->isOpen() && dc->bufferedAmount() <= bufferSize * byteToSendOnEveryLoop) {
dc->send(tempMessageData);
sentSize += tempMessageData.size();
}
}
if (printCounter >= STEP_COUNT_FOR_1_SEC) {
unsigned long _receivedSize = receivedSize.exchange(0);
unsigned long _sentSize = sentSize.exchange(0);
float dataCountMultiplier =
duration_cast<milliseconds>((steady_clock::now() - printTime)).count() /
(1000.0); // sec
printTime = steady_clock::now();
cout << "#" << i / STEP_COUNT_FOR_1_SEC
<< " Received: " << static_cast<int>(_receivedSize / (dataCountMultiplier * 1000))
<< " KB/s"
<< " Sent: " << static_cast<int>(_sentSize / (dataCountMultiplier * 1000))
<< " KB/s"
<< " BufferSize: " << dc->bufferedAmount() << endl;
printStatCounter++;
printCounter = 0;
}
if (printStatCounter >= 5) {
cout << "Stats# "
<< "Received Total: " << pc->bytesReceived() / (1000 * 1000) << " MB"
<< " Sent Total: " << pc->bytesSent() / (1000 * 1000) << " MB"
<< " RTT: " << pc->rtt().value_or(0ms).count() << " ms" << endl;
cout << endl;
printStatCounter = 0;
}
}
@ -301,10 +356,12 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
cout << "### Check other peer's screen for stats ###" << endl;
cout << "###########################################" << endl;
if (!noSend) {
cout << "Starting benchmark test. Sending data ..." << endl;
// Set Buffer Size
dc->setBufferedAmountLowThreshold(bufferSize);
if (!noSend && !enableThroughputSet) {
try {
while (dc->bufferedAmount() == 0) {
while (dc->bufferedAmount() <= bufferSize) {
dc->send(messageData);
sentSize += messageData.size();
}
@ -313,17 +370,62 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
}
}
if (!noSend && enableThroughputSet) {
// Create Send Data Thread
// Thread will join when data channel destroyed or closed
std::thread([wdc = make_weak_ptr(dc)]() {
steady_clock::time_point stepTime = steady_clock::now();
// Byte count to send for every loop
int byteToSendOnEveryLoop = throughtputSetAsKB * stepDurationInMs;
while (true) {
this_thread::sleep_for(milliseconds(stepDurationInMs));
auto dcLocked = wdc.lock();
if (!dcLocked)
break;
if (!dcLocked->isOpen())
break;
try {
float dataCountMultiplier =
duration_cast<microseconds>((steady_clock::now() - stepTime)).count() /
(1000.0 * stepDurationInMs);
stepTime = steady_clock::now();
int byteToSendThisLoop =
static_cast<int>(byteToSendOnEveryLoop * dataCountMultiplier);
binary tempMessageData(byteToSendThisLoop);
fill(tempMessageData.begin(), tempMessageData.end(), std::byte(0xFF));
if (dcLocked->bufferedAmount() <= bufferSize) {
dcLocked->send(tempMessageData);
sentSize += tempMessageData.size();
}
} catch (const std::exception &e) {
std::cout << "Send failed: " << e.what() << std::endl;
}
}
cout << "Send Data Thread exiting..." << endl;
}).detach();
}
dc->onBufferedAmountLow([wdc = make_weak_ptr(dc)]() {
if (noSend)
return;
if (enableThroughputSet)
return;
auto dcLocked = wdc.lock();
if (!dcLocked)
return;
// Continue sending
try {
while (dcLocked->isOpen() && dcLocked->bufferedAmount() == 0) {
while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
dcLocked->send(messageData);
sentSize += messageData.size();
}