]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
simplified Buffer Pool
authorYann Collet <cyan@fb.com>
Sat, 31 Dec 2016 13:45:33 +0000 (14:45 +0100)
committerYann Collet <cyan@fb.com>
Sat, 31 Dec 2016 13:45:33 +0000 (14:45 +0100)
lib/compress/zstdmt_compress.c

index 1b925914a47890bf7dfe452c51683b6aeecebfa5..97de6e64539dedd26b6c3e6fc5ad5b6e159c4305 100644 (file)
@@ -42,51 +42,65 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
 
 
 #define ZSTDMT_NBTHREADS_MAX 128
-#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
+
+/* ===   Buffer Pool   === */
 
 typedef struct buffer_s {
     void* start;
     size_t size;
 } buffer_t;
 
-#define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
 typedef struct ZSTDMT_bufferPool_s {
-    pthread_mutex_t bufferPool_mutex;
-    buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX];
+    unsigned totalBuffers;;
     unsigned nbBuffers;
+    buffer_t bTable[1];   /* variable size */
 } ZSTDMT_bufferPool;
 
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads)
+{
+    unsigned const maxNbBuffers = 2*nbThreads + 2;
+    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)calloc(1, sizeof(ZSTDMT_bufferPool) + maxNbBuffers * sizeof(buffer_t));
+    if (bufPool==NULL) return NULL;
+    bufPool->totalBuffers = maxNbBuffers;
+    return bufPool;
+}
+
+static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
+{
+    unsigned u;
+    if (!bufPool) return;   /* compatibility with free on NULL */
+    for (u=0; u<bufPool->totalBuffers; u++)
+        free(bufPool->bTable[u].start);
+    free(bufPool);
+}
+
+/* note : invocation only from main thread ! */
 static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 {
-    PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
     if (pool->nbBuffers) {   /* try to use an existing buffer */
         pool->nbBuffers--;
         buffer_t const buf = pool->bTable[pool->nbBuffers];
-        pthread_mutex_unlock(&pool->bufferPool_mutex);
         size_t const availBufferSize = buf.size;
         if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
             return buf;
         free(buf.start);   /* size conditions not respected : create a new buffer */
     }
-    pthread_mutex_unlock(&pool->bufferPool_mutex);
     /* create new buffer */
     buffer_t buf;
     buf.size = bSize;
-    buf.start = calloc(1, bSize);
+    buf.start = malloc(bSize);
     return buf;
 }
 
 /* effectively store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 {
-    PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
-    if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
-        pthread_mutex_unlock(&pool->bufferPool_mutex);
-        free(buf.start);
+    if (pool->nbBuffers < pool->totalBuffers) {
+        pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
         return;
     }
-    pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
-    pthread_mutex_unlock(&pool->bufferPool_mutex);
+    /* Reached bufferPool capacity (should not happen) */
+    free(buf.start);
 }
 
 
@@ -118,7 +132,7 @@ void ZSTDMT_compressFrame(void* jobDescription)
 }
 
 
-/* note : calls to CCtxPool only from main thread */
+/* ===   CCtx Pool   === */
 
 typedef struct {
     unsigned totalCCtx;
@@ -126,6 +140,8 @@ typedef struct {
     ZSTD_CCtx* cctx[1];   /* variable size */
 } ZSTDMT_CCtxPool;
 
+/* note : CCtxPool invocation only from main thread */
+
 static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
 {
     ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*));
@@ -168,7 +184,7 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
 
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
-    ZSTDMT_bufferPool buffPool;
+    ZSTDMT_bufferPool* buffPool;
     ZSTDMT_CCtxPool* cctxPool;
     unsigned nbThreads;
     pthread_mutex_t jobCompleted_mutex;
@@ -182,7 +198,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     if (!cctx) return NULL;
     cctx->nbThreads = nbThreads;
     cctx->factory = POOL_create(nbThreads, 1);
-    pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL);
+    cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
     cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
     pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
     return cctx;
@@ -191,9 +207,9 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)  /* incompleted ! */
 {
     POOL_free(mtctx->factory);
-    /* free mutexes (if necessary) */
-    /* free bufferPool */
+    ZSTDMT_freeBufferPool(mtctx->buffPool);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+    pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
     free(mtctx);
     return 0;
 }
@@ -221,7 +237,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
         for (u=0; u<nbFrames; u++) {
             size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
             size_t const dstBufferCapacity = u ? ZSTD_compressBound(frameSize) : dstCapacity;
-            buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(&mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
+            buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
             ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
 
             mtctx->jobs[u].srcStart = srcStart + frameStartPos;
@@ -252,13 +268,15 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
             {   size_t const cSize = mtctx->jobs[frameID].cSize;
                 if (ZSTD_isError(cSize)) return cSize;
                 if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);
-                if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize);
+                if (frameID) {
+                    memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, cSize);
+                    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
+                }
                 dstPos += cSize ;
             }
             ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx);
-            ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
         }
-        DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
+        DEBUGLOG(3, "compressed size : %u  ", (U32)dstPos);
         return dstPos;
     }