]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
introduced POOL_resize()
authorYann Collet <cyan@fb.com>
Tue, 19 Jun 2018 03:46:39 +0000 (20:46 -0700)
committerYann Collet <cyan@fb.com>
Tue, 19 Jun 2018 03:46:39 +0000 (20:46 -0700)
not complete yet :
finalize behavior in case of unfinished expansion

lib/common/pool.c
lib/common/pool.h

index 773488b072558119ddfd172d4dbd499db8e0d77a..6795f25eb9c1f2549304be640644c02deed819ce 100644 (file)
@@ -33,8 +33,9 @@ typedef struct POOL_job_s {
 struct POOL_ctx_s {
     ZSTD_customMem customMem;
     /* Keep track of the threads */
-    ZSTD_pthread_t *threads;
-    size_t numThreads;
+    ZSTD_pthread_t* threads;
+    size_t threadCapacity;
+    size_t threadLimit;
 
     /* The queue is a circular buffer */
     POOL_job *queue;
@@ -58,10 +59,10 @@ struct POOL_ctx_s {
 };
 
 /* POOL_thread() :
  Work thread for the thread pool.
  Waits for jobs and executes them.
  @returns : NULL on failure else non-null.
-*/
* Work thread for the thread pool.
* Waits for jobs and executes them.
* @returns : NULL on failure else non-null.
+ */
 static void* POOL_thread(void* opaque) {
     POOL_ctx* const ctx = (POOL_ctx*)opaque;
     if (!ctx) { return NULL; }
@@ -103,16 +104,17 @@ POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
 }
 
-POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
+POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
+                               ZSTD_customMem customMem) {
     POOL_ctx* ctx;
-    /* Check the parameters */
+    /* Check parameters */
     if (!numThreads) { return NULL; }
     /* Allocate the context and zero initialize */
     ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
     if (!ctx) { return NULL; }
     /* Initialize the job queue.
-     * It needs one extra space since one space is wasted to differentiate empty
-     * and full queues.
+     * It needs one extra space since one space is wasted to differentiate
+     * empty and full queues.
      */
     ctx->queueSize = queueSize + 1;
     ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
@@ -126,7 +128,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
     ctx->shutdown = 0;
     /* Allocate space for the thread handles */
     ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
-    ctx->numThreads = 0;
+    ctx->threadCapacity = 0;
     ctx->customMem = customMem;
     /* Check for errors */
     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
@@ -134,11 +136,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
     {   size_t i;
         for (i = 0; i < numThreads; ++i) {
             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
-                ctx->numThreads = i;
+                ctx->threadCapacity = i;
                 POOL_free(ctx);
                 return NULL;
         }   }
-        ctx->numThreads = numThreads;
+        ctx->threadCapacity = numThreads;
+        ctx->threadLimit = numThreads;
     }
     return ctx;
 }
@@ -156,8 +159,8 @@ static void POOL_join(POOL_ctx* ctx) {
     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
     /* Join all of the threads */
     {   size_t i;
-        for (i = 0; i < ctx->numThreads; ++i) {
-            ZSTD_pthread_join(ctx->threads[i], NULL);
+        for (i = 0; i < ctx->threadCapacity; ++i) {
+            ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
     }   }
 }
 
@@ -172,24 +175,72 @@ void POOL_free(POOL_ctx *ctx) {
     ZSTD_free(ctx, ctx->customMem);
 }
 
+
+
 size_t POOL_sizeof(POOL_ctx *ctx) {
     if (ctx==NULL) return 0;  /* supports sizeof NULL */
     return sizeof(*ctx)
         + ctx->queueSize * sizeof(POOL_job)
-        + ctx->numThreads * sizeof(ZSTD_pthread_t);
+        + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
+}
+
+
+/* note : only works if no job is running !
+ * return : 1 on success, 0 on failure */
+static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
+{
+    if (ctx->numThreadsBusy > 0) return 0;
+    if (numThreads <= ctx->threadCapacity) {
+        ctx->threadLimit = numThreads;
+        return 1;
+    }
+    /* numThreads > threadCapacity */
+    {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
+        if (!threadPool) return 0;
+        /* Initialize additional threads */
+        {   size_t threadId;
+            for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
+                if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
+                    break;
+            }   }
+            if (threadId != numThreads) {  /* not all threads successfully init */
+                /* how to destroy existing threads ? */
+                /* POOL_join destroy all existing threads, not just newly created ones */
+                return 0;
+            }
+        }
+        /* replace existing thread pool */
+        memcpy(threadPool, ctx->threads, ctx->threadCapacity);
+        ZSTD_free(ctx->threads, ctx->customMem);
+        ctx->threads = threadPool;
+    }
+    ctx->threadCapacity = numThreads;
+    ctx->threadLimit = numThreads;
+    return 1;
+}
+
+/* return : 1 on success, 0 on failure */
+int POOL_resize(POOL_ctx* ctx, size_t numThreads)
+{
+    int result;
+    if (!ctx) return 0;
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    result = POOL_resize_internal(ctx, numThreads);
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+    return result;
 }
 
 /**
  * Returns 1 if the queue is full and 0 otherwise.
  *
- * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
- * then a queue is empty if there is a thread free and no job is waiting.
+ * When queueSize is 1 (pool was created with an intended queueSize of 0),
+ * then a queue is empty if there is a thread free _and_ no job is waiting.
  */
 static int isQueueFull(POOL_ctx const* ctx) {
     if (ctx->queueSize > 1) {
         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
     } else {
-        return ctx->numThreadsBusy == ctx->numThreads ||
+        return ctx->numThreadsBusy == ctx->threadLimit ||
                !ctx->queueEmpty;
     }
 }
index a57e9b4fabc2dc6d4bb818a1d3696372b9083680..dba28859c483b3010ad45ee75eccc4285e406807 100644 (file)
@@ -30,40 +30,39 @@ typedef struct POOL_ctx_s POOL_ctx;
 */
 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize);
 
-POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
+POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
+                               ZSTD_customMem customMem);
 
 /*! POOL_free() :
   Free a thread pool returned by POOL_create().
-*/
*  Free a thread pool returned by POOL_create().
+ */
 void POOL_free(POOL_ctx* ctx);
 
 /*! POOL_sizeof() :
-    return memory usage of pool returned by POOL_create().
-*/
+ * @return threadpool memory usage
+ *  note : compatible with NULL (returns 0 in this case)
+ */
 size_t POOL_sizeof(POOL_ctx* ctx);
 
 /*! POOL_function :
   The function type that can be added to a thread pool.
-*/
*  The function type that can be added to a thread pool.
+ */
 typedef void (*POOL_function)(void*);
-/*! POOL_add_function :
-    The function type for a generic thread pool add function.
-*/
-typedef void (*POOL_add_function)(void*, POOL_function, void*);
 
 /*! POOL_add() :
-    Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
-    Possibly blocks until there is room in the queue.
-    Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
-*/
+ *  Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
+ *  Possibly blocks until there is room in the queue.
+ *  Note : The function may be executed asynchronously,
+ *         therefore, `opaque` must live until function has been completed.
+ */
 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
 
 
 /*! POOL_tryAdd() :
   Add the job `function(opaque)` to the thread pool if a worker is available.
   return immediately otherwise.
  @return : 1 if successful, 0 if not.
-*/
*  Add the job `function(opaque)` to thread pool _if_ a worker is available.
*  Returns immediately even if not (does not block).
* @return : 1 if successful, 0 if not.
+ */
 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);