FILE* const dstFile = ress.dstFile;
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
+
+ typedef enum { noChange, slower, faster } speedChange_e;
+ speedChange_e speedChange = noChange;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
+ unsigned long long lastProduced = 0;
+ unsigned long long lastFlushedSize = 0;
DISPLAYLEVEL(6, "compression using zstd format \n");
stillToFlush = 1;
while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
+
size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
EXM_THROW(25, "Write error : cannot write compressed block");
compressedfilesize += outBuff.pos;
}
+
+ /* display notification; and adapt compression level */
if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
- /* check input speed */
- if (zfp.currentJobID >= lastJobID+2) {
- if (inputBlocked<=1) { /* small tolerance */
- DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
- compressionLevel++;
+ /* check output speed */
+ if (zfp.currentJobID > 0) {
+ unsigned long long newlyProduced = zfp.produced - lastProduced;
+ unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+ assert(zfp.produced >= lastProduced);
+ if (newlyProduced == 0) {
+ DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n")
+ speedChange = slower;
+ }
+
+ if ( (newlyProduced > (newlyFlushed * 9 / 8))
+ && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
+ DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed);
+ speedChange = slower;
+ }
+ lastProduced = zfp.produced;
+ lastFlushedSize = compressedfilesize;
+ }
+
+ /* course correct only if there is at least one job completed */
+ if (zfp.currentJobID > lastJobID) {
+ DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+ /* check input speed */
+ if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
+ if (inputBlocked <= 1) { /* small tolerance */
+ DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+ speedChange = slower;
+ }
+ inputBlocked = 0;
+ }
+
+ if (speedChange == slower) {
+ DISPLAYLEVEL(6, "slower speed , higher compression \n")
+ compressionLevel ++;
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+ speedChange = noChange;
}
lastJobID = zfp.currentJobID;
- inputBlocked = 0;
- }
+ } /* if (zfp.currentJobID > lastJobID) */
if (g_displayLevel >= 3) {
- DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%",
+ DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
compressionLevel,
(U32)((zfp.ingested - zfp.consumed) >> 20),
(U32)(zfp.consumed >> 20),