]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt: removed job->jobCompleted
authorYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 01:35:49 +0000 (17:35 -0800)
committerYann Collet <cyan@fb.com>
Fri, 26 Jan 2018 01:35:49 +0000 (17:35 -0800)
replaced by equivalent signal job->consumer == job->srcSize.

created additional functions
ZSTD_writeLastEmptyBlock()
and
ZSTDMT_writeLastEmptyBlock()
required when it's necessary to finish a frame with a last empty job, to create an "end of frame" marker.

It avoids creating a job with srcSize==0.

lib/compress/zstd_compress.c
lib/compress/zstd_compress_internal.h
lib/compress/zstdmt_compress.c

index a87c368ead58b7e56aaf4d4a7bf291a3099cf9f4..7b16d736439ee9b9802f28e58e51115d0523ecaf 100644 (file)
@@ -1824,7 +1824,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
                                         void* dst, size_t dstCapacity,
                                         const void* src, size_t srcSize)
 {
-    DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u) (dictLimit=%u, nextToUpdate=%u)",
+    DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%udictLimit=%u, nextToUpdate=%u)",
                 (U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate);
     if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1)
         return 0;   /* don't even attempt compression below a certain srcSize */
@@ -1837,9 +1837,9 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
         if (current > zc->blockState.matchState.nextToUpdate + 384)
             zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384));
     }
-    /* find and store sequences */
-    {
-        U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit;
+
+    /* select and store sequences */
+    {   U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit;
         size_t lastLLSize;
         { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; }
         if (zc->appliedParams.ldmParams.enableLdm) {
@@ -1848,26 +1848,20 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
                     U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params,
                     void const* src, size_t srcSize);
             ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm;
-
             lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize);
-        } else {
+        } else {   /* not long range mode */
             ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict);
-
             lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize);
         }
-        {
-            const BYTE* const anchor = (const BYTE*)src + srcSize - lastLLSize;
-            ZSTD_storeLastLiterals(&zc->seqStore, anchor, lastLLSize);
-        }
-    }
-    /* encode */
-    {
-        size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
-        if (ZSTD_isError(cSize) || cSize == 0)
-            return cSize;
+        {   const BYTE* const lastLiterals = (const BYTE*)src + srcSize - lastLLSize;
+            ZSTD_storeLastLiterals(&zc->seqStore, lastLiterals, lastLLSize);
+    }   }
+
+    /* encode sequences and literals */
+    {   size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
+        if (ZSTD_isError(cSize) || cSize == 0) return cSize;
         /* confirm repcodes and entropy tables */
-        {
-            ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
+        {   ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
             zc->blockState.prevCBlock = zc->blockState.nextCBlock;
             zc->blockState.nextCBlock = tmp;
         }
@@ -2030,6 +2024,19 @@ static size_t ZSTD_writeFrameHeader(void* dst, size_t dstCapacity,
     return pos;
 }
 
+/* ZSTD_writeLastEmptyBlock() :
+ * output an empty Block with end-of-frame mark to complete a frame
+ * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
+ *           or an error code if `dstCapcity` is too small (<ZSTD_blockHeaderSize)
+ */
+size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity)
+{
+    if (dstCapacity < ZSTD_blockHeaderSize) return ERROR(dstSize_tooSmall);
+    U32 const cBlockHeader24 = 1 /*lastBlock*/ + (((U32)bt_raw)<<1);  /* 0 size */
+    MEM_writeLE24(dst, cBlockHeader24);
+    return ZSTD_blockHeaderSize;
+}
+
 
 static void ZSTD_manageWindowContinuity(ZSTD_matchState_t* ms, void const* src, size_t srcSize)
 {
index 64f03a0d27c62ce19e2d5bdd1e63ac73d46353bc..cf5936587deab32defee7a2df366abba7998e4b9 100644 (file)
@@ -481,4 +481,13 @@ size_t ZSTD_compress_advanced_internal(ZSTD_CCtx* cctx,
                                  const void* dict,size_t dictSize,
                                  ZSTD_CCtx_params params);
 
+
+/* ZSTD_writeLastEmptyBlock() :
+ * output an empty Block with end-of-frame mark to complete a frame
+ * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
+ *           or an error code if `dstCapcity` is too small (<ZSTD_blockHeaderSize)
+ */
+size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity);
+
+
 #endif /* ZSTD_COMPRESS_H */
index bc9209d5b6627374d33eb858d31a53b3dcc7d8cb..451bb5bf622ebe9625ff346c03bb16652e4f1ff6 100644 (file)
@@ -314,7 +314,6 @@ typedef struct {
     size_t   dstFlushed;
     unsigned firstChunk;
     unsigned lastChunk;
-    unsigned jobCompleted;
     unsigned frameChecksumNeeded;
     ZSTD_pthread_mutex_t* mtctx_mutex;
     ZSTD_pthread_cond_t* mtctx_cond;
@@ -345,7 +344,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
             goto _endJob;
         }
         ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
-        job->dstBuff = dstBuff;
+        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
         ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
     }
 
