]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
error out when --adapt is associated with --single-thread
authorYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 21:49:13 +0000 (14:49 -0700)
committerYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 21:49:13 +0000 (14:49 -0700)
since they are not compatible

lib/zstd.h
programs/fileio.c

index 25441e68c34c6acdd77ab6b7de68f4ea26ace370..669161b41544c185cced95138d22598876ebc4d4 100644 (file)
@@ -736,13 +736,14 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const
 
 /*! ZSTD_resetCStream() :
  *  start a new compression job, using same parameters from previous job.
- *  This is typically useful to skip dictionary loading stage, since it will re-use it in-place..
+ *  This is typically useful to skip dictionary loading stage, since it will re-use it in-place.
  *  Note that zcs must be init at least once before using ZSTD_resetCStream().
  *  If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
  *  If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
  *  For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
  *  but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
- * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ * @return : 0, or an error code (which can be tested using ZSTD_isError())
+ */
 ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
 
 
@@ -755,21 +756,27 @@ typedef struct {
     unsigned nbActiveWorkers;      /* MT only : nb of workers actively compressing at probe time */
 } ZSTD_frameProgression;
 
-/* ZSTD_getFrameProgression():
+/* ZSTD_getFrameProgression() :
  * tells how much data has been ingested (read from input)
  * consumed (input actually compressed) and produced (output) for current frame.
- * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
- * Can report progression inside worker threads (multi-threading and non-blocking mode).
+ * Note : (ingested - consumed) is amount of input data buffered internally, not yet compressed.
+ * Aggregates progression inside active worker threads.
  */
 ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
 
-/*! ZSTD_toFlushNow()
+/*! ZSTD_toFlushNow() :
  *  Tell how many bytes are ready to be flushed immediately.
  *  Useful for multithreading scenarios (nbWorkers >= 1).
- *  Probe the oldest active job (not yet entirely flushed) and check its output buffer.
- *  If return 0, it means there is no active job, or
- *  it means oldest job is still active, but everything produced has been flushed so far,
- *  therefore flushing is limited by speed of oldest job. */
+ *  Probe the oldest active job, defined as oldest job not yet entirely flushed,
+ *  and check its output buffer.
+ * @return : amount of data stored in oldest job and ready to be flushed immediately.
+ *  if @return == 0, it means either :
+ *  + there is no active job (could be checked with ZSTD_frameProgression()), or
+ *  + oldest job is still actively compressing data,
+ *    but everything it has produced has also been flushed so far,
+ *    therefore flushing speed is currently limited by production speed of oldest job
+ *    irrespective of the speed of concurrent newer jobs.
+ */
 ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);
 
 
index 6ea43c902107e6982e8a2e5ab47e38bb7419de26..701e30e8f1a85b611813ced3df178f1b165bed5a 100644 (file)
@@ -283,7 +283,11 @@ void FIO_setOverlapLog(unsigned overlapLog){
     g_overlapLog = overlapLog;
 }
 static U32 g_adaptiveMode = 0;
-void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; }
+void FIO_setAdaptiveMode(unsigned adapt) {
+    if ((adapt>0) && (g_nbWorkers==0))
+        EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n");
+    g_adaptiveMode = adapt;
+}
 static U32 g_ldmFlag = 0;
 void FIO_setLdmFlag(unsigned ldmFlag) {
     g_ldmFlag = (ldmFlag>0);
@@ -541,7 +545,8 @@ static void FIO_freeCResources(cRess_t ress)
 
 
 #ifdef ZSTD_GZCOMPRESS
-static unsigned long long FIO_compressGzFrame(cRess_t* ress,
+static unsigned long long
+FIO_compressGzFrame(cRess_t* ress,
                     const char* srcFileName, U64 const srcFileSize,
                     int compressionLevel, U64* readsize)
 {
@@ -623,9 +628,10 @@ static unsigned long long FIO_compressGzFrame(cRess_t* ress,
 
 
 #ifdef ZSTD_LZMACOMPRESS
-static unsigned long long FIO_compressLzmaFrame(cRess_t* ress,
-                            const char* srcFileName, U64 const srcFileSize,
-                            int compressionLevel, U64* readsize, int plain_lzma)
+static unsigned long long
+FIO_compressLzmaFrame(cRess_t* ress,
+                      const char* srcFileName, U64 const srcFileSize,
+                      int compressionLevel, U64* readsize, int plain_lzma)
 {
     unsigned long long inFileSize = 0, outFileSize = 0;
     lzma_stream strm = LZMA_STREAM_INIT;
@@ -698,9 +704,10 @@ static unsigned long long FIO_compressLzmaFrame(cRess_t* ress,
 #define LZ4F_max64KB max64KB
 #endif
 static int FIO_LZ4_GetBlockSize_FromBlockId (int id) { return (1 << (8 + (2 * id))); }
-static unsigned long long FIO_compressLz4Frame(cRess_t* ress,
-                            const char* srcFileName, U64 const srcFileSize,
-                            int compressionLevel, U64* readsize)
+static unsigned long long
+FIO_compressLz4Frame(cRess_t* ress,
+                     const char* srcFileName, U64 const srcFileSize,
+                     int compressionLevel, U64* readsize)
 {
     const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
     unsigned long long inFileSize = 0, outFileSize = 0;
@@ -838,7 +845,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
 
             /* count stats */
             inputPresented++;
-            if (oldIPos == inBuff.pos) inputBlocked++;
+            if (oldIPos == inBuff.pos) inputBlocked++;  /* input buffer is full and can't take any more : input speed is faster than consumption rate */
             if (!toFlushNow) flushWaiting = 1;
 
             /* Write compressed stream */
@@ -846,7 +853,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                             (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
             if (outBuff.pos) {
                 size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-                if (sizeCheck!=outBuff.pos)
+                if (sizeCheck != outBuff.pos)
                     EXM_THROW(25, "Write error : cannot write compressed block");
                 compressedfilesize += outBuff.pos;
             }
@@ -857,24 +864,24 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
 
                 /* check output speed */
-                if (zfp.currentJobID > 1) {
-                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
+                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
+                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };   /* note : requires fileio to run main thread */
 
                     unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
                     unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
                     assert(zfp.produced >= cpszfp.produced);
-
-                    cpszfp = zfp;
+                    assert(g_nbWorkers >= 1);
 
                     if ( (zfp.ingested == cpszfp.ingested)   /* no data read : input buffer full */
                       && (zfp.consumed == cpszfp.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
                       && (zfp.nbActiveWorkers == 0)          /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
-                      && (zfp.currentJobID > 0)              /* first job started : only remaining reason is no more available buffer to start compression */
                       ) {
                         DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
                         speedChange = slower;
                     }
 
+                    cpszfp = zfp;
+
                     if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
                       && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
                       ) {