/** * Copyright (c) 2020 Paul-Louis Ageneau * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef RTC_IMPL_THREADPOOL_H #define RTC_IMPL_THREADPOOL_H #include "common.hpp" #include "init.hpp" #include "internals.hpp" #include #include #include #include #include #include #include #include #include #include #include namespace rtc::impl { template using invoke_future_t = std::future, std::decay_t...>>; class ThreadPool final { public: using clock = std::chrono::steady_clock; static ThreadPool &Instance(); ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(const ThreadPool &) = delete; ThreadPool(ThreadPool &&) = delete; ThreadPool &operator=(ThreadPool &&) = delete; int count() const; void spawn(int count = 1); void join(); void run(); bool runOne(); template auto enqueue(F &&f, Args &&...args) -> invoke_future_t; template auto schedule(clock::duration delay, F &&f, Args &&...args) -> invoke_future_t; template auto schedule(clock::time_point time, F &&f, Args &&...args) -> invoke_future_t; protected: ThreadPool(); ~ThreadPool(); std::function dequeue(); // returns null function if joining std::vector mWorkers; std::atomic mBusyWorkers = 0; std::atomic mJoining = false; struct Task { clock::time_point time; std::function func; bool operator>(const Task &other) const { return time > other.time; } bool operator<(const Task &other) const { return time < other.time; } }; std::priority_queue, std::greater> mTasks; std::condition_variable mTasksCondition, mWaitingCondition; mutable std::mutex mMutex, mWorkersMutex; }; template auto ThreadPool::enqueue(F &&f, Args &&...args) -> invoke_future_t { return schedule(clock::now(), std::forward(f), std::forward(args)...); } template auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args) -> invoke_future_t { return schedule(clock::now() + delay, std::forward(f), std::forward(args)...); } template auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args) -> invoke_future_t { std::unique_lock lock(mMutex); using R = std::invoke_result_t, std::decay_t...>; auto bound = std::bind(std::forward(f), std::forward(args)...); auto task = std::make_shared>([bound = std::move(bound)]() mutable { try { return bound(); } catch (const std::exception &e) { PLOG_WARNING << e.what(); throw; } }); std::future result = task->get_future(); mTasks.push({time, [task = std::move(task), token = Init::Token()]() { return (*task)(); }}); mTasksCondition.notify_one(); return result; } } // namespace rtc::impl #endif