unsigned inputPresented = 0;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
+ UTIL_time_t lastAdaptTime = UTIL_getTime();
+ U64 const adaptEveryMicro = REFRESH_RATE;
+
UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);
DISPLAYLEVEL(6, "compression using zstd format \n");
compressedfilesize += outBuff.pos;
}
- /* display notification; and adapt compression level */
- if (READY_FOR_UPDATE()) {
+ /* adaptive mode : statistics measurement and speed correction */
+ if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
+ ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+
+ lastAdaptTime = UTIL_getTime();
+
+ /* check output speed */
+ if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
+
+ unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
+ unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
+ assert(zfp.produced >= previous_zfp_update.produced);
+ assert(prefs->nbWorkers >= 1);
+
+ /* test if compression is blocked
+ * either because output is slow and all buffers are full
+ * or because input is slow and no job can start while waiting for at least one buffer to be filled.
+ * note : exclude starting part, since currentJobID > 1 */
+ if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
+ && (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
+ ) {
+ DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
+ speedChange = slower;
+ }
+
+ previous_zfp_update = zfp;
+
+ if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+ && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+ ) {
+ DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
+ speedChange = slower;
+ }
+ flushWaiting = 0;
+ }
+
+ /* course correct only if there is at least one new job completed */
+ if (zfp.currentJobID > lastJobID) {
+ DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+ /* check input speed */
+ if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
+ if (inputBlocked <= 0) {
+ DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
+ speedChange = slower;
+ } else if (speedChange == noChange) {
+ unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
+ unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
+ unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
+ unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
+ previous_zfp_correction = zfp;
+ assert(inputPresented > 0);
+ DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
+ inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
+ (unsigned)newlyIngested, (unsigned)newlyConsumed,
+ (unsigned)newlyFlushed, (unsigned)newlyProduced);
+ if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
+ && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
+ && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
+ ) {
+ DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+ newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+ speedChange = faster;
+ }
+ }
+ inputBlocked = 0;
+ inputPresented = 0;
+ }
+
+ if (speedChange == slower) {
+ DISPLAYLEVEL(6, "slower speed , higher compression \n")
+ compressionLevel ++;
+ if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
+ if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
+ compressionLevel += (compressionLevel == 0); /* skip 0 */
+ ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+ }
+ if (speedChange == faster) {
+ DISPLAYLEVEL(6, "faster speed , lighter compression \n")
+ compressionLevel --;
+ if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
+ compressionLevel -= (compressionLevel == 0); /* skip 0 */
+ ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+ }
+ speedChange = noChange;
+
+ lastJobID = zfp.currentJobID;
+ } /* if (zfp.currentJobID > lastJobID) */
+ } /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */
+
+ /* display notification */
+ if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);
+ DELAY_NEXT_UPDATE();
+
/* display progress notifications */
DISPLAY_PROGRESS("\r%79s\r", ""); /* Clear out the current displayed line */
if (g_display_prefs.displayLevel >= 3) {
if (fileSize != UTIL_FILESIZE_UNKNOWN)
DISPLAY_PROGRESS("/%6.*f%4s", file_hrs.precision, file_hrs.value, file_hrs.suffix);
DISPLAY_PROGRESS(" ==> %2.f%%", cShare);
- DELAY_NEXT_UPDATE();
}
-
- /* adaptive mode : statistics measurement and speed correction */
- if (prefs->adaptiveMode) {
-
- /* check output speed */
- if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
-
- unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
- unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
- assert(zfp.produced >= previous_zfp_update.produced);
- assert(prefs->nbWorkers >= 1);
-
- /* test if compression is blocked
- * either because output is slow and all buffers are full
- * or because input is slow and no job can start while waiting for at least one buffer to be filled.
- * note : exclude starting part, since currentJobID > 1 */
- if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
- && (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
- ) {
- DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
- speedChange = slower;
- }
-
- previous_zfp_update = zfp;
-
- if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
- && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
- ) {
- DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
- speedChange = slower;
- }
- flushWaiting = 0;
- }
-
- /* course correct only if there is at least one new job completed */
- if (zfp.currentJobID > lastJobID) {
- DISPLAYLEVEL(6, "compression level adaptation check \n")
-
- /* check input speed */
- if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
- if (inputBlocked <= 0) {
- DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
- speedChange = slower;
- } else if (speedChange == noChange) {
- unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
- unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
- unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
- unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
- previous_zfp_correction = zfp;
- assert(inputPresented > 0);
- DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
- inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
- (unsigned)newlyIngested, (unsigned)newlyConsumed,
- (unsigned)newlyFlushed, (unsigned)newlyProduced);
- if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
- && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
- && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
- ) {
- DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
- newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
- speedChange = faster;
- }
- }
- inputBlocked = 0;
- inputPresented = 0;
- }
-
- if (speedChange == slower) {
- DISPLAYLEVEL(6, "slower speed , higher compression \n")
- compressionLevel ++;
- if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
- if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
- compressionLevel += (compressionLevel == 0); /* skip 0 */
- ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
- }
- if (speedChange == faster) {
- DISPLAYLEVEL(6, "faster speed , lighter compression \n")
- compressionLevel --;
- if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
- compressionLevel -= (compressionLevel == 0); /* skip 0 */
- ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
- }
- speedChange = noChange;
-
- lastJobID = zfp.currentJobID;
- } /* if (zfp.currentJobID > lastJobID) */
- } /* if (g_adaptiveMode) */
- } /* if (READY_FOR_UPDATE()) */
+ } /* if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) */
} /* while ((inBuff.pos != inBuff.size) */
} while (directive != ZSTD_e_end);