]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed : calling ZSTD_compress_generic() to end-flush a stream in multiple steps
authorYann Collet <cyan@fb.com>
Fri, 16 Jun 2017 18:58:21 +0000 (11:58 -0700)
committerYann Collet <cyan@fb.com>
Fri, 16 Jun 2017 18:58:21 +0000 (11:58 -0700)
lib/common/error_private.c
lib/common/zstd_errors.h
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/zstd.h

index 6bc86da7a173d3e44227be3da41ba5b9c53686be..2d752cd23a721631b52ce86e11514fc4b5ccea42 100644 (file)
@@ -24,7 +24,8 @@ const char* ERR_getErrorString(ERR_enum code)
     case PREFIX(frameParameter_unsupported): return "Unsupported frame parameter";
     case PREFIX(frameParameter_unsupportedBy32bits): return "Frame parameter unsupported in 32-bits mode";
     case PREFIX(frameParameter_windowTooLarge): return "Frame requires too much memory for decoding";
-    case PREFIX(compressionParameter_unsupported): return "Compression parameter is out of bound";
+    case PREFIX(compressionParameter_unsupported): return "Compression parameter is not supported";
+    case PREFIX(compressionParameter_outOfBound): return "Compression parameter is out of bound";
     case PREFIX(init_missing): return "Context should be init first";
     case PREFIX(memory_allocation): return "Allocation error : not enough memory";
     case PREFIX(stage_wrong): return "Operation not authorized at current processing stage";
index e067acf6ecac55fa25d3061c4c13eb3ad6bc4170..19f1597aa340d1026eefb6108dbfcc2dd5c7881e 100644 (file)
@@ -35,8 +35,11 @@ extern "C" {
 #endif
 
 /*-****************************************
-*  error codes list
-******************************************/
+ *  error codes list
+ *  note : this API is still considered unstable
+ *         it should not be used with a dynamic library
+ *         only static linking is allowed
+ ******************************************/
 typedef enum {
   ZSTD_error_no_error,
   ZSTD_error_GENERIC,
@@ -47,6 +50,7 @@ typedef enum {
   ZSTD_error_frameParameter_unsupportedBy32bits,
   ZSTD_error_frameParameter_windowTooLarge,
   ZSTD_error_compressionParameter_unsupported,
+  ZSTD_error_compressionParameter_outOfBound,
   ZSTD_error_init_missing,
   ZSTD_error_memory_allocation,
   ZSTD_error_stage_wrong,
@@ -67,7 +71,7 @@ typedef enum {
 
 /*! ZSTD_getErrorCode() :
     convert a `size_t` function result into a `ZSTD_ErrorCode` enum type,
-    which can be used to compare directly with enum list published into "error_public.h" */
+    which can be used to compare with enum list published above */
 ZSTDERRORLIB_API ZSTD_ErrorCode ZSTD_getErrorCode(size_t functionResult);
 ZSTDERRORLIB_API const char* ZSTD_getErrorString(ZSTD_ErrorCode code);
 
index 1be0e97b3f3a1bd98687273e6141d9f9ac5f9937..dfaa1854ff8e9756874e73bfa402ce77c2b49d51 100644 (file)
@@ -241,7 +241,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
 {
 #   define CLAMPCHECK(val,min,max) {                         \
         if ((val<min) | (val>max)) {                         \
-            return ERROR(compressionParameter_unsupported);  \
+            return ERROR(compressionParameter_outOfBound);  \
     }   }
 
     if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
@@ -342,19 +342,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
             if (cctx->staticSize)  /* MT not compatible with static alloc */
                 return ERROR(compressionParameter_unsupported);
             ZSTDMT_freeCCtx(cctx->mtctx);
-            cctx->nbThreads = value;
+            cctx->nbThreads = 1;
             cctx->mtctx = ZSTDMT_createCCtx(value);
             if (cctx->mtctx == NULL) return ERROR(memory_allocation);
-        }
-        cctx->nbThreads = 1;
+            cctx->nbThreads = value;
+        } else
+            cctx->nbThreads = 1;
         return 0;
 
-    case ZSTDMT_p_jobSize:
+    case ZSTD_p_jobSize:
         if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
         assert(cctx->mtctx != NULL);
         return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value);
 
-    case ZSTDMT_p_overlapSizeLog:
+    case ZSTD_p_overlapSizeLog:
         if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
         assert(cctx->mtctx != NULL);
         return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value);