@@ -369,8 +368,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
             if (ZSTD_isError(initError)) {
                 job->cSize = initError;
                 goto _endJob;
-        }   }
-    }
+    }   }   }
     if (!job->firstChunk) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
         if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
@@ -379,12 +377,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
     }
 
     /* compress */
-#if 0
-    job->cSize = (job->lastChunk) ?
-                 ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
-                 ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
-#else
-    if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
+    if (sizeof(size_t) > sizeof(int))
+        assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
+
     {   int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
         const BYTE* ip = (const BYTE*) src;
         BYTE* const ostart = (BYTE*)dstBuff.start;
@@ -404,7 +399,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
             job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(job->mtctx_cond);
+            ZSTD_pthread_cond_signal(job->mtctx_cond);   /* warns some more data is ready to be flushed */
             ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
         }
         /* last block */
@@ -416,23 +411,19 @@ void ZSTDMT_compressChunk(void* jobDescription)
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
             job->cSize += cSize;
-            job->consumed = job->srcSize;
             ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
-        }
-    }
-#endif
+    }   }
 
 _endJob:
-    /* release */
+    /* release resources */
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     ZSTDMT_releaseBuffer(job->bufPool, job->src);
     job->src = g_nullBuffer; job->srcStart = NULL;
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
     job->consumed = job->srcSize;
-    job->jobCompleted = 1;
     ZSTD_pthread_cond_signal(job->mtctx_cond);
     ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
 }
@@ -577,18 +568,18 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
     mtctx->allJobsCompleted = 1;
 }
 
-static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
+static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
 {
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
-    while (zcs->doneJobID < zcs->nextJobID) {
-        unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
-        ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
-        while (zcs->jobs[jobID].jobCompleted==0) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
+    while (mtctx->doneJobID < mtctx->nextJobID) {
+        unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
+        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+        while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
+            DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
+            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
         }
-        ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
-        zcs->doneJobID++;
+        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        mtctx->doneJobID++;
     }
 }
 
@@ -769,7 +760,7 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].src = g_nullBuffer;
             mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
             mtctx->jobs[u].prefixSize = dictSize;
-            mtctx->jobs[u].srcSize = chunkSize;
+            mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0);  /* avoid job.srcSize == 0 */
             mtctx->jobs[u].consumed = 0;
             mtctx->jobs[u].cSize = 0;
             mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
@@ -782,7 +773,6 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].bufPool = mtctx->bufPool;
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
-            mtctx->jobs[u].jobCompleted = 0;
             mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
             mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
 
