]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed: bug when counting nb of active threads
authorYann Collet <cyan@fb.com>
Thu, 21 Jun 2018 00:18:57 +0000 (17:18 -0700)
committerYann Collet <cyan@fb.com>
Thu, 21 Jun 2018 01:28:49 +0000 (18:28 -0700)
when queueSize > 1

also : added a test in testpool.c
       verifying resizing is effective.

lib/common/pool.c
tests/poolTests.c

index ca5a38ee5fc7deba8fd2444d983ebc7ee6d26edd..d08b1de79e2630104c4c5e63b9ae65a3a1d919ef 100644 (file)
@@ -91,12 +91,13 @@ static void* POOL_thread(void* opaque) {
             job.function(job.opaque);
 
             /* If the intended queue size was 0, signal after finishing job */
+            ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+            ctx->numThreadsBusy--;
             if (ctx->queueSize == 1) {
-                ZSTD_pthread_mutex_lock(&ctx->queueMutex);
-                ctx->numThreadsBusy--;
-                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-        }   }
+            }
+            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+        }
     }  /* for (;;) */
     assert(0); /* Unreachable */
 }
index 00ee830154c9981e0516a64f4fd4c52645f1479f..23061b568257e5eba1da73a907d93d23d76506c6 100644 (file)
 #include <stddef.h>
 #include <stdio.h>
 
