]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : simplify job creation
authorYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 21:19:59 +0000 (13:19 -0800)
committerYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 21:25:06 +0000 (13:25 -0800)
job will not be created when not enough room within job Table

lib/compress/zstdmt_compress.c
tests/zstreamtest.c

index 303bc1f71832f9dbf7e8fde6029c137d819a2a85..5b253f162fe1a901aa4c5b027d2803fdc85643cc 100644 (file)
@@ -1003,6 +1003,10 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 {
     unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
+    unsigned const limitID = zcs->doneJobID & zcs->jobIDMask;
+    if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID))
+        return 0;  /* new job would overwrite unflushed older job */
+
     if (!zcs->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
                     zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
@@ -1210,27 +1214,15 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     }   }
 
     if ( (mtctx->jobReady)
-      || ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
-        && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) {   /* avoid overwriting job round buffer */
+      || (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
+      || ( (endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0) ) ) {   /* avoid overwriting job round buffer */
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
     }
 
     /* check for potential compressed data ready to be flushed */
     {   size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not flush yet */
-        if (mtctx->jobReady) return remainingToFlush;   /* some more input ready to be compressed */
-
-        switch(endOp)
-        {
-            case ZSTD_e_flush:
-                return ZSTDMT_flushStream(mtctx, output);
-            case ZSTD_e_end:
-                return ZSTDMT_endStream(mtctx, output);
-            case ZSTD_e_continue:
-                return 1;
-            default:
-                return ERROR(GENERIC);   /* invalid endDirective */
-        }
+        return remainingToFlush;
     }
 }
 
@@ -1249,8 +1241,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou
     size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize;
     DEBUGLOG(5, "ZSTDMT_flushStream_internal");
 
-    if ( (mtctx->jobReady || (srcSize > 0) || (endFrame && !mtctx->frameEnded))
-       && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {
+    if ( mtctx->jobReady     /* one job ready for a worker to pick up */
+      || (srcSize > 0)       /* still some data within input buffer */
+      || (endFrame && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */
            DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
     }
index b21611abc121da20dc7c2eeb690057be7ae9bd81..6ec14bb8e156234d11ae3924f94a95e1c2424114 100644 (file)
@@ -863,6 +863,10 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max)
     for (u=0; u<max; u++) {
         if (b1[u] != b2[u]) break;
     }
+    if (u==max) {
+        DISPLAY("=> No difference detected within %u bytes \n", (U32)max);
+        return u;
+    }
     DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max);
     if (u>=3)
         DISPLAY(" %02X %02X %02X ",
@@ -1352,8 +1356,8 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                 outBuff.size = outBuff.pos + dstBuffSize;
                 DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
                 decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
-                if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) {
-                    DISPLAY("ZSTD_decompressStream: checksum error : \n");
+                if (ZSTD_isError(decompressionResult)) {
+                    DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
                     findDiff(copyBuffer, dstBuffer, totalTestSize);
                 }
                 CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));