]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
buffer pool can be invoked from multiple threads
authorYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 21:14:07 +0000 (14:14 -0700)
committerYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 21:14:07 +0000 (14:14 -0700)
lib/common/pool.c
lib/common/pool.h
lib/compress/zstdmt_compress.c

index 749fa4f2f7b41e5356a76a6a51fc713306d15409..06d8a5f57db736a0fb2d6695f7f54f71e72de3ff 100644 (file)
@@ -92,7 +92,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
      * and full queues.
      */
     ctx->queueSize = queueSize + 1;
-    ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
+    ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
     ctx->queueHead = 0;
     ctx->queueTail = 0;
     pthread_mutex_init(&ctx->queueMutex, NULL);
@@ -100,7 +100,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
     pthread_cond_init(&ctx->queuePopCond, NULL);
     ctx->shutdown = 0;
     /* Allocate space for the thread handles */
-    ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
+    ctx->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
     ctx->numThreads = 0;
     /* Check for errors */
     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
@@ -153,8 +153,8 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
         + ctx->numThreads * sizeof(pthread_t);
 }
 
-void POOL_add(void *ctxVoid, POOL_function function, void *opaque) {
-    POOL_ctx *ctx = (POOL_ctx *)ctxVoid;
+void POOL_add(voidctxVoid, POOL_function function, void *opaque) {
+    POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
     if (!ctx) { return; }
 
     pthread_mutex_lock(&ctx->queueMutex);
@@ -183,22 +183,22 @@ struct POOL_ctx_s {
   int data;
 };
 
-POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
+POOL_ctxPOOL_create(size_t numThreads, size_t queueSize) {
   (void)numThreads;
   (void)queueSize;
-  return (POOL_ctx *)malloc(sizeof(POOL_ctx));
+  return (POOL_ctx*)malloc(sizeof(POOL_ctx));
 }
 
-void POOL_free(POOL_ctx *ctx) {
-  if (ctx) free(ctx);
+void POOL_free(POOL_ctxctx) {
+    free(ctx);
 }
 
-void POOL_add(void *ctx, POOL_function function, void *opaque) {
+void POOL_add(void* ctx, POOL_function function, void* opaque) {
   (void)ctx;
   function(opaque);
 }
 
-size_t POOL_sizeof(POOL_ctx *ctx) {
+size_t POOL_sizeof(POOL_ctxctx) {
     if (ctx==NULL) return 0;  /* supports sizeof NULL */
     return sizeof(*ctx);
 }
index 386cd674b7c067c3b7c6e6b04b6525c906e09712..957100f4625f597aba78bf7579e8e5f5cceacee1 100644 (file)
@@ -19,11 +19,11 @@ extern "C" {
 typedef struct POOL_ctx_s POOL_ctx;
 
 /*! POOL_create() :
   Create a thread pool with at most `numThreads` threads.
   `numThreads` must be at least 1.
   The maximum number of queued jobs before blocking is `queueSize`.
   `queueSize` must be at least 1.
   @return : The POOL_ctx pointer on success else NULL.
*  Create a thread pool with at most `numThreads` threads.
* `numThreads` must be at least 1.
*  The maximum number of queued jobs before blocking is `queueSize`.
* `queueSize` must be at least 1.
* @return : POOL_ctx pointer on success, else NULL.
 */
 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);
 
index 255b9c7edbb0322dd5d8f496011a9f904d275d0e..c4547c81b6b618e4a6578dc6e160a72e79431946 100644 (file)
@@ -73,6 +73,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
 
 
 /* =====   Buffer Pool   ===== */
+/* a single Buffer Pool can be invoked from multiple threads in parallel */
 
 typedef struct buffer_s {
     void* start;
@@ -82,6 +83,7 @@ typedef struct buffer_s {
 static const buffer_t g_nullBuffer = { NULL, 0 };
 
 typedef struct ZSTDMT_bufferPool_s {
+    pthread_mutex_t poolMutex;
     unsigned totalBuffers;
     unsigned nbBuffers;
     ZSTD_customMem cMem;
@@ -94,6 +96,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo
     ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
         sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
     if (bufPool==NULL) return NULL;
+    pthread_mutex_init(&bufPool->poolMutex, NULL);
     bufPool->totalBuffers = maxNbBuffers;
     bufPool->nbBuffers = 0;
     bufPool->cMem = cMem;
@@ -106,6 +109,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
     if (!bufPool) return;   /* compatibility with free on NULL */
     for (u=0; u<bufPool->totalBuffers; u++)
         ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
+    pthread_mutex_destroy(&bufPool->poolMutex);
     ZSTD_free(bufPool, bufPool->cMem);
 }
 
@@ -116,31 +120,37 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
                             + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
     unsigned u;
     size_t totalBufferSize = 0;
+    pthread_mutex_lock(&bufPool->poolMutex);
     for (u=0; u<bufPool->totalBuffers; u++)
         totalBufferSize += bufPool->bTable[u].size;
+    pthread_mutex_unlock(&bufPool->poolMutex);
 
     return poolSize + totalBufferSize;
 }
 
 /** ZSTDMT_getBuffer() :
  *  assumption : invocation from main thread only ! */
-static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
 {
     DEBUGLOG(2, "ZSTDMT_getBuffer");
-    if (pool->nbBuffers) {   /* try to use an existing buffer */
-        buffer_t const buf = pool->bTable[--(pool->nbBuffers)];
+    pthread_mutex_lock(&bufPool->poolMutex);
+    if (bufPool->nbBuffers) {   /* try to use an existing buffer */
+        buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
         size_t const availBufferSize = buf.size;
-        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))
+        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
             /* large enough, but not too much */
+            pthread_mutex_unlock(&bufPool->poolMutex);
             return buf;
+        }
         /* size conditions not respected : scratch this buffer, create new one */
         DEBUGLOG(2, "existing buffer does not meet size conditions => freeing");
-        ZSTD_free(buf.start, pool->cMem);
+        ZSTD_free(buf.start, bufPool->cMem);
     }
+    pthread_mutex_unlock(&bufPool->poolMutex);
     /* create new buffer */
     DEBUGLOG(2, "create a new buffer");
     {   buffer_t buffer;
-        void* const start = ZSTD_malloc(bSize, pool->cMem);
+        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
         if (start==NULL) bSize = 0;
         buffer.start = start;   /* note : start can be NULL if malloc fails ! */
         buffer.size = bSize;
@@ -149,23 +159,25 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 }
 
 /* store buffer for later re-use, up to pool capacity */
-static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
+static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
     DEBUGLOG(2, "ZSTDMT_releaseBuffer");
     if (buf.start == NULL) return;   /* release on NULL */
-    if (pool->nbBuffers < pool->totalBuffers) {
-        pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
+    pthread_mutex_lock(&bufPool->poolMutex);
+    if (bufPool->nbBuffers < bufPool->totalBuffers) {
+        bufPool->bTable[bufPool->nbBuffers++] = buf;   /* store for later re-use */
+        pthread_mutex_unlock(&bufPool->poolMutex);
         return;
     }
+    pthread_mutex_unlock(&bufPool->poolMutex);
     /* Reached bufferPool capacity (should not happen) */
     DEBUGLOG(2, "buffer pool capacity reached => freeing ");
-    ZSTD_free(buf.start, pool->cMem);
+    ZSTD_free(buf.start, bufPool->cMem);
 }
 
 
 /* =====   CCtx Pool   ===== */
-
-/* a single cctxPool can be called from multiple threads in parallel */
+/* a single CCtx Pool can be invoked from multiple threads in parallel */
 
 typedef struct {
     pthread_mutex_t poolMutex;
@@ -314,7 +326,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
     job->cSize = (job->lastChunk) ?
                  ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
                  ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
-    DEBUGLOG(5, "compressed %u bytes into %u bytes   (first:%u) (last:%u)",
+    DEBUGLOG(2, "compressed %u bytes into %u bytes   (first:%u) (last:%u)",
                 (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
     DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));