#include <atomic>
#include <cassert>
+#include <cstddef>
#include <condition_variable>
#include <cstddef>
#include <functional>
class WorkQueue {
// Protects all member variable access
std::mutex mutex_;
- std::condition_variable cv_;
+ std::condition_variable readerCv_;
+ std::condition_variable writerCv_;
std::queue<T> queue_;
bool done_;
+ std::size_t maxSize_;
+
+ // Must have lock to call this function
+ bool full() const {
+ if (maxSize_ == 0) {
+ return false;
+ }
+ return queue_.size() >= maxSize_;
+ }
public:
- /// Constructs an empty work queue.
- WorkQueue() : done_(false) {}
+ /**
+ * Constructs an empty work queue with an optional max size.
+ * If `maxSize == 0` the queue size is unbounded.
+ *
+ * @param maxSize The maximum allowed size of the work queue.
+ */
+ WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
/**
* Push an item onto the work queue. Notify a single thread that work is
*/
bool push(T item) {
{
- std::lock_guard<std::mutex> lock(mutex_);
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (full() && !done_) {
+ writerCv_.wait(lock);
+ }
if (done_) {
return false;
}
queue_.push(std::move(item));
}
- cv_.notify_one();
+ readerCv_.notify_one();
return true;
}
* `finish()` has been called.
*/
bool pop(T& item) {
- std::unique_lock<std::mutex> lock(mutex_);
- while (queue_.empty() && !done_) {
- cv_.wait(lock);
- }
- if (queue_.empty()) {
- assert(done_);
- return false;
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (queue_.empty() && !done_) {
+ readerCv_.wait(lock);
+ }
+ if (queue_.empty()) {
+ assert(done_);
+ return false;
+ }
+ item = std::move(queue_.front());
+ queue_.pop();
}
- item = std::move(queue_.front());
- queue_.pop();
+ writerCv_.notify_one();
return true;
}
assert(!done_);
done_ = true;
}
- cv_.notify_all();
+ readerCv_.notify_all();
+ writerCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
- cv_.wait(lock);
+ readerCv_.wait(lock);
// If we were woken by a push, we need to wake a thread waiting on pop().
if (!done_) {
lock.unlock();
- cv_.notify_one();
+ readerCv_.notify_one();
lock.lock();
}
}
std::atomic<std::size_t> size_;
public:
- BufferWorkQueue() : size_(0) {}
+ BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
void push(Buffer buffer) {
size_.fetch_add(buffer.size());
}
}
+TEST(WorkQueue, BoundedSizeWorks) {
+ WorkQueue<int> queue(1);
+ int result;
+ queue.push(5);
+ queue.pop(result);
+ queue.push(5);
+ queue.pop(result);
+ queue.push(5);
+ queue.finish();
+ queue.pop(result);
+ EXPECT_EQ(5, result);
+}
+
+TEST(WorkQueue, BoundedSizePushAfterFinish) {
+ WorkQueue<int> queue(1);
+ int result;
+ queue.push(5);
+ std::thread pusher([&queue] {
+ queue.push(6);
+ });
+ // Dirtily try and make sure that pusher has run.
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ queue.finish();
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(5, result);
+ EXPECT_FALSE(queue.pop(result));
+
+ pusher.join();
+}
+
+TEST(WorkQueue, BoundedSizeMPMC) {
+ WorkQueue<int> queue(100);
+ std::vector<int> results(10000, -1);
+ std::mutex mutex;
+ std::vector<std::thread> popperThreads;
+ for (int i = 0; i < 10; ++i) {
+ popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
+ }
+
+ std::vector<std::thread> pusherThreads;
+ for (int i = 0; i < 100; ++i) {
+ auto min = i * 100;
+ auto max = (i + 1) * 100;
+ pusherThreads.emplace_back(
+ [ &queue, min, max ] {
+ for (int i = min; i < max; ++i) {
+ queue.push(i);
+ }
+ });
+ }
+
+ for (auto& thread : pusherThreads) {
+ thread.join();
+ }
+ queue.finish();
+
+ for (auto& thread : popperThreads) {
+ thread.join();
+ }
+
+ for (int i = 0; i < 10000; ++i) {
+ EXPECT_EQ(i, results[i]);
+ }
+}
+
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;