]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[pzstd] Fix lantent bug in WorkQueue::push()
authorNick Terrell <terrelln@fb.com>
Thu, 13 Oct 2016 19:03:02 +0000 (12:03 -0700)
committerNick Terrell <terrelln@fb.com>
Fri, 14 Oct 2016 22:26:56 +0000 (15:26 -0700)
contrib/pzstd/utils/ThreadPool.h
contrib/pzstd/utils/WorkQueue.h
contrib/pzstd/utils/test/WorkQueueTest.cpp

index a1d1fc0b9e34de2b4c46e93adca4a82fc35aefa8..99b3ecfa51ae29447a056b8974d6fcb88e7839a3 100644 (file)
@@ -27,7 +27,7 @@ class ThreadPool {
   explicit ThreadPool(std::size_t numThreads) {
     threads_.reserve(numThreads);
     for (std::size_t i = 0; i < numThreads; ++i) {
-      threads_.emplace_back([&] {
+      threads_.emplace_back([this] {
         std::function<void()> task;
         while (tasks_.pop(task)) {
           task();
index c46e6cbcf2749200d771cf1e3b57e353f27b1b94..780e5360ffb1f2954c8d28f73a9c47846dc09a7c 100644 (file)
@@ -54,12 +54,13 @@ class WorkQueue {
   /**
    * Push an item onto the work queue.  Notify a single thread that work is
    * available.  If `finish()` has been called, do nothing and return false.
+   * If `push()` returns false, then `item` has not been moved from.
    *
    * @param item  Item to push onto the queue.
    * @returns     True upon success, false if `finish()` has been called.  An
    *               item was pushed iff `push()` returns true.
    */
-  bool push(T item) {
+  bool push(T&& item) {
     {
       std::unique_lock<std::mutex> lock(mutex_);
       while (full() && !done_) {
index ebf375a84e265b16c15d41cac357707366220f4a..7f58ccb3f1997187e226ff8506234abfccf42525 100644 (file)
@@ -10,6 +10,7 @@
 #include "utils/WorkQueue.h"
 
 #include <gtest/gtest.h>
+#include <memory>
 #include <mutex>
 #include <thread>
 #include <vector>
@@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) {
   const int max = 100;
 
   for (int i = 0; i < 10; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
 
   std::thread thread([ &queue, max ] {
@@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) {
 
   std::this_thread::yield();
   for (int i = 10; i < max; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
   queue.finish();
 
@@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) {
   }
 
   for (int i = 0; i < 50; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
   queue.finish();
 
@@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) {
     pusherThreads.emplace_back(
         [ &queue, min, max ] {
           for (int i = min; i < max; ++i) {
-            queue.push(i);
+            queue.push(int{i});
           }
         });
   }
@@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) {
     pusherThreads.emplace_back(
         [ &queue, min, max ] {
           for (int i = min; i < max; ++i) {
-            queue.push(i);
+            queue.push(int{i});
           }
         });
   }
@@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) {
   }
 }
 
+TEST(WorkQueue, FailedPush) {
+  WorkQueue<std::unique_ptr<int>> queue;
+  std::unique_ptr<int> x(new int{5});
+  EXPECT_TRUE(queue.push(std::move(x)));
+  EXPECT_EQ(nullptr, x);
+  queue.finish();
+  x.reset(new int{6});
+  EXPECT_FALSE(queue.push(std::move(x)));
+  EXPECT_NE(nullptr, x);
+  EXPECT_EQ(6, *x);
+}
+
 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
   {
     BufferWorkQueue queue;