]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added extended POOL test
authorYann Collet <cyan@fb.com>
Thu, 21 Jun 2018 21:58:59 +0000 (14:58 -0700)
committerYann Collet <cyan@fb.com>
Thu, 21 Jun 2018 21:58:59 +0000 (14:58 -0700)
abrupt end + downsizing with running jobs remaining in queue.

also : POOL_resize() requires numThreads >= 1

lib/common/pool.c
lib/common/pool.h
tests/poolTests.c

index d08b1de79e2630104c4c5e63b9ae65a3a1d919ef..bd31f032ff3323f123404fa7dde2562bba988594 100644 (file)
@@ -73,10 +73,13 @@ static void* POOL_thread(void* opaque) {
 
         while ( ctx->queueEmpty
             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
-             if (ctx->shutdown) {
-                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
-                 return opaque;
-             }
+            if (ctx->shutdown) {
+                /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
+                 * a few threads will be shutdown while !queueEmpty,
+                 * but enough threads will remain active to finish the queue */
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+                return opaque;
+            }
             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
         }
         /* Pop a job off the queue */
@@ -99,7 +102,7 @@ static void* POOL_thread(void* opaque) {
             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
         }
     }  /* for (;;) */
-    assert(0); /* Unreachable */
+    assert(0);  /* Unreachable */
 }
 
 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
@@ -187,10 +190,12 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
 }
 
 
-/* note : only works if no job is running ! */
+/* @return : a working pool on success, NULL on failure
+ *    note : starting context is considered consumed. */
 static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
 {
     if (numThreads <= ctx->threadCapacity) {
+        if (!numThreads) return NULL;
         ctx->threadLimit = numThreads;
         return ctx;
     }
index 00ea76631f3c60d0144cf810b415118398d75898..caf514907b04874d5c2ac5ba6d2cfffffe4dec21 100644 (file)
@@ -45,6 +45,7 @@ void POOL_free(POOL_ctx* ctx);
  *    note : new pool context might have same address as original one, but it's not guaranteed.
  *           consider starting context as consumed, only rely on returned one.
  *    note 2 : only numThreads can be resized, queueSize is unchanged.
+ *    note 3 : `numThreads` must be at least 1
  */
 POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);
 
index d7ff70ac9720cea3f620078844bc24e2d1282aab..d5768967a2a2a0567fa76ab272e904ab131a8dd3 100644 (file)
@@ -169,7 +169,7 @@ typedef struct {
 
 void waitIncFn(void *opaque) {
   abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
-  UTIL_sleepMilli(1);
+  UTIL_sleepMilli(10);
   ZSTD_pthread_mutex_lock(&test->mut);
   test->val = test->val + 1;
   ZSTD_pthread_mutex_unlock(&test->mut);
@@ -179,14 +179,15 @@ static int testAbruptEnding_internal(abruptEndCanary_t test)
 {
     int const nbWaits = 16;
 
-    POOL_ctx* const ctx = POOL_create(2 /*numThreads*/, nbWaits /*queueSize*/);
+    POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
     ASSERT_TRUE(ctx);
     test.val = 0;
 
     {   int i;
         for (i=0; i<nbWaits; i++)
-            POOL_add(ctx, &waitLongFn, &test);   /* all jobs either processed on in the queue */
+            POOL_add(ctx, &waitIncFn, &test);  /* all jobs pushed into queue */
     }
+    ctx = POOL_resize(ctx, 1 /*numThreads*/);   /* downsize numThreads, to try to break end condition */
 
     POOL_free(ctx);  /* must finish all jobs in queue before giving back control */
     ASSERT_EQ(test.val, nbWaits);