ZSTD_EndDirective directive = ZSTD_e_continue;
/* stats */
+ ZSTD_frameProgression previous_zfp_update = { 0, 0, 0, 0, 0, 0 };
+ ZSTD_frameProgression previous_zfp_correction = { 0, 0, 0, 0, 0, 0 };
typedef enum { noChange, slower, faster } speedChange_e;
speedChange_e speedChange = noChange;
unsigned flushWaiting = 0;
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
}
- (void)compressionLevel; (void)srcFileName;
+ (void)srcFileName;
/* Main compression loop */
do {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
- /* check output speed */
- if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
- static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; /* note : requires fileio to run main thread */
-
- unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
- unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
- assert(zfp.produced >= cpszfp.produced);
- assert(g_nbWorkers >= 1);
-
- if ( (zfp.ingested == cpszfp.ingested) /* no data read : input buffer full */
- && (zfp.consumed == cpszfp.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
- && (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
- ) {
- DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
- speedChange = slower;
- }
+ /* display progress notifications */
+ if (g_displayLevel >= 3) {
+ DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
+ compressionLevel,
+ (U32)((zfp.ingested - zfp.consumed) >> 20),
+ (U32)(zfp.consumed >> 20),
+ (U32)(zfp.produced >> 20),
+ cShare );
+ } else { /* summarized notifications if == 2; */
+ DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
+ if (fileSize != UTIL_FILESIZE_UNKNOWN)
+ DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
+ DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
+ DELAY_NEXT_UPDATE();
+ }
- cpszfp = zfp;
+ /* adaptive mode : statistics measurement and speed correction */
+ if (g_adaptiveMode) {
- if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
- && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
- ) {
- DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
- speedChange = slower;
- }
- flushWaiting = 0;
- }
+ /* check output speed */
+ if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
- /* course correct only if there is at least one new job completed */
- if (zfp.currentJobID > lastJobID) {
- DISPLAYLEVEL(6, "compression level adaptation check \n")
+ unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
+ unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
+ assert(zfp.produced >= previous_zfp_update.produced);
+ assert(g_nbWorkers >= 1);
- /* check input speed */
- if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
- if (inputBlocked <= 0) {
- DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+ if ( (zfp.ingested == previous_zfp_update.ingested) /* no data read : input buffer full */
+ && (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
+ && (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
+ ) {
+ DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
+ speedChange = slower;
+ }
+
+ previous_zfp_update = zfp;
+
+ if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+ && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+ ) {
+ DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
speedChange = slower;
- } else if (speedChange == noChange) {
- static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
- unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
- unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
- unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
- unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
- csuzfp = zfp;
- assert(inputPresented > 0);
- DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
- inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
- (U32)newlyIngested, (U32)newlyConsumed,
- (U32)newlyFlushed, (U32)newlyProduced);
- if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
- && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
- && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
- ) {
- DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
- newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
- speedChange = faster;
- }
}
- inputBlocked = 0;
- inputPresented = 0;
+ flushWaiting = 0;
}
- if (g_adaptiveMode) {
+ /* course correct only if there is at least one new job completed */
+ if (zfp.currentJobID > lastJobID) {
+ DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+ /* check input speed */
+ if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
+ if (inputBlocked <= 0) {
+ DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
+ speedChange = slower;
+ } else if (speedChange == noChange) {
+ unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
+ unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
+ unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
+ unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
+ previous_zfp_correction = zfp;
+ assert(inputPresented > 0);
+ DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
+ inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
+ (U32)newlyIngested, (U32)newlyConsumed,
+ (U32)newlyFlushed, (U32)newlyProduced);
+ if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
+ && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
+ && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
+ ) {
+ DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+ newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+ speedChange = faster;
+ }
+ }
+ inputBlocked = 0;
+ inputPresented = 0;
+ }
+
if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
}
speedChange = noChange;
- }
- lastJobID = zfp.currentJobID;
- } /* if (zfp.currentJobID > lastJobID) */
- if (g_displayLevel >= 3) {
- DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
- compressionLevel,
- (U32)((zfp.ingested - zfp.consumed) >> 20),
- (U32)(zfp.consumed >> 20),
- (U32)(zfp.produced >> 20),
- cShare );
- } else {
- /* g_displayLevel <= 2; only display notifications if == 2; */
- DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20));
- if (fileSize != UTIL_FILESIZE_UNKNOWN)
- DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));
- DISPLAYLEVEL(2, "MB ==> %2.f%% ", cShare);
- DELAY_NEXT_UPDATE();
- }
- }
- }
+ lastJobID = zfp.currentJobID;
+ } /* if (zfp.currentJobID > lastJobID) */
+ } /* if (g_adaptiveMode) */
+ } /* if (READY_FOR_UPDATE()) */
+ } /* while ((inBuff.pos != inBuff.size) */
} while (directive != ZSTD_e_end);
if (ferror(srcFile)) {