]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed zstdmt cli freeze issue with large nb of threads
authorYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 20:31:07 +0000 (12:31 -0800)
committerYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 20:35:19 +0000 (12:35 -0800)
fileio.c was continually pushing more content without giving a chance to flush compressed one.
It would block the job queue when input data was accumulated too fast (requiring to define many threads).
Fixed : fileio flushes whatever it can after each input attempt.

lib/compress/zstdmt_compress.c
programs/fileio.c

index 99b2e68fbfc19e8e75fd8733a062b9f1ca2e33f4..04e0adfcd2770b9f7221fc6d4573119c52e7fe91 100644 (file)
@@ -233,6 +233,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
     DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
+        if (job->cdict) DEBUGLOG(3, "using CDict ");
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     } else {
         size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
@@ -296,6 +297,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     U32 const minNbJobs = nbThreads + 2;
     U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
+    //nbThreads = 1;   /* for tests */
     DEBUGLOG(5, "nbThreads : %u  ; minNbJobs : %u ;  nbJobsLog2 : %u ;  nbJobs : %u  \n",
             nbThreads, minNbJobs, nbJobsLog2, nbJobs);
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
@@ -490,6 +492,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
                                     ZSTD_parameters params, unsigned long long pledgedSrcSize)
 {
     ZSTD_customMem const cmem = { NULL, NULL, NULL };
+    DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
     if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
     if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(zcs);
@@ -596,6 +599,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     } else {
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
+        zcs->dictSize = 0;
         zcs->frameEnded = 1;
         if (zcs->nextJobID == 0)
             zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
@@ -698,12 +702,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 
 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
 {
-    size_t const srcSize = zcs->inBuff.filled;
+    size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
 
-    if (srcSize) DEBUGLOG(1, "flushing : %u bytes left to compress", (U32)srcSize);
+    if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
-        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize - zcs->dictSize, endFrame) );
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
     }
 
     /* check if there is any data available to flush */
index 86db12acb030070f4de8b0b115019ec97d14b13d..db2bb55d660412214716951eccf41a4113f66b9e 100644 (file)
@@ -349,23 +349,22 @@ static int FIO_compressFilename_internal(cRess_t ress,
         readsize += inSize;
         DISPLAYUPDATE(2, "\rRead : %u MB  ", (U32)(readsize>>20));
 
-        /* Compress using buffered streaming */
         {   ZSTD_inBuffer  inBuff = { ress.srcBuffer, inSize, 0 };
-            ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
             while (inBuff.pos != inBuff.size) {   /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
+                ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
 #ifdef ZSTD_MULTITHREAD
                 size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
 #else
                 size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
 #endif
                 if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
-            }
 
-            /* Write cBlock */
-            { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-              if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); }
-            compressedfilesize += outBuff.pos;
-        }
+                /* Write compressed stream */
+                if (outBuff.pos) {
+                    size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+                    if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
+                    compressedfilesize += outBuff.pos;
+        }   }   }
         DISPLAYUPDATE(2, "\rRead : %u MB  ==> %.2f%%   ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100);
     }