From: Yann Collet Date: Wed, 18 Jan 2017 23:18:17 +0000 (-0800) Subject: ZSTDMT_initCStream() supports restart from invalid state X-Git-Tag: v1.1.3^2~19^2~36 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3a01c46b266996e7256faf731dd6813b9f47f3a7;p=thirdparty%2Fzstd.git ZSTDMT_initCStream() supports restart from invalid state ZSTDMT_initCStream() will correcly scrub for resources when it detects that previous compression was not properly finished. --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3762f5a25..c417e8aad 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -228,6 +228,7 @@ struct ZSTDMT_CCtx_s { unsigned doneJobID; unsigned nextJobID; unsigned frameEnded; + unsigned allJobsCompleted; ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ }; @@ -244,6 +245,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) if (!cctx) return NULL; cctx->nbThreads = nbThreads; cctx->jobIDMask = nbJobs - 1; + cctx->allJobsCompleted = 1; cctx->factory = POOL_create(nbThreads, 1); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); @@ -277,8 +279,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) { if (mtctx==NULL) return 0; /* compatible with free on NULL */ POOL_free(mtctx->factory); - ZSTDMT_releaseAllJobResources(mtctx); /* kill workers first */ - ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources first */ + if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ + ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ ZSTDMT_freeCCtxPool(mtctx->cctxPool); pthread_mutex_destroy(&mtctx->jobCompleted_mutex); pthread_cond_destroy(&mtctx->jobCompleted_cond); @@ -393,6 +395,11 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { } size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { + if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + zcs->allJobsCompleted = 1; + } zcs->params = ZSTD_getParams(compressionLevel, 0, 0); zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog); @@ -402,13 +409,14 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { zcs->doneJobID = 0; zcs->nextJobID = 0; zcs->frameEnded = 0; + zcs->allJobsCompleted = 0; return 0; } size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Finish it and restart a new one */ + if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ /* fill input buffer */ { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); @@ -573,7 +581,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp } /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - return (zcs->doneJobID < zcs->nextJobID); + if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */ + zcs->allJobsCompleted = zcs->frameEnded; + return 0; } } }