@@ -805,7 +795,7 @@ static size_t ZSTDMT_compress_advanced_internal(
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
             DEBUGLOG(5, "waiting for chunk %u ", chunkID);
             ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
-            while (mtctx->jobs[chunkID].jobCompleted==0) {
+            while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
                 DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
                 ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
             }
@@ -879,7 +869,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
 /* ====================================== */
 
 size_t ZSTDMT_initCStream_internal(
-        ZSTDMT_CCtx* zcs,
+        ZSTDMT_CCtx* mtctx,
         const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
         const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
@@ -888,8 +878,8 @@ size_t ZSTDMT_initCStream_internal(
     /* params are supposed to be fully validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(zcs->cctxPool->totalCCtx == params.nbThreads);
-    zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
+    assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
+    mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (params.jobSize == 0) {
         if (params.cParams.windowLog >= 29)
             params.jobSize = ZSTDMT_JOBSIZE_MAX;
@@ -898,56 +888,56 @@ size_t ZSTDMT_initCStream_internal(
     }
     if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
 
-    if (zcs->singleBlockingThread) {
+    if (mtctx->singleBlockingThread) {
         ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
         DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
         assert(singleThreadParams.nbThreads == 0);
-        return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
+        return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
                                          dict, dictSize, cdict,
                                          singleThreadParams, pledgedSrcSize);
     }
     DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads);
 
-    if (zcs->allJobsCompleted == 0) {   /* previous compression not correctly finished */
-        ZSTDMT_waitForAllJobsCompleted(zcs);
-        ZSTDMT_releaseAllJobResources(zcs);
-        zcs->allJobsCompleted = 1;
+    if (mtctx->allJobsCompleted == 0) {   /* previous compression not correctly finished */
+        ZSTDMT_waitForAllJobsCompleted(mtctx);
+        ZSTDMT_releaseAllJobResources(mtctx);
+        mtctx->allJobsCompleted = 1;
     }
 
-    zcs->params = params;
-    zcs->frameContentSize = pledgedSrcSize;
+    mtctx->params = params;
+    mtctx->frameContentSize = pledgedSrcSize;
     if (dict) {
-        ZSTD_freeCDict(zcs->cdictLocal);
-        zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
+        ZSTD_freeCDict(mtctx->cdictLocal);
+        mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
                                                     ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */
-                                                    params.cParams, zcs->cMem);
-        zcs->cdict = zcs->cdictLocal;
-        if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
+                                                    params.cParams, mtctx->cMem);
+        mtctx->cdict = mtctx->cdictLocal;
+        if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);
     } else {
-        ZSTD_freeCDict(zcs->cdictLocal);
-        zcs->cdictLocal = NULL;
-        zcs->cdict = cdict;
+        ZSTD_freeCDict(mtctx->cdictLocal);
+        mtctx->cdictLocal = NULL;
+        mtctx->cdict = cdict;
     }
 
     assert(params.overlapSizeLog <= 9);
-    zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
-    DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10));
-    zcs->targetSectionSize = params.jobSize;
-    if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
-    if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize;  /* job size must be >= overlap size */
-    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);
-    zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize;
-    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10));
-    ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
-    zcs->inBuff.buffer = g_nullBuffer;
-    zcs->prefixSize = 0;
-    zcs->doneJobID = 0;
-    zcs->nextJobID = 0;
-    zcs->frameEnded = 0;
-    zcs->allJobsCompleted = 0;
-    zcs->consumed = 0;
-    zcs->produced = 0;
-    if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
+    mtctx->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
+    DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
+    mtctx->targetSectionSize = params.jobSize;
+    if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+    if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
+    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
+    mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize;
+    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10));
+    ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) );
+    mtctx->inBuff.buffer = g_nullBuffer;
+    mtctx->prefixSize = 0;
+    mtctx->doneJobID = 0;
+    mtctx->nextJobID = 0;
+    mtctx->frameEnded = 0;
+    mtctx->allJobsCompleted = 0;
+    mtctx->consumed = 0;
+    mtctx->produced = 0;
+    if (params.fParams.checksumFlag) XXH64_reset(&mtctx->xxhState, 0);
     return 0;
 }
 
@@ -982,103 +972,134 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
  * pledgedSrcSize can be zero == unknown (for the time being)
  * prefer using ZSTD_CONTENTSIZE_UNKNOWN,
  * as `0` might mean "empty" in the future */
-size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
+size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
 {
     if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
-    if (zcs->params.nbThreads==1)
-        return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
-    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params,
+    if (mtctx->params.nbThreads==1)
+        return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
+    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
                                        pledgedSrcSize);
 }
 
-size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0);
-    ZSTD_CCtx_params cctxParams = zcs->params;   /* retrieve sticky params */
+    ZSTD_CCtx_params cctxParams = mtctx->params;   /* retrieve sticky params */
     DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel);
     cctxParams.cParams = params.cParams;
     cctxParams.fParams = params.fParams;
-    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
+    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
 }
 
 
-static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD_EndDirective endOp)
+/* ZSTDMT_writeLastEmptyBlock()
+ * Write a single empty block with an end-of-frame
+ * to finish a frame.
+ * Completed synchronously.
+ * @return : 0, or an error code (can fail due to memory allocation)
+ */
+static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
+{
+    assert(job->srcSize == 0);
+    assert(job->lastChunk == 1);
+    assert(job->firstChunk == 0);   /* first chunk needs to create frame header too */
+    assert(job->dstBuff.start == NULL);  /* invoked from streaming variant only */
+    {   buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool);
+        if (dstBuff.start==NULL) return ERROR(memory_allocation);
+        job->dstBuff = dstBuff;   /* will be released by ZSTDMT_flushProduced() */
+        assert(dstBuff.size >= ZSTD_blockHeaderSize);
+        job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size);
+        assert(!ZSTD_isError(job->cSize));
+        assert(job->consumed == 0);
+    }
+    return 0;
+}
+
+static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
 {
-    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+    unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
     int const endFrame = (endOp == ZSTD_e_end);
 
-    if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
+    if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
-        assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
+        assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));
         return 0;
     }
 
