]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt uses POOL_tryAdd() to call a new worker
authorYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 18:01:40 +0000 (10:01 -0800)
committerYann Collet <cyan@fb.com>
Fri, 19 Jan 2018 18:01:40 +0000 (10:01 -0800)
so that it's no longer a blocking call.
This makes it possible to stream out data gradually,
while waiting for a worker to become available.

lib/compress/zstdmt_compress.c
programs/fileio.c

index 1aa6b866f6fb3db73bc5ea30617d9937a80e3e02..68f974d7bdde27765ee6738b6dd0f59e68d60b91 100644 (file)
@@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
         BYTE* op = ostart;
         BYTE* oend = op + dstBuff.size;
         int blockNb;
-        DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+        DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
         assert(job->cSize == 0);
         for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
@@ -400,7 +400,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
             job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
-            DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+            DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
             ZSTD_pthread_cond_signal(job->jobCompleted_cond);
             ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
@@ -458,6 +458,7 @@ struct ZSTDMT_CCtx_s {
     size_t prefixSize;
     size_t targetPrefixSize;
     inBuff_t inBuff;
+    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
     XXH64_state_t xxhState;
     unsigned singleBlockingThread;
     unsigned jobIDMask;
@@ -668,14 +669,17 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fs;
-    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
+    DEBUGLOG(6, "ZSTDMT_getFrameProgression");
     ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
     fs.consumed = mtctx->consumed;
     fs.produced = mtctx->produced;
     assert(mtctx->inBuff.filled >= mtctx->prefixSize);
     fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
     {   unsigned jobNb;
-        for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) {
+        unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
+        DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
+                    mtctx->doneJobID, lastJobNb, mtctx->jobReady)
+        for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
             size_t const cResult = mtctx->jobs[wJobID].cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
@@ -999,59 +1003,60 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 {
     unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
-    DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
-                zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
-    zcs->jobs[jobID].src = zcs->inBuff.buffer;
-    zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
-    zcs->jobs[jobID].srcSize = srcSize;
-    zcs->jobs[jobID].consumed = 0;
-    zcs->jobs[jobID].cSize = 0;
-    zcs->jobs[jobID].prefixSize = zcs->prefixSize;
-    assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
-    zcs->jobs[jobID].params = zcs->params;
-    /* do not calculate checksum within sections, but write it in header for first section */
-    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
-    zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
-    zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
-    zcs->jobs[jobID].dstBuff = g_nullBuffer;
-    zcs->jobs[jobID].cctxPool = zcs->cctxPool;
-    zcs->jobs[jobID].bufPool = zcs->bufPool;
-    zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
-    zcs->jobs[jobID].lastChunk = endFrame;
-    zcs->jobs[jobID].jobCompleted = 0;
-    zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
-    zcs->jobs[jobID].dstFlushed = 0;
-    zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
-    zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
-    if (zcs->params.fParams.checksumFlag)
-        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
-
-    /* get a new buffer for next input */
-    if (!endFrame) {
-        size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
-        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
-        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
-            zcs->jobs[jobID].jobCompleted = 1;
-            zcs->nextJobID++;
-            ZSTDMT_waitForAllJobsCompleted(zcs);
-            ZSTDMT_releaseAllJobResources(zcs);
-            return ERROR(memory_allocation);
-        }
-        zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
-        memmove(zcs->inBuff.buffer.start,
-            (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
-            zcs->inBuff.filled);
-        zcs->prefixSize = newPrefixSize;
-    } else {   /* if (endFrame==1) */
-        zcs->inBuff.buffer = g_nullBuffer;
-        zcs->inBuff.filled = 0;
-        zcs->prefixSize = 0;
-        zcs->frameEnded = 1;
-        if (zcs->nextJobID == 0) {
-            /* single chunk exception : checksum is calculated directly within worker thread */
-            zcs->params.fParams.checksumFlag = 0;
-    }   }
+    if (!zcs->jobReady) {
+        DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
+                    zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
+        zcs->jobs[jobID].src = zcs->inBuff.buffer;
+        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+        zcs->jobs[jobID].srcSize = srcSize;
+        zcs->jobs[jobID].consumed = 0;
+        zcs->jobs[jobID].cSize = 0;
+        zcs->jobs[jobID].prefixSize = zcs->prefixSize;
+        assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
+        zcs->jobs[jobID].params = zcs->params;
+        /* do not calculate checksum within sections, but write it in header for first section */
+        if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
+        zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
+        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
+        zcs->jobs[jobID].dstBuff = g_nullBuffer;
+        zcs->jobs[jobID].cctxPool = zcs->cctxPool;
+        zcs->jobs[jobID].bufPool = zcs->bufPool;
+        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
+        zcs->jobs[jobID].lastChunk = endFrame;
+        zcs->jobs[jobID].jobCompleted = 0;
+        zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
+        zcs->jobs[jobID].dstFlushed = 0;
+        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+        if (zcs->params.fParams.checksumFlag)
+            XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
+
+        /* get a new buffer for next input */
+        if (!endFrame) {
+            size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
+            zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
+            if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+                zcs->jobs[jobID].jobCompleted = 1;
+                zcs->nextJobID++;
+                ZSTDMT_waitForAllJobsCompleted(zcs);
+                ZSTDMT_releaseAllJobResources(zcs);
+                return ERROR(memory_allocation);
+            }
+            zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
+            memmove(zcs->inBuff.buffer.start,   /* copy end of current job into next job, as "prefix" */
+                (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
+                zcs->inBuff.filled);
+            zcs->prefixSize = newPrefixSize;
+        } else {   /* endFrame==1 => no need for another input buffer */
+            zcs->inBuff.buffer = g_nullBuffer;
+            zcs->inBuff.filled = 0;
+            zcs->prefixSize = 0;
+            zcs->frameEnded = 1;
+            if (zcs->nextJobID == 0) {
+                /* single chunk exception : checksum is calculated directly within worker thread */
+                zcs->params.fParams.checksumFlag = 0;
+    }   }   }
 
     DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
                 zcs->nextJobID,
@@ -1059,8 +1064,13 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
                 zcs->jobs[jobID].lastChunk,
                 zcs->doneJobID,
                 zcs->doneJobID & zcs->jobIDMask);
-    POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
-    zcs->nextJobID++;
+    if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) {
+        zcs->nextJobID++;
+        zcs->jobReady = 0;
+    } else {
+        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
+        zcs->jobReady = 1;
+    }
     return 0;
 }
 
@@ -1072,16 +1082,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
-    DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
+    DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
     if (zcs->doneJobID == zcs->nextJobID) {
-        DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !")
+        DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
+                    zcs->doneJobID, zcs->nextJobID)
         return 0;   /* all flushed ! */
     }
     ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
     while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
         if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
         if (zcs->jobs[wJobID].jobCompleted==1) break;
-        DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+        DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                     zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
         ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush but more to come */
     }
@@ -1108,14 +1119,16 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
         }
         assert(job.cSize >= job.dstFlushed);
         {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+            DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
+                        (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
         }
         if ( job.jobCompleted
           && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
-            DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID);
+            DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
+                    zcs->doneJobID, (U32)job.dstFlushed);
             ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
             zcs->jobs[wJobID].dstBuff = g_nullBuffer;
             zcs->jobs[wJobID].jobCompleted = 0;
@@ -1128,6 +1141,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
         /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
         if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
         if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some more buffer to flush */
+        if (zcs->jobReady) return 1;   /* some more work to do ! */
         zcs->allJobsCompleted = zcs->frameEnded;   /* last frame entirely flushed */
         return 0;   /* everything flushed */
 }   }
@@ -1176,7 +1190,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     }
 
     /* fill input buffer */
-    if (input->size > input->pos) {   /* support NULL input */
+    if ( (!mtctx->jobReady)
+      && (input->size > input->pos) ) {   /* support NULL input */
         if (mtctx->inBuff.buffer.start == NULL) {
             mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : allocation can fail, in which case, no forward input progress */
             mtctx->inBuff.filled = 0;
@@ -1186,34 +1201,36 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
         }   }
         if (mtctx->inBuff.buffer.start != NULL) {
             size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
-            DEBUGLOG(5, "inBuff:%08X;  inBuffSize=%u;  ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
+            DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
+                        (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize);
             memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
             input->pos += toLoad;
             mtctx->inBuff.filled += toLoad;
             forwardInputProgress = toLoad>0;
     }   }
 
-    if ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
-      && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {   /* avoid overwriting job round buffer */
+    if ( (mtctx->jobReady)
+      || ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
+        && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) {   /* avoid overwriting job round buffer */
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
     }
 
     /* check for potential compressed data ready to be flushed */
-    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */
-
-    if (input->pos < input->size)  /* input not consumed : do not flush yet */
-        endOp = ZSTD_e_continue;
-
-    switch(endOp)
-    {
-        case ZSTD_e_flush:
-            return ZSTDMT_flushStream(mtctx, output);
-        case ZSTD_e_end:
-            return ZSTDMT_endStream(mtctx, output);
-        case ZSTD_e_continue:
-            return 1;
-        default:
-            return ERROR(GENERIC);   /* invalid endDirective */
+    {   size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
+        if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not flush yet */
+        if (mtctx->jobReady) return remainingToFlush;   /* some more input ready to be compressed */
+
+        switch(endOp)
+        {
+            case ZSTD_e_flush:
+                return ZSTDMT_flushStream(mtctx, output);
+            case ZSTD_e_end:
+                return ZSTDMT_endStream(mtctx, output);
+            case ZSTD_e_continue:
+                return 1;
+            default:
+                return ERROR(GENERIC);   /* invalid endDirective */
+        }
     }
 }
 
index 7045a5323fbf868d71d0afe137c292113d5cb622..6039a75a24566d06422cebd4798c7e12d2973047 100644 (file)
@@ -792,6 +792,7 @@ static int FIO_compressFilename_internal(cRess_t ress,
         /* Fill input Buffer */
         size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
         ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
+        DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize);
         readsize += inSize;
 
         if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize))
@@ -803,32 +804,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
             CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
 
             /* Write compressed stream */
-            DISPLAYLEVEL(6, "ZSTD_compress_generic,ZSTD_e_continue: generated %u bytes \n",
-                            (U32)outBuff.pos);
+            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n",
+                            (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
             if (outBuff.pos) {
                 size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
                 if (sizeCheck!=outBuff.pos)
                     EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
                 compressedfilesize += outBuff.pos;
             }
+            if (READY_FOR_UPDATE()) {
+                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+                DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
+                                (U32)(zfp.ingested >> 20),
+                                (U32)(zfp.consumed >> 20),
+                                (U32)(zfp.produced >> 20),
+                                (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
+            }
         }
-#if 1
-    if (READY_FOR_UPDATE()) {
-        ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
-        DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
-                        (U32)(zfp.ingested >> 20),
-                        (U32)(zfp.consumed >> 20),
-                        (U32)(zfp.produced >> 20),
-                        (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
-    }
-#else
-        if (fileSize == UTIL_FILESIZE_UNKNOWN) {
-            DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20));
-        } else {
-            DISPLAYUPDATE(2, "\rRead : %u / %u MB",
-                                (U32)(readsize>>20), (U32)(fileSize>>20));
-        }
-#endif
     } while (directive != ZSTD_e_end);
 
 finish: