]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ZSTDMT_initCStream() supports restart from invalid state
authorYann Collet <cyan@fb.com>
Wed, 18 Jan 2017 23:18:17 +0000 (15:18 -0800)
committerYann Collet <cyan@fb.com>
Wed, 18 Jan 2017 23:18:17 +0000 (15:18 -0800)
ZSTDMT_initCStream() will correcly scrub for resources
when it detects that previous compression was not properly finished.

lib/compress/zstdmt_compress.c

index 3762f5a2592223e51ac918145f0190bc9c41b993..c417e8aadb688935ed1f69dea79a10d3800e25e0 100644 (file)
@@ -228,6 +228,7 @@ struct ZSTDMT_CCtx_s {
     unsigned doneJobID;
     unsigned nextJobID;
     unsigned frameEnded;
+    unsigned allJobsCompleted;
     ZSTDMT_jobDescription jobs[1];   /* variable size (must lies at the end) */
 };
 
@@ -244,6 +245,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     if (!cctx) return NULL;
     cctx->nbThreads = nbThreads;
     cctx->jobIDMask = nbJobs - 1;
+    cctx->allJobsCompleted = 1;
     cctx->factory = POOL_create(nbThreads, 1);
     cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
     cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@@ -277,8 +279,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
 {
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
     POOL_free(mtctx->factory);
-    ZSTDMT_releaseAllJobResources(mtctx);    /* kill workers first */
-    ZSTDMT_freeBufferPool(mtctx->buffPool);  /* release job resources first */
+    if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
+    ZSTDMT_freeBufferPool(mtctx->buffPool);  /* release job resources into pools first */
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
     pthread_cond_destroy(&mtctx->jobCompleted_cond);
@@ -393,6 +395,11 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
 }
 
 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+    if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
+        ZSTDMT_waitForAllJobsCompleted(zcs);
+        ZSTDMT_releaseAllJobResources(zcs);
+        zcs->allJobsCompleted = 1;
+    }
     zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
     zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
     zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
@@ -402,13 +409,14 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
     zcs->doneJobID = 0;
     zcs->nextJobID = 0;
     zcs->frameEnded = 0;
+    zcs->allJobsCompleted = 0;
     return 0;
 }
 
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
-    if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Finish it and restart a new one */
+    if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
 
     /* fill input buffer */
     {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@@ -573,7 +581,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             }
             /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
             if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-            return (zcs->doneJobID < zcs->nextJobID);
+            if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
+            zcs->allJobsCompleted = zcs->frameEnded;
+            return 0;
     }   }
 }