Prevent scheduling tasks while joining thread pool

This commit is contained in:
Paul-Louis Ageneau
2021-02-03 22:21:22 +01:00
parent b5589dbd57
commit ab392fe0da
4 changed files with 45 additions and 12 deletions

View File

@ -25,18 +25,39 @@ Processor::Processor(size_t limit) : mTasks(limit) {}
Processor::~Processor() { join(); }
void Processor::join() {
std::unique_lock lock(mMutex);
mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
// We need to detect situations where the thread pool does not execute a pending task at exit
std::optional<unsigned int> counter;
while (true) {
std::shared_future<void> pending;
{
std::unique_lock lock(mMutex);
if (!mPending // no pending task
|| (counter && *counter == mCounter)) { // or no scheduled task after the last one
// Processing is stopped, clear everything and return
mPending.reset();
while (!mTasks.empty())
mTasks.pop();
return;
}
pending = *mPending;
counter = mCounter;
}
// Wait for the pending task
pending.wait();
}
}
void Processor::schedule() {
std::unique_lock lock(mMutex);
if (auto next = mTasks.tryPop()) {
ThreadPool::Instance().enqueue(std::move(*next));
mPending = ThreadPool::Instance().enqueue(std::move(*next)).share();
++mCounter;
} else {
// No more tasks
mPending = false;
mCondition.notify_all();
mPending.reset(); // No more tasks
}
}

View File

@ -54,10 +54,10 @@ protected:
const init_token mInitToken = Init::Token();
Queue<std::function<void()>> mTasks;
bool mPending = false; // true iff a task is pending in the thread pool
std::optional<std::shared_future<void>> mPending; // future of the pending task
unsigned int mCounter = 0; // Number of scheduled tasks
mutable std::mutex mMutex;
std::condition_variable mCondition;
};
template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) {
@ -65,12 +65,12 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args)
auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task = [this, bound = std::move(bound)]() mutable {
scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task
return bound();
bound();
};
if (!mPending) {
ThreadPool::Instance().enqueue(std::move(task));
mPending = true;
mPending = ThreadPool::Instance().enqueue(std::move(task)).share();
++mCounter;
} else {
mTasks.push(std::move(task));
}

View File

@ -96,6 +96,10 @@ std::function<void()> ThreadPool::dequeue() {
mCondition.wait(lock);
}
}
while (!mTasks.empty())
mTasks.pop();
return nullptr;
}

View File

@ -100,8 +100,16 @@ auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
template <class F, class... Args>
auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
-> invoke_future_t<F, Args...> {
std::unique_lock lock(mMutex);
using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
std::unique_lock lock(mMutex);
if (mJoining) {
std::promise<R> promise;
std::future<R> result = promise.get_future();
promise.set_exception(std::make_exception_ptr(
std::runtime_error("Scheduled a task while joining the thread pool")));
return result;
}
auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<R()>>([bound = std::move(bound)]() mutable {
try {