From: Yann Collet Date: Fri, 19 Jan 2018 18:01:40 +0000 (-0800) Subject: zstdmt uses POOL_tryAdd() to call a new worker X-Git-Tag: v1.3.4~1^2~67^2~26 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=70f81d603037242124d7ae446429462f996f16ab;p=thirdparty%2Fzstd.git zstdmt uses POOL_tryAdd() to call a new worker so that it's no longer a blocking call. This makes it possible to stream out data gradually, while waiting for a worker to become available. --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 1aa6b866f..68f974d7b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription) BYTE* op = ostart; BYTE* oend = op + dstBuff.size; int blockNb; - DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); + DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); assert(job->cSize == 0); for (blockNb = 1; blockNb < nbBlocks; blockNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); @@ -400,7 +400,7 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; - DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", + DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); ZSTD_pthread_cond_signal(job->jobCompleted_cond); ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); @@ -458,6 +458,7 @@ struct ZSTDMT_CCtx_s { size_t prefixSize; size_t targetPrefixSize; inBuff_t inBuff; + int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */ XXH64_state_t xxhState; unsigned singleBlockingThread; unsigned jobIDMask; @@ -668,14 +669,17 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fs; - DEBUGLOG(5, "ZSTDMT_getFrameProgression"); + DEBUGLOG(6, "ZSTDMT_getFrameProgression"); ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); fs.consumed = mtctx->consumed; fs.produced = mtctx->produced; assert(mtctx->inBuff.filled >= mtctx->prefixSize); fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize); { unsigned jobNb; - for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) { + unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); + DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", + mtctx->doneJobID, lastJobNb, mtctx->jobReady) + for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { unsigned const wJobID = jobNb & mtctx->jobIDMask; size_t const cResult = mtctx->jobs[wJobID].cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; @@ -999,59 +1003,60 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi { unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); - zcs->jobs[jobID].src = zcs->inBuff.buffer; - zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; - zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].consumed = 0; - zcs->jobs[jobID].cSize = 0; - zcs->jobs[jobID].prefixSize = zcs->prefixSize; - assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); - zcs->jobs[jobID].params = zcs->params; - /* do not calculate checksum within sections, but write it in header for first section */ - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; - zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; - zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; - zcs->jobs[jobID].dstBuff = g_nullBuffer; - zcs->jobs[jobID].cctxPool = zcs->cctxPool; - zcs->jobs[jobID].bufPool = zcs->bufPool; - zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); - zcs->jobs[jobID].lastChunk = endFrame; - zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; - zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; - zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; - - if (zcs->params.fParams.checksumFlag) - XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); - - /* get a new buffer for next input */ - if (!endFrame) { - size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); - zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); - if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return ERROR(memory_allocation); - } - zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; - memmove(zcs->inBuff.buffer.start, - (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize, - zcs->inBuff.filled); - zcs->prefixSize = newPrefixSize; - } else { /* if (endFrame==1) */ - zcs->inBuff.buffer = g_nullBuffer; - zcs->inBuff.filled = 0; - zcs->prefixSize = 0; - zcs->frameEnded = 1; - if (zcs->nextJobID == 0) { - /* single chunk exception : checksum is calculated directly within worker thread */ - zcs->params.fParams.checksumFlag = 0; - } } + if (!zcs->jobReady) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", + zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); + zcs->jobs[jobID].src = zcs->inBuff.buffer; + zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; + zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].consumed = 0; + zcs->jobs[jobID].cSize = 0; + zcs->jobs[jobID].prefixSize = zcs->prefixSize; + assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); + zcs->jobs[jobID].params = zcs->params; + /* do not calculate checksum within sections, but write it in header for first section */ + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; + zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; + zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; + zcs->jobs[jobID].dstBuff = g_nullBuffer; + zcs->jobs[jobID].cctxPool = zcs->cctxPool; + zcs->jobs[jobID].bufPool = zcs->bufPool; + zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); + zcs->jobs[jobID].lastChunk = endFrame; + zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; + zcs->jobs[jobID].dstFlushed = 0; + zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; + zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + + if (zcs->params.fParams.checksumFlag) + XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); + + /* get a new buffer for next input */ + if (!endFrame) { + size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); + if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ + zcs->jobs[jobID].jobCompleted = 1; + zcs->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return ERROR(memory_allocation); + } + zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; + memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ + (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize, + zcs->inBuff.filled); + zcs->prefixSize = newPrefixSize; + } else { /* endFrame==1 => no need for another input buffer */ + zcs->inBuff.buffer = g_nullBuffer; + zcs->inBuff.filled = 0; + zcs->prefixSize = 0; + zcs->frameEnded = 1; + if (zcs->nextJobID == 0) { + /* single chunk exception : checksum is calculated directly within worker thread */ + zcs->params.fParams.checksumFlag = 0; + } } } DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, @@ -1059,8 +1064,13 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); - POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ - zcs->nextJobID++; + if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) { + zcs->nextJobID++; + zcs->jobReady = 0; + } else { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID); + zcs->jobReady = 1; + } return 0; } @@ -1072,16 +1082,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); + DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); if (zcs->doneJobID == zcs->nextJobID) { - DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !") + DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !", + zcs->doneJobID, zcs->nextJobID) return 0; /* all flushed ! */ } ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ if (zcs->jobs[wJobID].jobCompleted==1) break; - DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)", + DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */ } @@ -1108,14 +1119,16 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi } assert(job.cSize >= job.dstFlushed); { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)", + (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; } if ( job.jobCompleted && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ - DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID); + DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", + zcs->doneJobID, (U32)job.dstFlushed); ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; zcs->jobs[wJobID].jobCompleted = 0; @@ -1128,6 +1141,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */ + if (zcs->jobReady) return 1; /* some more work to do ! */ zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ return 0; /* everything flushed */ } } @@ -1176,7 +1190,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* fill input buffer */ - if (input->size > input->pos) { /* support NULL input */ + if ( (!mtctx->jobReady) + && (input->size > input->pos) ) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */ mtctx->inBuff.filled = 0; @@ -1186,34 +1201,36 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } } if (mtctx->inBuff.buffer.start != NULL) { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); - DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); + DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", + (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; forwardInputProgress = toLoad>0; } } - if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ - && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */ + if ( (mtctx->jobReady) + || ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) { /* avoid overwriting job round buffer */ CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); } /* check for potential compressed data ready to be flushed */ - CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */ - - if (input->pos < input->size) /* input not consumed : do not flush yet */ - endOp = ZSTD_e_continue; - - switch(endOp) - { - case ZSTD_e_flush: - return ZSTDMT_flushStream(mtctx, output); - case ZSTD_e_end: - return ZSTDMT_endStream(mtctx, output); - case ZSTD_e_continue: - return 1; - default: - return ERROR(GENERIC); /* invalid endDirective */ + { size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ + if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ + if (mtctx->jobReady) return remainingToFlush; /* some more input ready to be compressed */ + + switch(endOp) + { + case ZSTD_e_flush: + return ZSTDMT_flushStream(mtctx, output); + case ZSTD_e_end: + return ZSTDMT_endStream(mtctx, output); + case ZSTD_e_continue: + return 1; + default: + return ERROR(GENERIC); /* invalid endDirective */ + } } } diff --git a/programs/fileio.c b/programs/fileio.c index 7045a5323..6039a75a2 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -792,6 +792,7 @@ static int FIO_compressFilename_internal(cRess_t ress, /* Fill input Buffer */ size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; + DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize); readsize += inSize; if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize)) @@ -803,32 +804,23 @@ static int FIO_compressFilename_internal(cRess_t ress, CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); /* Write compressed stream */ - DISPLAYLEVEL(6, "ZSTD_compress_generic,ZSTD_e_continue: generated %u bytes \n", - (U32)outBuff.pos); + DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n", + (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos); if (outBuff.pos) { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); compressedfilesize += outBuff.pos; } + if (READY_FOR_UPDATE()) { + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + (U32)(zfp.ingested >> 20), + (U32)(zfp.consumed >> 20), + (U32)(zfp.produced >> 20), + (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 ); + } } -#if 1 - if (READY_FOR_UPDATE()) { - ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); - DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", - (U32)(zfp.ingested >> 20), - (U32)(zfp.consumed >> 20), - (U32)(zfp.produced >> 20), - (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 ); - } -#else - if (fileSize == UTIL_FILESIZE_UNKNOWN) { - DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20)); - } else { - DISPLAYUPDATE(2, "\rRead : %u / %u MB", - (U32)(readsize>>20), (U32)(fileSize>>20)); - } -#endif } while (directive != ZSTD_e_end); finish: