]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : fixed memory leak
authorYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 18:44:09 +0000 (10:44 -0800)
committerYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 18:44:09 +0000 (10:44 -0800)
writeLastEmptyBlock() must release srcBuffer
as mtctx assumes it's done by job worker.

minor : changed 2 job member names (src->srcBuffer, srcStart->prefixStart) for clarity

lib/compress/zstdmt_compress.c

index 4e53367fda7f22c89f95632d40d022b06430dbb2..9fea4969fa35d0c3ea98759f3d428722704639a1 100644 (file)
@@ -304,19 +304,19 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 /* ------------------------------------------ */
 
 typedef struct {
-    size_t   consumed;       /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
-    size_t   cSize;          /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
+    size_t   consumed;                   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
+    size_t   cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     ZSTD_pthread_mutex_t* mtctx_mutex;   /* Thread-safe - used by mtctx and (all) worker */
     ZSTD_pthread_cond_t* mtctx_cond;     /* Thread-safe - used by mtctx and (all) worker */
     ZSTDMT_CCtxPool* cctxPool;           /* Thread-safe - used by mtctx and (all) worker */
     ZSTDMT_bufferPool* bufPool;          /* Thread-safe - used by mtctx and (all) worker */
-    buffer_t dstBuff;        /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
-    buffer_t src;            /* set by mtctx, then modified by worker => no barrier */
-    const void* srcStart;    /* set by mtctx, then read by worker => no barrier */
-    size_t   prefixSize;     /* set by mtctx, then read by worker => no barrier */
-    size_t   srcSize;        /* set by mtctx, then read by worker => no barrier */
-    unsigned firstChunk;     /* set by mtctx, then read by worker => no barrier */
-    unsigned lastChunk;      /* set by mtctx, then read by worker => no barrier */
+    buffer_t dstBuff;                    /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
+    buffer_t srcBuff;                    /* set by mtctx, then released by worker => no barrier */
+    const void* prefixStart;             /* set by mtctx, then read by worker => no barrier */
+    size_t   prefixSize;                 /* set by mtctx, then read by worker => no barrier */
+    size_t   srcSize;                    /* set by mtctx, then read by worker => no barrier */
+    unsigned firstChunk;                 /* set by mtctx, then read by worker => no barrier */
+    unsigned lastChunk;                  /* set by mtctx, then read by worker => no barrier */
     ZSTD_CCtx_params params;             /* set by mtctx, then read by worker => no barrier */
     const ZSTD_CDict* cdict;             /* set by mtctx, then read by worker => no barrier */
     unsigned long long fullFrameSize;    /* set by mtctx, then read by worker => no barrier */
@@ -329,7 +329,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
-    const void* const src = (const char*)job->srcStart + job->prefixSize;
+    const void* const src = (const char*)job->prefixStart + job->prefixSize;
     buffer_t dstBuff = job->dstBuff;
 
     /* ressources */
@@ -362,7 +362,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
                 goto _endJob;
         }   }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
-                                        job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+                                        job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                                         NULL, /*cdict*/
                                         jobParams, pledgedSrcSize);
             if (ZSTD_isError(initError)) {
@@ -419,8 +419,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
 _endJob:
     /* release resources */
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
-    ZSTDMT_releaseBuffer(job->bufPool, job->src);
-    job->src = g_nullBuffer; job->srcStart = NULL;
+    ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
+    job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
     job->consumed = job->srcSize;
@@ -557,9 +557,9 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
         DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
         ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
         mtctx->jobs[jobID].dstBuff = g_nullBuffer;
-        DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start);
-        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
-        mtctx->jobs[jobID].src = g_nullBuffer;
+        DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start);
+        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff);
+        mtctx->jobs[jobID].srcBuff = g_nullBuffer;
     }
     memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
     DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start);
@@ -757,8 +757,8 @@ static size_t ZSTDMT_compress_advanced_internal(
             buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
             size_t dictSize = u ? overlapSize : 0;
 
-            mtctx->jobs[u].src = g_nullBuffer;
-            mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
+            mtctx->jobs[u].srcBuff = g_nullBuffer;
+            mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize;
             mtctx->jobs[u].prefixSize = dictSize;
             mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0);  /* avoid job.srcSize == 0 */
             mtctx->jobs[u].consumed = 0;
@@ -781,7 +781,7 @@ static size_t ZSTDMT_compress_advanced_internal(
             }
 
             DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u  (%u bytes)", u, (U32)chunkSize);
-            DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
+            DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12);
             POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
 
             frameStartPos += chunkSize;
@@ -802,7 +802,7 @@ static size_t ZSTDMT_compress_advanced_internal(
             ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
             DEBUGLOG(5, "ready to write chunk %u ", chunkID);
 
-            mtctx->jobs[chunkID].srcStart = NULL;
+            mtctx->jobs[chunkID].prefixStart = NULL;
             {   size_t const cSize = mtctx->jobs[chunkID].cSize;
                 if (ZSTD_isError(cSize)) error = cSize;
                 if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
@@ -999,18 +999,21 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
  */
 static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
 {
-    assert(job->srcSize == 0);
     assert(job->lastChunk == 1);
-    assert(job->firstChunk == 0);   /* first chunk needs to create frame header too */
-    assert(job->dstBuff.start == NULL);  /* invoked from streaming variant only */
-    {   buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) return ERROR(memory_allocation);
-        job->dstBuff = dstBuff;   /* will be released by ZSTDMT_flushProduced() */
-        assert(dstBuff.size >= ZSTD_blockHeaderSize);
-        job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size);
-        assert(!ZSTD_isError(job->cSize));
-        assert(job->consumed == 0);
-    }
+    assert(job->srcSize == 0);      /* last chunk is empty -> will be simplified into a last empty block */
+    assert(job->firstChunk == 0);   /* cannot be first chunk, as it also needs to create frame header */
+    /* A job created by streaming variant starts with a src buffer, but no dst buffer.
+     * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
+     * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
+     * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
+    assert(job->dstBuff.start == NULL);  /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
+    assert(job->srcBuff.start != NULL);      /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
+    assert(job->srcBuff.size >= ZSTD_blockHeaderSize);
+    job->dstBuff = job->srcBuff;
+    job->srcBuff = g_nullBuffer;
+    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
+    assert(!ZSTD_isError(job->cSize));
+    assert(job->consumed == 0);
     return 0;
 }
 
@@ -1028,12 +1031,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     if (!mtctx->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
                     mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
-        mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
-        mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
+        mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
+        mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
+        mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
         mtctx->jobs[jobID].srcSize = srcSize;
         mtctx->jobs[jobID].consumed = 0;
         mtctx->jobs[jobID].cSize = 0;
-        mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
         assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
         mtctx->jobs[jobID].params = mtctx->params;
         /* do not calculate checksum within sections, but write it in header for first section */
@@ -1066,7 +1069,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
             }
             mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
             memmove(mtctx->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
-                (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
+                (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize,
                 mtctx->inBuff.filled);
             mtctx->prefixSize = newPrefixSize;
         } else {   /* endFrame==1 => no need for another input buffer */