]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed frame checksum issue
authorYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 00:20:26 +0000 (16:20 -0800)
committerYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 00:20:26 +0000 (16:20 -0800)
and race conditions

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
tests/zstreamtest.c

index e8840be9729d4f017454a8835a0370fbab155295..a87c368ead58b7e56aaf4d4a7bf291a3099cf9f4 100644 (file)
@@ -2396,7 +2396,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity)
     BYTE* op = ostart;
     size_t fhSize = 0;
 
-    DEBUGLOG(5, "ZSTD_writeEpilogue");
+    DEBUGLOG(4, "ZSTD_writeEpilogue");
     if (cctx->stage == ZSTDcs_created) return ERROR(stage_wrong);  /* init missing */
 
     /* special case : empty frame */
@@ -2420,6 +2420,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity)
     if (cctx->appliedParams.fParams.checksumFlag) {
         U32 const checksum = (U32) XXH64_digest(&cctx->xxhState);
         if (dstCapacity<4) return ERROR(dstSize_tooSmall);
+        DEBUGLOG(4, "ZSTD_writeEpilogue: write checksum : %08X", checksum);
         MEM_writeLE32(op, checksum);
         op += 4;
     }
index ffcbdf5f8eec735f0ad05abdaf8092c7206480b8..1aa6b866f6fb3db73bc5ea30617d9937a80e3e02 100644 (file)
@@ -315,7 +315,7 @@ typedef struct {
     unsigned firstChunk;
     unsigned lastChunk;
     unsigned jobCompleted;
-    unsigned checksumWritten;
+    unsigned frameChecksumNeeded;
     ZSTD_pthread_mutex_t* jobCompleted_mutex;
     ZSTD_pthread_cond_t* jobCompleted_cond;
     ZSTD_CCtx_params params;
@@ -1019,7 +1019,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
     zcs->jobs[jobID].lastChunk = endFrame;
     zcs->jobs[jobID].jobCompleted = 0;
-    zcs->jobs[jobID].checksumWritten = 0;
+    zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
     zcs->jobs[jobID].dstFlushed = 0;
     zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
     zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
@@ -1065,22 +1065,25 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 }
 
 
-/* ZSTDMT_flushNextJob() :
- * output : will be updated with amount of data flushed .
- * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
- * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
+/*! ZSTDMT_flushNextJob() :
+ * `output` : will be updated with amount of data flushed .
+ * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
+ * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
     DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
-    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
+    if (zcs->doneJobID == zcs->nextJobID) {
+        DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !")
+        return 0;   /* all flushed ! */
+    }
     ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
     while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+        if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
+        if (zcs->jobs[wJobID].jobCompleted==1) break;
         DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                     zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
-        assert(zcs->jobs[wJobID].jobCompleted==0);
-        if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
-        ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush */
+        ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush but more to come */
     }
 
     /* some output is available to be flushed */
@@ -1094,16 +1097,14 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             return job.cSize;
         }
         /* add frame checksum if necessary */
-        if ( zcs->frameEnded
-          && (zcs->doneJobID+1 == zcs->nextJobID)
-          && (zcs->params.fParams.checksumFlag)
-          && (!job.checksumWritten) ) {
+        if ( job.jobCompleted
+          && job.frameChecksumNeeded ) {
             U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
             DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
             MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
             job.cSize += 4;
             zcs->jobs[wJobID].cSize += 4;
-            zcs->jobs[wJobID].checksumWritten = 1;
+            zcs->jobs[wJobID].frameChecksumNeeded = 0;
         }
         assert(job.cSize >= job.dstFlushed);
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
@@ -1114,19 +1115,20 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
         }
         if ( job.jobCompleted
           && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
+            DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID);
             ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
             zcs->jobs[wJobID].jobCompleted = 0;
-            zcs->doneJobID++;
             zcs->consumed += job.srcSize;
             zcs->produced += job.cSize;
+            zcs->doneJobID++;
         } else {
             zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
         }
-        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+        /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
         if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
-        zcs->allJobsCompleted = zcs->frameEnded;   /* frame completed and entirely flushed */
+        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some more buffer to flush */
+        zcs->allJobsCompleted = zcs->frameEnded;   /* last frame entirely flushed */
         return 0;   /* everything flushed */
 }   }
 
index 5cd1ea0fbec4b4d7fc4ff2452d5bd92b3294490b..c26cb3295bc92d4db048db3c88fd2d304fa4890e 100644 (file)
@@ -1346,8 +1346,12 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                 outBuff.size = outBuff.pos + dstBuffSize;
                 DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
                 decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
+                if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) {
+                    DISPLAY("ZSTD_decompressStream: checksum error : \n");
+                    findDiff(copyBuffer, dstBuffer, totalTestSize);
+                }
                 CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
-                DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize);
+                DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos);
             }
             CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
             CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);