-    if (!zcs->jobReady) {
+    if (!mtctx->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
-                    zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
-        zcs->jobs[jobID].src = zcs->inBuff.buffer;
-        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
-        zcs->jobs[jobID].srcSize = srcSize;
-        zcs->jobs[jobID].consumed = 0;
-        zcs->jobs[jobID].cSize = 0;
-        zcs->jobs[jobID].prefixSize = zcs->prefixSize;
-        assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
-        zcs->jobs[jobID].params = zcs->params;
+                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
+        mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
+        mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
+        mtctx->jobs[jobID].srcSize = srcSize;
+        mtctx->jobs[jobID].consumed = 0;
+        mtctx->jobs[jobID].cSize = 0;
+        mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
+        assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
+        mtctx->jobs[jobID].params = mtctx->params;
         /* do not calculate checksum within sections, but write it in header for first section */
-        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
-        zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
-        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
-        zcs->jobs[jobID].dstBuff = g_nullBuffer;
-        zcs->jobs[jobID].cctxPool = zcs->cctxPool;
-        zcs->jobs[jobID].bufPool = zcs->bufPool;
-        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
-        zcs->jobs[jobID].lastChunk = endFrame;
-        zcs->jobs[jobID].jobCompleted = 0;
-        zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
-        zcs->jobs[jobID].dstFlushed = 0;
-        zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
-        zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
-
-        if (zcs->params.fParams.checksumFlag)
-            XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
+        if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0;
+        mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;
+        mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;
+        mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+        mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
+        mtctx->jobs[jobID].bufPool = mtctx->bufPool;
+        mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0);
+        mtctx->jobs[jobID].lastChunk = endFrame;
+        mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+        mtctx->jobs[jobID].dstFlushed = 0;
+        mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
+        mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
+
+        if (mtctx->params.fParams.checksumFlag)
+            XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize);
 
         /* get a new buffer for next input */
         if (!endFrame) {
-            size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
-            zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
-            if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-                zcs->jobs[jobID].jobCompleted = 1;
-                zcs->nextJobID++;
-                ZSTDMT_waitForAllJobsCompleted(zcs);
-                ZSTDMT_releaseAllJobResources(zcs);
+            size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize);
+            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
+            if (mtctx->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+                mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0;
+                mtctx->nextJobID++;
+                ZSTDMT_waitForAllJobsCompleted(mtctx);
+                ZSTDMT_releaseAllJobResources(mtctx);
                 return ERROR(memory_allocation);
             }
-            zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
-            memmove(zcs->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
-                (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
-                zcs->inBuff.filled);
-            zcs->prefixSize = newPrefixSize;
+            mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
+            memmove(mtctx->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
+                (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
+                mtctx->inBuff.filled);
+            mtctx->prefixSize = newPrefixSize;
         } else {   /* endFrame==1 => no need for another input buffer */
-            zcs->inBuff.buffer = g_nullBuffer;
-            zcs->inBuff.filled = 0;
-            zcs->prefixSize = 0;
-            zcs->frameEnded = endFrame;
-            if (zcs->nextJobID == 0) {
-                /* single chunk exception : checksum is calculated directly within worker thread */
-                zcs->params.fParams.checksumFlag = 0;
-    }   }   }
+            mtctx->inBuff.buffer = g_nullBuffer;
+            mtctx->inBuff.filled = 0;
+            mtctx->prefixSize = 0;
+            mtctx->frameEnded = endFrame;
+            if (mtctx->nextJobID == 0) {
+                /* single chunk exception : checksum is already calculated directly within worker thread */
+                mtctx->params.fParams.checksumFlag = 0;
+        }   }
 
-    DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
-                zcs->nextJobID,
-                (U32)zcs->jobs[jobID].srcSize,
-                zcs->jobs[jobID].lastChunk,
-                zcs->doneJobID,
-                zcs->doneJobID & zcs->jobIDMask);
-    if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) {
-        zcs->nextJobID++;
-        zcs->jobReady = 0;
+        if ( (srcSize == 0)
+          && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+            assert(endOp == ZSTD_e_end);  /* only possible case : need to end the frame with an empty last block */
+            CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) );
+            mtctx->nextJobID++;
+            return 0;
+        }
+    }
+
+    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
+                mtctx->nextJobID,
+                (U32)mtctx->jobs[jobID].srcSize,
+                mtctx->jobs[jobID].lastChunk,
+                mtctx->doneJobID,
+                mtctx->doneJobID & mtctx->jobIDMask);
+    if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
+        mtctx->nextJobID++;
+        mtctx->jobReady = 0;
     } else {
-        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
-        zcs->jobReady = 1;
+        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
+        mtctx->jobReady = 1;
     }
     return 0;
 }
