]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
release input buffers from inside worker thread
authorYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 22:56:40 +0000 (15:56 -0700)
committerYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 22:56:40 +0000 (15:56 -0700)
buffers are released sooner, which makes them available faster for next job.
=> decreases total nb of buffers necessary

lib/compress/zstdmt_compress.c

index 703d25e3070f4b1bf62d41af878eaf1033a5659c..7cf637f592d0d7bfbc5f6da1f8218af7cc1cd007 100644 (file)
@@ -169,7 +169,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
     DEBUGLOG(2, "ZSTDMT_releaseBuffer");
-    if (buf.start == NULL) return;   /* release on NULL */
+    if (buf.start == NULL) return;   /* compatible with release on NULL */
     pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;   /* store for later re-use */
@@ -271,16 +271,11 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 
 /* =====   Thread worker   ===== */
 
-typedef struct {
-    buffer_t buffer;
-    size_t filled;
-} inBuff_t;
-
 typedef struct {
     buffer_t src;
     const void* srcStart;
-    size_t   srcSize;
     size_t   dictSize;
+    size_t   srcSize;
     buffer_t dstBuff;
     size_t   cSize;
     size_t   dstFlushed;
@@ -349,6 +344,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
 
 _endJob:
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
+    ZSTDMT_releaseBuffer(job->bufPool, job->src);
+    job->src = g_nullBuffer; job->srcStart = NULL;
     PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->jobCompleted = 1;
     job->jobScanned = 0;
@@ -361,6 +358,11 @@ _endJob:
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
 
+typedef struct {
+    buffer_t buffer;
+    size_t filled;
+} inBuff_t;
+
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_jobDescription* jobs;
@@ -513,6 +515,7 @@ static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbT
 }
 
 
+/* Note : missing checksum at the end ! */
 size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                            void* dst, size_t dstCapacity,
                      const void* src, size_t srcSize,
@@ -555,6 +558,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
             buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
             size_t dictSize = u ? overlapSize : 0;
 
+            mtctx->jobs[u].src = g_nullBuffer;
             mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
             mtctx->jobs[u].dictSize = dictSize;
             mtctx->jobs[u].srcSize = chunkSize;
@@ -771,10 +775,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
     zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
 
+    if (zcs->params.fParams.checksumFlag)
+        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);
+
     /* get a new buffer for next input */
     if (!endFrame) {
         size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
-        DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
         zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
             zcs->jobs[jobID].jobCompleted = 1;
@@ -783,18 +789,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
             ZSTDMT_releaseAllJobResources(zcs);
             return ERROR(memory_allocation);
         }
-        DEBUGLOG(5, "inBuff currently filled to %u", (U32)zcs->inBuff.filled);
         zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
-        DEBUGLOG(5, "new job : inBuff filled to %u, with %u dict and %u src",
-                    (U32)zcs->inBuff.filled, (U32)newDictSize,
-                    (U32)(zcs->inBuff.filled - newDictSize));
         memmove(zcs->inBuff.buffer.start,
             (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
             zcs->inBuff.filled);
-        DEBUGLOG(5, "new inBuff pre-filled");
         zcs->dictSize = newDictSize;
     } else {   /* if (endFrame==1) */
-        DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
         zcs->dictSize = 0;
@@ -842,7 +842,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             }
             DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
             if (zcs->params.fParams.checksumFlag) {
-                XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
                 if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {  /* write checksum at end of last section */
                     U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
                     DEBUGLOG(5, "writing checksum : %08X \n", checksum);
@@ -850,9 +849,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
                     job.cSize += 4;
                     zcs->jobs[wJobID].cSize += 4;
             }   }
-            ZSTDMT_releaseBuffer(zcs->bufPool, job.src);
-            zcs->jobs[wJobID].srcStart = NULL;
-            zcs->jobs[wJobID].src = g_nullBuffer;
             zcs->jobs[wJobID].jobScanned = 1;
         }
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);