]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Add optional max size to work queue 332/head
authorNick Terrell <terrelln@fb.com>
Fri, 2 Sep 2016 20:53:23 +0000 (13:53 -0700)
committerNick Terrell <terrelln@fb.com>
Fri, 2 Sep 2016 20:53:23 +0000 (13:53 -0700)
contrib/pzstd/Makefile
contrib/pzstd/utils/WorkQueue.h
contrib/pzstd/utils/test/Makefile
contrib/pzstd/utils/test/WorkQueueTest.cpp

index 5338a5a9ed16923d4393d022d7ef0b00fa94e047..c59a6d107a76835315fdce41d4dc887a8686caf0 100644 (file)
@@ -70,5 +70,5 @@ clean:
        $(MAKE) -C $(ZSTDDIR) clean
        $(MAKE) -C utils/test clean
        $(MAKE) -C test clean
-       @$(RM) -rf googletest/ libzstd.a *.o pzstd$(EXT)
+       @$(RM) -rf libzstd.a *.o pzstd$(EXT)
        @echo Cleaning completed
index 3d926cc8013cecd8a4167aad90bb914f9e557527..2fa417f419fec09a58080029238e46e5136aebbd 100644 (file)
@@ -12,6 +12,7 @@
 
 #include <atomic>
 #include <cassert>
+#include <cstddef>
 #include <condition_variable>
 #include <cstddef>
 #include <functional>
@@ -25,14 +26,29 @@ template <typename T>
 class WorkQueue {
   // Protects all member variable access
   std::mutex mutex_;
-  std::condition_variable cv_;
+  std::condition_variable readerCv_;
+  std::condition_variable writerCv_;
 
   std::queue<T> queue_;
   bool done_;
+  std::size_t maxSize_;
+
+  // Must have lock to call this function
+  bool full() const {
+    if (maxSize_ == 0) {
+      return false;
+    }
+    return queue_.size() >= maxSize_;
+  }
 
  public:
-  /// Constructs an empty work queue.
-  WorkQueue() : done_(false) {}
+  /**
+   * Constructs an empty work queue with an optional max size.
+   * If `maxSize == 0` the queue size is unbounded.
+   *
+   * @param maxSize The maximum allowed size of the work queue.
+   */
+  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
 
   /**
    * Push an item onto the work queue.  Notify a single thread that work is
@@ -44,13 +60,16 @@ class WorkQueue {
    */
   bool push(T item) {
     {
-      std::lock_guard<std::mutex> lock(mutex_);
+      std::unique_lock<std::mutex> lock(mutex_);
+      while (full() && !done_) {
+        writerCv_.wait(lock);
+      }
       if (done_) {
         return false;
       }
       queue_.push(std::move(item));
     }
-    cv_.notify_one();
+    readerCv_.notify_one();
     return true;
   }
 
@@ -64,16 +83,19 @@ class WorkQueue {
    *                    `finish()` has been called.
    */
   bool pop(T& item) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    while (queue_.empty() && !done_) {
-      cv_.wait(lock);
-    }
-    if (queue_.empty()) {
-      assert(done_);
-      return false;
+    {
+      std::unique_lock<std::mutex> lock(mutex_);
+      while (queue_.empty() && !done_) {
+        readerCv_.wait(lock);
+      }
+      if (queue_.empty()) {
+        assert(done_);
+        return false;
+      }
+      item = std::move(queue_.front());
+      queue_.pop();
     }
-    item = std::move(queue_.front());
-    queue_.pop();
+    writerCv_.notify_one();
     return true;
   }
 
@@ -87,18 +109,19 @@ class WorkQueue {
       assert(!done_);
       done_ = true;
     }
-    cv_.notify_all();
+    readerCv_.notify_all();
+    writerCv_.notify_all();
   }
 
   /// Blocks until `finish()` has been called (but the queue may not be empty).
   void waitUntilFinished() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!done_) {
-      cv_.wait(lock);
+      readerCv_.wait(lock);
       // If we were woken by a push, we need to wake a thread waiting on pop().
       if (!done_) {
         lock.unlock();
-        cv_.notify_one();
+        readerCv_.notify_one();
         lock.lock();
       }
     }
@@ -111,7 +134,7 @@ class BufferWorkQueue {
   std::atomic<std::size_t> size_;
 
  public:
-  BufferWorkQueue() : size_(0) {}
+  BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
 
   void push(Buffer buffer) {
     size_.fetch_add(buffer.size());
index 23f111e55d2762c2b55b03380ac14a3ffbcfc762..b9ea73e32aed115a804a47c9100a5deef98a6b1b 100644 (file)
@@ -23,7 +23,7 @@ GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
 CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
 CXXFLAGS  ?= -O3
 CXXFLAGS  += -std=c++11
-CFLAGS  += $(MOREFLAGS)
+CXXFLAGS  += $(MOREFLAGS)
 FLAGS    = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
 
 %: %.cpp
index 1b548d1604875fc57511bbb92044ea36ac13c4a1..074891fda9bb0076d5b700aa7f06767e0142f354 100644 (file)
@@ -145,6 +145,71 @@ TEST(WorkQueue, MPMC) {
   }
 }
 
+TEST(WorkQueue, BoundedSizeWorks) {
+  WorkQueue<int> queue(1);
+  int result;
+  queue.push(5);
+  queue.pop(result);
+  queue.push(5);
+  queue.pop(result);
+  queue.push(5);
+  queue.finish();
+  queue.pop(result);
+  EXPECT_EQ(5, result);
+}
+
+TEST(WorkQueue, BoundedSizePushAfterFinish) {
+  WorkQueue<int> queue(1);
+  int result;
+  queue.push(5);
+  std::thread pusher([&queue] {
+    queue.push(6);
+  });
+  // Dirtily try and make sure that pusher has run.
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  queue.finish();
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(5, result);
+  EXPECT_FALSE(queue.pop(result));
+
+  pusher.join();
+}
+
+TEST(WorkQueue, BoundedSizeMPMC) {
+  WorkQueue<int> queue(100);
+  std::vector<int> results(10000, -1);
+  std::mutex mutex;
+  std::vector<std::thread> popperThreads;
+  for (int i = 0; i < 10; ++i) {
+    popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  std::vector<std::thread> pusherThreads;
+  for (int i = 0; i < 100; ++i) {
+    auto min = i * 100;
+    auto max = (i + 1) * 100;
+    pusherThreads.emplace_back(
+        [ &queue, min, max ] {
+          for (int i = min; i < max; ++i) {
+            queue.push(i);
+          }
+        });
+  }
+
+  for (auto& thread : pusherThreads) {
+    thread.join();
+  }
+  queue.finish();
+
+  for (auto& thread : popperThreads) {
+    thread.join();
+  }
+
+  for (int i = 0; i < 10000; ++i) {
+    EXPECT_EQ(i, results[i]);
+  }
+}
+
 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
   {
     BufferWorkQueue queue;