-#define ASSERT_TRUE(p)                                                         \
-  do {                                                                         \
-    if (!(p)) {                                                                \
-      return 1;                                                                \
-    }                                                                          \
+#define ASSERT_TRUE(p)                                                       \
+  do {                                                                       \
+    if (!(p)) {                                                              \
+      return 1;                                                              \
+    }                                                                        \
   } while (0)
 #define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
 #define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
@@ -32,10 +32,10 @@ struct data {
 
 void fn(void *opaque) {
   struct data *data = (struct data *)opaque;
-  pthread_mutex_lock(&data->mutex);
+  ZSTD_pthread_mutex_lock(&data->mutex);
   data->data[data->i] = data->i;
   ++data->i;
-  pthread_mutex_unlock(&data->mutex);
+  ZSTD_pthread_mutex_unlock(&data->mutex);
 }
 
 int testOrder(size_t numThreads, size_t queueSize) {
@@ -43,25 +43,26 @@ int testOrder(size_t numThreads, size_t queueSize) {
   POOL_ctx *ctx = POOL_create(numThreads, queueSize);
   ASSERT_TRUE(ctx);
   data.i = 0;
-  pthread_mutex_init(&data.mutex, NULL);
-  {
-    size_t i;
+  ZSTD_pthread_mutex_init(&data.mutex, NULL);
+  { size_t i;
     for (i = 0; i < 16; ++i) {
       POOL_add(ctx, &fn, &data);
     }
   }
   POOL_free(ctx);
   ASSERT_EQ(16, data.i);
-  {
-    size_t i;
+  { size_t i;
     for (i = 0; i < data.i; ++i) {
       ASSERT_EQ(i, data.data[i]);
     }
   }
-  pthread_mutex_destroy(&data.mutex);
+  ZSTD_pthread_mutex_destroy(&data.mutex);
   return 0;
 }
 
+
+/* --- test deadlocks --- */
+
 void waitFn(void *opaque) {
   (void)opaque;
   UTIL_sleepMilli(1);
@@ -72,8 +73,7 @@ int testWait(size_t numThreads, size_t queueSize) {
   struct data data;
   POOL_ctx *ctx = POOL_create(numThreads, queueSize);
   ASSERT_TRUE(ctx);
-  {
-    size_t i;
+  { size_t i;
     for (i = 0; i < 16; ++i) {
         POOL_add(ctx, &waitFn, &data);
     }
@@ -82,25 +82,115 @@ int testWait(size_t numThreads, size_t queueSize) {
   return 0;
 }
 
+
+/* --- test POOL_resize() --- */
+
+typedef struct {
+    ZSTD_pthread_mutex_t mut;
+    int val;
+    int max;
+    ZSTD_pthread_cond_t cond;
+} test_t;
+
+void waitLongFn(void *opaque) {
+  test_t* test = (test_t*) opaque;
+  UTIL_sleepMilli(10);
+  ZSTD_pthread_mutex_lock(&test->mut);
+  test->val = test->val + 1;
+  if (test->val == test->max)
+    ZSTD_pthread_cond_signal(&test->cond);
+  ZSTD_pthread_mutex_unlock(&test->mut);
+}
+
+static int testThreadReduction_internal(POOL_ctx* ctx, test_t test)
+{
+    int const nbWaits = 16;
+    UTIL_time_t startTime, time4threads, time2threads;
+
+    test.val = 0;
+    test.max = nbWaits;
+
+    startTime = UTIL_getTime();
+    {   int i;
+        for (i=0; i<nbWaits; i++)
+            POOL_add(ctx, &waitLongFn, &test);
+    }
+    ZSTD_pthread_mutex_lock(&test.mut);
+    ZSTD_pthread_cond_wait(&test.cond, &test.mut);
+    ASSERT_TRUE(test.val == nbWaits);
+    ZSTD_pthread_mutex_unlock(&test.mut);
+    time4threads = UTIL_clockSpanNano(startTime);
+
+    ctx = POOL_resize(ctx, 2/*nbThreads*/);
+    ASSERT_TRUE(ctx);
+    test.val = 0;
+    startTime = UTIL_getTime();
+    {   int i;
+        for (i=0; i<nbWaits; i++)
+            POOL_add(ctx, &waitLongFn, &test);
+    }
+    ZSTD_pthread_mutex_lock(&test.mut);
+    ZSTD_pthread_cond_wait(&test.cond, &test.mut);
+    ASSERT_TRUE(test.val == nbWaits);
+    ZSTD_pthread_mutex_unlock(&test.mut);
+    time2threads = UTIL_clockSpanNano(startTime);
+
+    if (time4threads >= time2threads) return 1;   /* check 4 threads were effectively faster than 2 */
+    return 0;
+}
+
+static int testThreadReduction(void) {
+    int result;
+    test_t test;
+    POOL_ctx* ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
+
+    ASSERT_TRUE(ctx);
+
+    memset(&test, 0, sizeof(test));
+    ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
+    ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) );
+
+    result = testThreadReduction_internal(ctx, test);
+
+    ZSTD_pthread_mutex_destroy(&test.mut);
+    ZSTD_pthread_cond_destroy(&test.cond);
+    POOL_free(ctx);
+    return result;
+}
+
+
+/* --- test launcher --- */
+
 int main(int argc, const char **argv) {
   size_t numThreads;
+  (void)argc;
+  (void)argv;
+
+  if (POOL_create(0, 1)) {   /* should not be possible */
+    printf("FAIL: should not create POOL with 0 threads\n");
+    return 1;
+  }
+
   for (numThreads = 1; numThreads <= 4; ++numThreads) {
     size_t queueSize;
     for (queueSize = 0; queueSize <= 2; ++queueSize) {
+      printf("queueSize==%u, numThreads=%u \n",
+            (unsigned)queueSize, (unsigned)numThreads);
       if (testOrder(numThreads, queueSize)) {
         printf("FAIL: testOrder\n");
         return 1;
       }
+      printf("SUCCESS: testOrder\n");
       if (testWait(numThreads, queueSize)) {
         printf("FAIL: testWait\n");
         return 1;
       }
+      printf("SUCCESS: testWait\n");
     }
   }
-  printf("PASS: testOrder\n");
-  (void)argc;
-  (void)argv;
-  return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1
-                             : printf("PASS: testInvalid\n"), 0;
+
+  if (testThreadReduction()) return 1;
+  printf("PASS: all POOL tests\n");
+
   return 0;
 }