]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
removed static variables
authorYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 22:09:45 +0000 (15:09 -0700)
committerYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 22:25:50 +0000 (15:25 -0700)
so that --adapt can work on multiple input files too

lib/compress/zstdmt_compress.c
programs/fileio.c

index 244690cdce564f77c0f1ec0ad44823c4c5b2a949..39255fdcfdd421413cb71df2c9f8cb2f5628f3e1 100644 (file)
@@ -1530,7 +1530,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].jobID = mtctx->nextJobID;
         mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
         mtctx->jobs[jobID].lastJob = endFrame;
-        mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+        mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
         mtctx->jobs[jobID].dstFlushed = 0;
 
         /* Update the round buffer pos and clear the input buffer to be reset */
index 701e30e8f1a85b611813ced3df178f1b165bed5a..00f0bc2632d745b42d277b00767888d894ebab12 100644 (file)
@@ -807,6 +807,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     ZSTD_EndDirective directive = ZSTD_e_continue;
 
     /* stats */
+    ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
+    ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
     typedef enum { noChange, slower, faster } speedChange_e;
     speedChange_e speedChange = noChange;
     unsigned flushWaiting = 0;
@@ -820,7 +822,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     if (fileSize != UTIL_FILESIZE_UNKNOWN) {
         CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
     }
-    (void)compressionLevel; (void)srcFileName;
+    (void)srcFileName;
 
     /* Main compression loop */
     do {
@@ -863,69 +865,85 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                 ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
 
-                /* check output speed */
-                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
-                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };   /* note : requires fileio to run main thread */
-
-                    unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
-                    unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
-                    assert(zfp.produced >= cpszfp.produced);
-                    assert(g_nbWorkers >= 1);
-
-                    if ( (zfp.ingested == cpszfp.ingested)   /* no data read : input buffer full */
-                      && (zfp.consumed == cpszfp.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
-                      && (zfp.nbActiveWorkers == 0)          /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
-                      ) {
-                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
-                        speedChange = slower;
-                    }
+                /* display progress notifications */
+                if (g_displayLevel >= 3) {
+                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
+                                compressionLevel,
+                                (U32)((zfp.ingested - zfp.consumed) >> 20),
+                                (U32)(zfp.consumed >> 20),
+                                (U32)(zfp.produced >> 20),
+                                cShare );
+                } else {   /* summarized notifications if == 2; */
+                    DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
+                    if (fileSize != UTIL_FILESIZE_UNKNOWN)
+                        DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
+                    DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
+                    DELAY_NEXT_UPDATE();
+                }
 
-                    cpszfp = zfp;
+                /* adaptive mode : statistics measurement and speed correction */
+                if (g_adaptiveMode) {
 
-                    if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
-                      && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
-                      ) {
-                        DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
-                        speedChange = slower;
-                    }
-                    flushWaiting = 0;
-                }
+                    /* check output speed */
+                    if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
 
-                /* course correct only if there is at least one new job completed */
-                if (zfp.currentJobID > lastJobID) {
-                    DISPLAYLEVEL(6, "compression level adaptation check \n")
+                        unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
+                        unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
+                        assert(zfp.produced >= previous_zfp_update.produced);
+                        assert(g_nbWorkers >= 1);
 
-                    /* check input speed */
-                    if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
-                        if (inputBlocked <= 0) {
-                            DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+                        if ( (zfp.ingested == previous_zfp_update.ingested)   /* no data read : input buffer full */
+                          && (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
+                          && (zfp.nbActiveWorkers == 0)          /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
+                          ) {
+                            DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
+                            speedChange = slower;
+                        }
+
+                        previous_zfp_update = zfp;
+
+                        if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+                          && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+                          ) {
+                            DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
                             speedChange = slower;
-                        } else if (speedChange == noChange) {
-                            static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
-                            unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
-                            unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
-                            unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
-                            unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
-                            csuzfp = zfp;
-                            assert(inputPresented > 0);
-                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
-                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
-                                            (U32)newlyIngested, (U32)newlyConsumed,
-                                            (U32)newlyFlushed, (U32)newlyProduced);
-                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
-                              && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
-                              && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
-                            ) {
-                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
-                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
-                                speedChange = faster;
-                            }
                         }
-                        inputBlocked = 0;
-                        inputPresented = 0;
+                        flushWaiting = 0;
                     }
 
-                    if (g_adaptiveMode) {
+                    /* course correct only if there is at least one new job completed */
+                    if (zfp.currentJobID > lastJobID) {
+                        DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+                        /* check input speed */
+                        if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
+                            if (inputBlocked <= 0) {
+                                DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
+                                speedChange = slower;
+                            } else if (speedChange == noChange) {
+                                unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
+                                unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
+                                unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
+                                unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
+                                previous_zfp_correction = zfp;
+                                assert(inputPresented > 0);
+                                DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
+                                                inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
+                                                (U32)newlyIngested, (U32)newlyConsumed,
+                                                (U32)newlyFlushed, (U32)newlyProduced);
+                                if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
+                                  && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
+                                  && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
+                                ) {
+                                    DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+                                                    newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+                                    speedChange = faster;
+                                }
+                            }
+                            inputBlocked = 0;
+                            inputPresented = 0;
+                        }
+
                         if (speedChange == slower) {
                             DISPLAYLEVEL(6, "slower speed , higher compression \n")
                             compressionLevel ++;
@@ -940,27 +958,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                             ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
                         }
                         speedChange = noChange;
-                    }
-                    lastJobID = zfp.currentJobID;
-                }    /* if (zfp.currentJobID > lastJobID) */
 
-                if (g_displayLevel >= 3) {
-                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
-                                compressionLevel,
-                                (U32)((zfp.ingested - zfp.consumed) >> 20),
-                                (U32)(zfp.consumed >> 20),
-                                (U32)(zfp.produced >> 20),
-                                cShare );
-                } else {
-                    /* g_displayLevel <= 2; only display notifications if == 2; */
-                    DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
-                    if (fileSize != UTIL_FILESIZE_UNKNOWN)
-                        DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
-                    DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
-                    DELAY_NEXT_UPDATE();
-                }
-            }
-        }
+                        lastJobID = zfp.currentJobID;
+                    }  /* if (zfp.currentJobID > lastJobID) */
+                }  /* if (g_adaptiveMode) */
+            }  /* if (READY_FOR_UPDATE()) */
+        }  /* while ((inBuff.pos != inBuff.size) */
     } while (directive != ZSTD_e_end);
 
     if (ferror(srcFile)) {