]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : intermediate outBuffer allocated from within worker
authorYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 21:59:10 +0000 (14:59 -0700)
committerYann Collet <cyan@fb.com>
Tue, 11 Jul 2017 21:59:10 +0000 (14:59 -0700)
reduces total amount of memory needed,
since jobs in queue do not have an outBuffer pre-reserved now

lib/compress/zstdmt_compress.c

index c4547c81b6b618e4a6578dc6e160a72e79431946..9f30d31814073a17eb82cf93091c5f98468aea8c 100644 (file)
@@ -286,6 +286,7 @@ typedef struct {
     ZSTD_parameters params;
     const ZSTD_CDict* cdict;
     ZSTDMT_CCtxPool* cctxPool;
+    ZSTDMT_bufferPool* bufPool;
     unsigned long long fullFrameSize;
 } ZSTDMT_jobDescription;
 
@@ -295,7 +296,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool);
     const void* const src = (const char*)job->srcStart + job->dictSize;
-    buffer_t const dstBuff = job->dstBuff;
+    buffer_t dstBuff = job->dstBuff;
     DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
                  job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
 
@@ -304,6 +305,16 @@ void ZSTDMT_compressChunk(void* jobDescription)
         goto _endJob;
     }
 
+    if (dstBuff.start == NULL) {
+        size_t const dstCapacity = ZSTD_compressBound(job->srcSize);
+        dstBuff = ZSTDMT_getBuffer(job->bufPool, dstCapacity);
+        if (dstBuff.start==NULL) {
+            job->cSize = ERROR(memory_allocation);
+            goto _endJob;
+        }
+        job->dstBuff = dstBuff;
+    }
+
     if (job->cdict) {  /* should only happen for first segment */
         size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
         DEBUGLOG(5, "using CDict");
@@ -347,7 +358,7 @@ _endJob:
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_jobDescription* jobs;
-    ZSTDMT_bufferPool* buffPool;
+    ZSTDMT_bufferPool* bufPool;
     ZSTDMT_CCtxPool* cctxPool;
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
@@ -402,9 +413,9 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     mtctx->factory = POOL_create(nbThreads, 1);
     mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
     mtctx->jobIDMask = nbJobs - 1;
-    mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem);
+    mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
     mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
-    if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) {
+    if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
@@ -426,13 +437,13 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
     unsigned jobID;
     DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
     for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
-        ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff);
+        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
         mtctx->jobs[jobID].dstBuff = g_nullBuffer;
-        ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src);
+        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
         mtctx->jobs[jobID].src = g_nullBuffer;
     }
     memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
-    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);
+    ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);
     mtctx->inBuff.buffer = g_nullBuffer;
     mtctx->allJobsCompleted = 1;
 }
@@ -442,7 +453,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
     POOL_free(mtctx->factory);
     if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
-    ZSTDMT_freeBufferPool(mtctx->buffPool);  /* release job resources into pools first */
+    ZSTDMT_freeBufferPool(mtctx->bufPool);  /* release job resources into pools first */
     ZSTD_free(mtctx->jobs, mtctx->cMem);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdictLocal);
@@ -456,11 +467,11 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
 {
     if (mtctx == NULL) return 0;   /* supports sizeof NULL */
     return sizeof(*mtctx)
-        + POOL_sizeof(mtctx->factory)
-        + ZSTDMT_sizeof_bufferPool(mtctx->buffPool)
-        + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
-        + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
-        + ZSTD_sizeof_CDict(mtctx->cdictLocal);
+            + POOL_sizeof(mtctx->factory)
+            + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
+            + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
+            + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
+            + ZSTD_sizeof_CDict(mtctx->cdictLocal);
 }
 
 size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
@@ -534,16 +545,9 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
             size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
             size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
             buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
-            buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity);
+            buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
             size_t dictSize = u ? overlapSize : 0;
 
-            if (dstBuffer.start==NULL) {
-                mtctx->jobs[u].cSize = ERROR(memory_allocation);   /* job result */
-                mtctx->jobs[u].jobCompleted = 1;
-                nbChunks = u+1;   /* only wait and free u jobs, instead of initially expected nbChunks ones */
-                break;   /* let's wait for previous jobs to complete, but don't start new ones */
-            }
-
             mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
             mtctx->jobs[u].dictSize = dictSize;
             mtctx->jobs[u].srcSize = chunkSize;
@@ -554,6 +558,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
             if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
             mtctx->jobs[u].dstBuff = dstBuffer;
             mtctx->jobs[u].cctxPool = mtctx->cctxPool;
+            mtctx->jobs[u].bufPool = mtctx->bufPool;
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
             mtctx->jobs[u].jobCompleted = 0;
@@ -591,13 +596,13 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                         memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);  /* may overlap when chunk compressed within dst */
                     if (chunkID >= compressWithinDst) {  /* chunk compressed into its own buffer, which must be released */
                         DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
-                        ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
+                        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
                     }
                     mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
                 }
                 dstPos += cSize ;
             }
-        }
+        }  /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
         if (!error) DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
         return error ? error : dstPos;
     }
@@ -696,8 +701,9 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
 }
 
 size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
-                                const void* dict, size_t dictSize,
-                                ZSTD_parameters params, unsigned long long pledgedSrcSize)
+                             const void* dict, size_t dictSize,
+                                   ZSTD_parameters params,
+                                   unsigned long long pledgedSrcSize)
 {
     DEBUGLOG(5, "ZSTDMT_initCStream_advanced");
     return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, NULL, params, pledgedSrcSize);
@@ -733,18 +739,8 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
 
 static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
 {
-    size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
-    buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
     unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
-    if (dstBuffer.start==NULL) {
-        zcs->jobs[jobID].jobCompleted = 1;
-        zcs->nextJobID++;
-        ZSTDMT_waitForAllJobsCompleted(zcs);
-        ZSTDMT_releaseAllJobResources(zcs);
-        return ERROR(memory_allocation);
-    }
-
     DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
                 zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
     zcs->jobs[jobID].src = zcs->inBuff.buffer;
@@ -757,8 +753,9 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
     zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
     zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
-    zcs->jobs[jobID].dstBuff = dstBuffer;
+    zcs->jobs[jobID].dstBuff = g_nullBuffer;
     zcs->jobs[jobID].cctxPool = zcs->cctxPool;
+    zcs->jobs[jobID].bufPool = zcs->bufPool;
     zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
     zcs->jobs[jobID].lastChunk = endFrame;
     zcs->jobs[jobID].jobCompleted = 0;
@@ -770,7 +767,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     if (!endFrame) {
         size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
         DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
-        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool, zcs->inBuffSize);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
             zcs->jobs[jobID].jobCompleted = 1;
             zcs->nextJobID++;
@@ -845,19 +842,19 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
                     job.cSize += 4;
                     zcs->jobs[wJobID].cSize += 4;
             }   }
-            ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+            ZSTDMT_releaseBuffer(zcs->bufPool, job.src);
             zcs->jobs[wJobID].srcStart = NULL;
             zcs->jobs[wJobID].src = g_nullBuffer;
             zcs->jobs[wJobID].jobScanned = 1;
         }
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            DEBUGLOG(2, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
         }
         if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
-            ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+            ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
             zcs->jobs[wJobID].jobCompleted = 0;
             zcs->doneJobID++;
@@ -904,7 +901,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
         if (ZSTD_isError(cSize)) return cSize;
         input->pos = input->size;
         output->pos += cSize;
-        ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);  /* was allocated in initStream */
+        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);  /* was allocated in initStream */
         mtctx->allJobsCompleted = 1;
         mtctx->frameEnded = 1;
         return 0;
@@ -913,7 +910,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* fill input buffer */
     if (input->size > input->pos) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
-            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->buffPool, mtctx->inBuffSize);
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool, mtctx->inBuffSize);
             if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
             mtctx->inBuff.filled = 0;
         }