explicit ThreadPool(std::size_t numThreads) {
threads_.reserve(numThreads);
for (std::size_t i = 0; i < numThreads; ++i) {
- threads_.emplace_back([&] {
+ threads_.emplace_back([this] {
std::function<void()> task;
while (tasks_.pop(task)) {
task();
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
+ * If `push()` returns false, then `item` has not been moved from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
- bool push(T item) {
+ bool push(T&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
#include "utils/WorkQueue.h"
#include <gtest/gtest.h>
+#include <memory>
#include <mutex>
#include <thread>
#include <vector>
const int max = 100;
for (int i = 0; i < 10; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
std::thread thread([ &queue, max ] {
std::this_thread::yield();
for (int i = 10; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
queue.finish();
}
for (int i = 0; i < 50; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
queue.finish();
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
});
}
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
});
}
}
}
+TEST(WorkQueue, FailedPush) {
+ WorkQueue<std::unique_ptr<int>> queue;
+ std::unique_ptr<int> x(new int{5});
+ EXPECT_TRUE(queue.push(std::move(x)));
+ EXPECT_EQ(nullptr, x);
+ queue.finish();
+ x.reset(new int{6});
+ EXPECT_FALSE(queue.push(std::move(x)));
+ EXPECT_NE(nullptr, x);
+ EXPECT_EQ(6, *x);
+}
+
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;