@@ -3648,8 +3649,8 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
                   && (zcs->inBuffPos == zcs->inToCompress) ) {
                     /* empty */
                     someMoreWork = 0; break;
-                 }
-             }
+                }
+            }
             /* compress current block (note : this stage cannot be stopped in the middle) */
             DEBUGLOG(5, "stream compression stage (flushMode==%u)", flushMode);
             {   void* cDst;
@@ -3658,7 +3659,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
                 size_t oSize = oend-op;
                 unsigned const lastBlock = (flushMode == ZSTD_e_end) && (ip==iend);
                 if (oSize >= ZSTD_compressBound(iSize))
-                    cDst = op;   /* compress directly into output buffer (skip flush stage) */
+                    cDst = op;   /* compress into output buffer, to skip flush stage */
                 else
                     cDst = zcs->outBuff, oSize = zcs->outBuffSize;
                 cSize = lastBlock ?
@@ -3667,7 +3668,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
                         ZSTD_compressContinue(zcs, cDst, oSize,
                                     zcs->inBuff + zcs->inToCompress, iSize);
                 if (ZSTD_isError(cSize)) return cSize;
-                DEBUGLOG(5, "cSize = %u   (lastBlock:%u)", (U32)cSize, lastBlock);
                 zcs->frameEnded = lastBlock;
                 /* prepare next block */
                 zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize;
@@ -3681,7 +3681,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
                 if (cDst == op) {  /* no need to flush */
                     op += cSize;
                     if (zcs->frameEnded) {
-                        DEBUGLOG(5, "Frame directly completed");
+                        DEBUGLOG(5, "Frame completed directly in outBuffer");
                         someMoreWork = 0;
                         zcs->streamStage = zcss_init;
                     }
@@ -3707,7 +3707,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
                 }
                 zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
                 if (zcs->frameEnded) {
-                    DEBUGLOG(5, "Frame completed");
+                    DEBUGLOG(5, "Frame completed on flush");
                     someMoreWork = 0;
                     zcs->streamStage = zcss_init;
                     break;
@@ -3781,15 +3781,19 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
 
 #ifdef ZSTD_MULTITHREAD
     if (cctx->nbThreads > 1) {
+        DEBUGLOG(5, "calling ZSTDMT_compressStream_generic(%i,...)", endOp);
         size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
+        DEBUGLOG(5, "ZSTDMT result : %u", (U32)flushMin);
         if (ZSTD_isError(flushMin)) cctx->streamStage = zcss_init;
+        if (endOp == ZSTD_e_end && flushMin==0)
+            cctx->streamStage = zcss_init;   /* compression completed */
         return flushMin;
     }
 #endif
 
-    DEBUGLOG(5, "starting ZSTD_compressStream_generic");
+    DEBUGLOG(5, "calling ZSTD_compressStream_generic(%i,...)", endOp);
     CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
-    DEBUGLOG(5, "completing ZSTD_compress_generic");
+    DEBUGLOG(5, "completed ZSTD_compress_generic");
     return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */
 }
 
index 0984128249ce183636704f39624d61862c6663f0..a4b4f5160a073eedf30c57c74f4ee0611351e9a4 100644 (file)
@@ -704,7 +704,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
             zcs->inBuff.filled);
         DEBUGLOG(5, "new inBuff pre-filled");
         zcs->dictSize = newDictSize;
-    } else {
+    } else {   /* if (endFrame==1) */
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
         zcs->dictSize = 0;
@@ -768,7 +768,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             zcs->jobs[wJobID].jobScanned = 1;
         }
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
@@ -808,11 +808,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 
     if ( (zcs->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
         && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {   /* avoid overwriting job round buffer */
-        CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* blockToFlush */) );
     }
 
     /* check for data to flush */
-    CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */
+    CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
 
     /* recommended next input size : fill current input buffer */
     return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@@ -823,16 +823,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 {
     size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
 
-    if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
+    if (srcSize)
+        DEBUGLOG(5, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
+        DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize);
+        DEBUGLOG(5, "end order : %u", endFrame);
         CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
+        DEBUGLOG(5, "resulting zcs->frameEnded : %u", zcs->frameEnded);
     }
 
     /* check if there is any data available to flush */
     DEBUGLOG(5, "zcs->doneJobID : %u  ; zcs->nextJobID : %u",
-            zcs->doneJobID, zcs->nextJobID);
-    return ZSTDMT_flushNextJob(zcs, output, 1);
+                zcs->doneJobID, zcs->nextJobID);
+    return ZSTDMT_flushNextJob(zcs, output, 1 /*blockToFlush */);
 }
 
 
@@ -840,14 +844,14 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
     if (zcs->nbThreads==1)
         return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
-    return ZSTDMT_flushStream_internal(zcs, output, 0);
+    return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
 }
 
 size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
     if (zcs->nbThreads==1)
         return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
-    return ZSTDMT_flushStream_internal(zcs, output, 1);
+    return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
 }
 
 size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
@@ -855,7 +859,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                      ZSTD_inBuffer* input,
                                      ZSTD_EndDirective endOp)
 {
-    CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
+    if (input->pos < input->size)  /* exclude final flushes */
+        CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
     switch(endOp)
     {
         case ZSTD_e_flush:
index a195438143b88ee0563d6ea7ac445653128b3a79..1e73b52779944ba774dafb28c4054c29b0aa072e 100644 (file)
@@ -702,11 +702,11 @@ typedef enum {
                               * More threads improve speed, but also increase memory usage.
                               * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
                               * Special: value 0 means "do not change nbThreads" */
-    ZSTDMT_p_jobSize,        /* Size of a compression job. Each compression job is completed in parallel.
+    ZSTD_p_jobSize,          /* Size of a compression job. Each compression job is completed in parallel.
                               * 0 means default, which is dynamically determined based on compression parameters.
                               * Job size must be a minimum of overlapSize, or 1 KB, whichever is largest
                               * The minimum size is automatically and transparently enforced */
-    ZSTDMT_p_overlapSizeLog, /* Size of previous input reloaded at the beginning of each job.
+    ZSTD_p_overlapSizeLog,   /* Size of previous input reloaded at the beginning of each job.
                               * 0 => no overlap, 6(default) => use 1/8th of windowSize, >=9 => use full windowSize */
 
     /* advanced parameters - may not remain available after API update */