@@ -1088,73 +1109,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
  * `output` : `pos` will be updated with amount of data flushed .
  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
-static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
+static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
 {
-    unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+    unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
     DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
     assert(output->size >= output->pos);
 
-    ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
-    if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
-        while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
-            if (zcs->jobs[wJobID].jobCompleted==1) break;
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+    if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
+        assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
+        while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
+            if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
+                DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
+                            mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
+                break;
+            }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
-                        zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);  /* block when nothing available to flush but more to come */
+                        mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);  /* block when nothing available to flush but more to come */
     }   }
 
     /* some output is available to be flushed */
-    {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-        ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
+    {   ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
+        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
         if (ZSTD_isError(job.cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
-                        zcs->doneJobID, ZSTD_getErrorName(job.cSize));
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
+                        mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
+            ZSTDMT_waitForAllJobsCompleted(mtctx);
+            ZSTDMT_releaseAllJobResources(mtctx);
             return job.cSize;
         }
         /* add frame checksum if necessary (can only happen once) */
-        if ( job.jobCompleted
+        assert(job.consumed <= job.srcSize);
+        if ( (job.consumed == job.srcSize)
           && job.frameChecksumNeeded ) {
-            U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+            U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
             DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
             MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
             job.cSize += 4;
-            zcs->jobs[wJobID].cSize += 4;
-            zcs->jobs[wJobID].frameChecksumNeeded = 0;
+            mtctx->jobs[wJobID].cSize += 4;
+            mtctx->jobs[wJobID].frameChecksumNeeded = 0;
         }
         assert(job.cSize >= job.dstFlushed);
-        if (job.dstBuff.start != NULL) {  /* one buffer present : some job is ongoing */
+        if (job.dstBuff.start != NULL) {  /* dst buffer present : some work is ongoing or completed */
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
-                        (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
+                        (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
 
-            if ( job.jobCompleted
+            if ( (job.consumed == job.srcSize)
               && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
-                        zcs->doneJobID, (U32)job.dstFlushed);
-                ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
-                zcs->jobs[wJobID].dstBuff = g_nullBuffer;
-                zcs->jobs[wJobID].jobCompleted = 0;
-                zcs->consumed += job.srcSize;
-                zcs->produced += job.cSize;
-                zcs->doneJobID++;
+                        mtctx->doneJobID, (U32)job.dstFlushed);
+                ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
+                mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
+                mtctx->consumed += job.srcSize;
+                mtctx->produced += job.cSize;
+                mtctx->doneJobID++;
             } else {
-                zcs->jobs[wJobID].dstFlushed = job.dstFlushed;   /* remember how much was flushed for next attempt */
+                mtctx->jobs[wJobID].dstFlushed = job.dstFlushed;   /* remember how much was flushed for next attempt */
         }   }
 
         /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
         if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
         if (job.srcSize > job.consumed) return 1;   /* current job not completely compressed */
     }
-    if (zcs->doneJobID < zcs->nextJobID) return 1;   /* some more jobs to flush */
-    if (zcs->jobReady) return 1;   /* one job is ready and queued! */
-    if (zcs->inBuff.filled > 0) return 1;   /* input not empty */
-    zcs->allJobsCompleted = zcs->frameEnded;   /* last frame entirely flushed */
-    if (end == ZSTD_e_end) return !zcs->frameEnded;  /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
+    if (mtctx->doneJobID < mtctx->nextJobID) return 1;   /* some more jobs to flush */
+    if (mtctx->jobReady) return 1;   /* one job is ready and queued! */
+    if (mtctx->inBuff.filled > 0) return 1;   /* input not empty */
+    mtctx->allJobsCompleted = mtctx->frameEnded;   /* last frame entirely flushed */
+    if (end == ZSTD_e_end) return !mtctx->frameEnded;  /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
     return 0;   /* everything flushed */
 }
 
@@ -1241,12 +1267,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
 }
 
 
-size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
+size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
-    CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );
+    CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
 
     /* recommended next input size : fill current input buffer */
-    return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
+    return mtctx->inBuffSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
 }