From: Yann Collet Date: Fri, 21 Sep 2018 22:37:30 +0000 (-0700) Subject: ensure all writes to job->cSize are mutex protected X-Git-Tag: v1.3.6^2~19^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bfff4f4809777f114888db858ccd4aa2c18641bb;p=thirdparty%2Fzstd.git ensure all writes to job->cSize are mutex protected even when reporting errors, using a macro for code brevity, as suggested by @terrelln, --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index a7c409c62..6b9c24b56 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -632,6 +632,13 @@ typedef struct { unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; +#define JOB_ERROR(e) { \ + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); \ + job->cSize = e; \ + ZSTD_pthread_mutex_unlock(&job->job_mutex); \ + goto _endJob; \ +} + /* ZSTDMT_compressionJob() is a POOL_function type */ void ZSTDMT_compressionJob(void* jobDescription) { @@ -643,22 +650,14 @@ void ZSTDMT_compressionJob(void* jobDescription) size_t lastCBlockSize = 0; /* ressources */ - if (cctx==NULL) { - job->cSize = ERROR(memory_allocation); - goto _endJob; - } + if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation)); if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ dstBuff = ZSTDMT_getBuffer(job->bufPool); - if (dstBuff.start==NULL) { - job->cSize = ERROR(memory_allocation); - goto _endJob; - } + if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation)); job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ } - if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) { - job->cSize = ERROR(memory_allocation); - goto _endJob; - } + if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) + JOB_ERROR(ERROR(memory_allocation)); /* Don't compute the checksum for chunks, since we compute it externally, * but write it in the header. @@ -672,30 +671,26 @@ void ZSTDMT_compressionJob(void* jobDescription) if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize); assert(job->firstJob); /* only allowed for first job */ - if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } + if (ZSTD_isError(initError)) JOB_ERROR(initError); } else { /* srcStart points at reloaded section */ U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); - if (ZSTD_isError(forceWindowError)) { - job->cSize = forceWindowError; - goto _endJob; - } } + if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError); + } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ ZSTD_dtlm_fast, NULL, /*cdict*/ jobParams, pledgedSrcSize); - if (ZSTD_isError(initError)) { - job->cSize = initError; - goto _endJob; - } } } + if (ZSTD_isError(initError)) JOB_ERROR(initError); + } } /* Perform serial step as early as possible, but after CCtx initialization */ ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); - if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } + if (ZSTD_isError(hSize)) JOB_ERROR(hSize); DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); } @@ -713,12 +708,7 @@ void ZSTDMT_compressionJob(void* jobDescription) assert(job->cSize == 0); for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); - if (ZSTD_isError(cSize)) { - ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); - job->cSize = cSize; - ZSTD_pthread_mutex_unlock(&job->job_mutex); - goto _endJob; - } + if (ZSTD_isError(cSize)) JOB_ERROR(cSize); ip += chunkSize; op += cSize; assert(op < oend); /* stats */ @@ -739,7 +729,7 @@ void ZSTDMT_compressionJob(void* jobDescription) size_t const cSize = (job->lastJob) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); - if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + if (ZSTD_isError(cSize)) JOB_ERROR(cSize); lastCBlockSize = cSize; } } @@ -1657,7 +1647,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); - DEBUGLOG(5, "dstBuffer released") + DEBUGLOG(5, "dstBuffer released"); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ mtctx->consumed += srcSize; @@ -1880,7 +1870,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* It is only possible for this operation to fail if there are * still compression jobs ongoing. */ - DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed") + DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed"); assert(mtctx->doneJobID != mtctx->nextJobID); } else DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); diff --git a/programs/fileio.c b/programs/fileio.c index 00f0bc263..f3800b689 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -892,6 +892,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, assert(zfp.produced >= previous_zfp_update.produced); assert(g_nbWorkers >= 1); + /* test if output speed is so slow that all buffers are full + * and no further progress is possible + * (neither compression nor adding more input into internal buffers) */ if ( (zfp.ingested == previous_zfp_update.ingested) /* no data read : input buffer full */ && (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */ && (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */