]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
ensure all writes to job->cSize are mutex protected
authorYann Collet <cyan@fb.com>
Fri, 21 Sep 2018 22:37:30 +0000 (15:37 -0700)
committerYann Collet <cyan@fb.com>
Fri, 21 Sep 2018 23:00:39 +0000 (16:00 -0700)
even when reporting errors,
using a macro for code brevity, as suggested by @terrelln,

lib/compress/zstdmt_compress.c
programs/fileio.c

index a7c409c62358b6ee3b1a6fa1a8dc3f488142ed8e..6b9c24b56f2aafa783e04dfd168bc5078b1d7521 100644 (file)
@@ -632,6 +632,13 @@ typedef struct {
     unsigned frameChecksumNeeded;        /* used only by mtctx */
 } ZSTDMT_jobDescription;
 
+#define JOB_ERROR(e) {                          \
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
+    job->cSize = e;                             \
+    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
+    goto _endJob;                               \
+}
+
 /* ZSTDMT_compressionJob() is a POOL_function type */
 void ZSTDMT_compressionJob(void* jobDescription)
 {
@@ -643,22 +650,14 @@ void ZSTDMT_compressionJob(void* jobDescription)
     size_t lastCBlockSize = 0;
 
     /* ressources */
-    if (cctx==NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
+    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
     if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) {
-            job->cSize = ERROR(memory_allocation);
-            goto _endJob;
-        }
+        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
         job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
     }
-    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+        JOB_ERROR(ERROR(memory_allocation));
 
     /* Don't compute the checksum for chunks, since we compute it externally,
      * but write it in the header.
@@ -672,30 +671,26 @@ void ZSTDMT_compressionJob(void* jobDescription)
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
-        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+        if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
         {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
-            if (ZSTD_isError(forceWindowError)) {
-                job->cSize = forceWindowError;
-                goto _endJob;
-        }   }
+            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
+        }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                                         ZSTD_dtlm_fast,
                                         NULL, /*cdict*/
                                         jobParams, pledgedSrcSize);
-            if (ZSTD_isError(initError)) {
-                job->cSize = initError;
-                goto _endJob;
-    }   }   }
+            if (ZSTD_isError(initError)) JOB_ERROR(initError);
+    }   }
 
     /* Perform serial step as early as possible, but after CCtx initialization */
     ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
 
     if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
+        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
         DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
     }
@@ -713,12 +708,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) {
-                ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-                job->cSize = cSize;
-                ZSTD_pthread_mutex_unlock(&job->job_mutex);
-                goto _endJob;
-            }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             ip += chunkSize;
             op += cSize; assert(op < oend);
             /* stats */
@@ -739,7 +729,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
             size_t const cSize = (job->lastJob) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             lastCBlockSize = cSize;
     }   }
 
@@ -1657,7 +1647,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
-                DEBUGLOG(5, "dstBuffer released")
+                DEBUGLOG(5, "dstBuffer released");
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
                 mtctx->consumed += srcSize;
@@ -1880,7 +1870,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                 /* It is only possible for this operation to fail if there are
                  * still compression jobs ongoing.
                  */
-                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed")
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
                 assert(mtctx->doneJobID != mtctx->nextJobID);
             } else
                 DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
index 00f0bc2632d745b42d277b00767888d894ebab12..f3800b689e8abd6c26351893ac66c5fe4823fafc 100644 (file)
@@ -892,6 +892,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                         assert(zfp.produced >= previous_zfp_update.produced);
                         assert(g_nbWorkers >= 1);
 
+                        /* test if output speed is so slow that all buffers are full
+                         * and no further progress is possible
+                         * (neither compression nor adding more input into internal buffers) */
                         if ( (zfp.ingested == previous_zfp_update.ingested)   /* no data read : input buffer full */
                           && (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
                           && (zfp.nbActiveWorkers == 0)          /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */