diff --git a/src/processor.cpp b/src/processor.cpp index 5e83e47..d5f98c4 100644 --- a/src/processor.cpp +++ b/src/processor.cpp @@ -25,18 +25,39 @@ Processor::Processor(size_t limit) : mTasks(limit) {} Processor::~Processor() { join(); } void Processor::join() { - std::unique_lock lock(mMutex); - mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); }); + // We need to detect situations where the thread pool does not execute a pending task at exit + std::optional counter; + while (true) { + std::shared_future pending; + { + std::unique_lock lock(mMutex); + if (!mPending // no pending task + || (counter && *counter == mCounter)) { // or no scheduled task after the last one + + // Processing is stopped, clear everything and return + mPending.reset(); + while (!mTasks.empty()) + mTasks.pop(); + + return; + } + + pending = *mPending; + counter = mCounter; + } + + // Wait for the pending task + pending.wait(); + } } void Processor::schedule() { std::unique_lock lock(mMutex); if (auto next = mTasks.tryPop()) { - ThreadPool::Instance().enqueue(std::move(*next)); + mPending = ThreadPool::Instance().enqueue(std::move(*next)).share(); + ++mCounter; } else { - // No more tasks - mPending = false; - mCondition.notify_all(); + mPending.reset(); // No more tasks } } diff --git a/src/processor.hpp b/src/processor.hpp index a63259b..47a82bc 100644 --- a/src/processor.hpp +++ b/src/processor.hpp @@ -54,10 +54,10 @@ protected: const init_token mInitToken = Init::Token(); Queue> mTasks; - bool mPending = false; // true iff a task is pending in the thread pool + std::optional> mPending; // future of the pending task + unsigned int mCounter = 0; // Number of scheduled tasks mutable std::mutex mMutex; - std::condition_variable mCondition; }; template void Processor::enqueue(F &&f, Args &&...args) { @@ -65,12 +65,12 @@ template void Processor::enqueue(F &&f, Args &&...args) auto bound = std::bind(std::forward(f), std::forward(args)...); auto task = [this, bound = std::move(bound)]() mutable { scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task - return bound(); + bound(); }; if (!mPending) { - ThreadPool::Instance().enqueue(std::move(task)); - mPending = true; + mPending = ThreadPool::Instance().enqueue(std::move(task)).share(); + ++mCounter; } else { mTasks.push(std::move(task)); } diff --git a/src/threadpool.cpp b/src/threadpool.cpp index 261195e..9a7be69 100644 --- a/src/threadpool.cpp +++ b/src/threadpool.cpp @@ -96,6 +96,10 @@ std::function ThreadPool::dequeue() { mCondition.wait(lock); } } + + while (!mTasks.empty()) + mTasks.pop(); + return nullptr; } diff --git a/src/threadpool.hpp b/src/threadpool.hpp index bb134a7..8f56894 100644 --- a/src/threadpool.hpp +++ b/src/threadpool.hpp @@ -100,8 +100,16 @@ auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args) template auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args) -> invoke_future_t { - std::unique_lock lock(mMutex); using R = std::invoke_result_t, std::decay_t...>; + std::unique_lock lock(mMutex); + if (mJoining) { + std::promise promise; + std::future result = promise.get_future(); + promise.set_exception(std::make_exception_ptr( + std::runtime_error("Scheduled a task while joining the thread pool"))); + return result; + } + auto bound = std::bind(std::forward(f), std::forward(args)...); auto task = std::make_shared>([bound = std::move(bound)]() mutable { try {