BYTE* op = ostart;
BYTE* oend = op + dstBuff.size;
int blockNb;
- DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+ DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
- DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+ DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
size_t prefixSize;
size_t targetPrefixSize;
inBuff_t inBuff;
+ int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
XXH64_state_t xxhState;
unsigned singleBlockingThread;
unsigned jobIDMask;
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fs;
- DEBUGLOG(5, "ZSTDMT_getFrameProgression");
+ DEBUGLOG(6, "ZSTDMT_getFrameProgression");
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
fs.consumed = mtctx->consumed;
fs.produced = mtctx->produced;
assert(mtctx->inBuff.filled >= mtctx->prefixSize);
fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
{ unsigned jobNb;
- for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) {
+ unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);   /* a prepared-but-not-yet-posted job (jobReady==1) is included in the scan */
+ DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
+             mtctx->doneJobID, lastJobNb, (U32)mtctx->jobReady);   /* jobReady is an int : cast to match %u ; statement explicitly terminated */
+ for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
size_t const cResult = mtctx->jobs[wJobID].cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
{
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
- DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
- zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
- zcs->jobs[jobID].src = zcs->inBuff.buffer;
- zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
- zcs->jobs[jobID].srcSize = srcSize;
- zcs->jobs[jobID].consumed = 0;
- zcs->jobs[jobID].cSize = 0;
- zcs->jobs[jobID].prefixSize = zcs->prefixSize;
- assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
- zcs->jobs[jobID].params = zcs->params;
- /* do not calculate checksum within sections, but write it in header for first section */
- if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
- zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
- zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
- zcs->jobs[jobID].dstBuff = g_nullBuffer;
- zcs->jobs[jobID].cctxPool = zcs->cctxPool;
- zcs->jobs[jobID].bufPool = zcs->bufPool;
- zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
- zcs->jobs[jobID].lastChunk = endFrame;
- zcs->jobs[jobID].jobCompleted = 0;
- zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
- zcs->jobs[jobID].dstFlushed = 0;
- zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
- zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
-
- if (zcs->params.fParams.checksumFlag)
- XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
-
- /* get a new buffer for next input */
- if (!endFrame) {
- size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
- if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- return ERROR(memory_allocation);
- }
- zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
- memmove(zcs->inBuff.buffer.start,
- (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
- zcs->inBuff.filled);
- zcs->prefixSize = newPrefixSize;
- } else { /* if (endFrame==1) */
- zcs->inBuff.buffer = g_nullBuffer;
- zcs->inBuff.filled = 0;
- zcs->prefixSize = 0;
- zcs->frameEnded = 1;
- if (zcs->nextJobID == 0) {
- /* single chunk exception : checksum is calculated directly within worker thread */
- zcs->params.fParams.checksumFlag = 0;
- } }
+ if (!zcs->jobReady) { /* describe a new job only when no previously prepared job is still waiting to be posted */
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
+ zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
+ zcs->jobs[jobID].src = zcs->inBuff.buffer; /* the job reads from the current input buffer */
+ zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+ zcs->jobs[jobID].srcSize = srcSize;
+ zcs->jobs[jobID].consumed = 0;
+ zcs->jobs[jobID].cSize = 0;
+ zcs->jobs[jobID].prefixSize = zcs->prefixSize;
+ assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); /* buffer must hold the prefix plus the bytes to compress */
+ zcs->jobs[jobID].params = zcs->params;
+ /* do not calculate checksum within sections, but write it in header for first section */
+ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
+ zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; /* cdict is handed to the first job only */
+ zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
+ zcs->jobs[jobID].dstBuff = g_nullBuffer;
+ zcs->jobs[jobID].cctxPool = zcs->cctxPool;
+ zcs->jobs[jobID].bufPool = zcs->bufPool;
+ zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
+ zcs->jobs[jobID].lastChunk = endFrame;
+ zcs->jobs[jobID].jobCompleted = 0;
+ zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; /* set only on the last job of a multi-job frame with checksum enabled */
+ zcs->jobs[jobID].dstFlushed = 0;
+ zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; /* jobs share the context-level mutex and condition */
+ zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+ if (zcs->params.fParams.checksumFlag)
+ XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); /* hash only the srcSize new bytes, skipping the prefix */
+
+ /* get a new buffer for next input */
+ if (!endFrame) {
+ size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); /* carry over at most targetPrefixSize bytes */
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
+ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
+ zcs->jobs[jobID].jobCompleted = 1; /* NOTE(review): presumably marks the never-posted slot as done so the wait below does not block on it - confirm */
+ zcs->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return ERROR(memory_allocation);
+ }
+ zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; /* what remains : new prefix + any input loaded beyond srcSize */
+ memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
+ (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
+ zcs->inBuff.filled);
+ zcs->prefixSize = newPrefixSize;
+ } else { /* endFrame==1 => no need for another input buffer */
+ zcs->inBuff.buffer = g_nullBuffer;
+ zcs->inBuff.filled = 0;
+ zcs->prefixSize = 0;
+ zcs->frameEnded = 1;
+ if (zcs->nextJobID == 0) {
+ /* single chunk exception : checksum is calculated directly within worker thread */
+ zcs->params.fParams.checksumFlag = 0;
+ } } }
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
zcs->jobs[jobID].lastChunk,
zcs->doneJobID,
zcs->doneJobID & zcs->jobIDMask);
- POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
- zcs->nextJobID++;
+ if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) { /* non-zero result is treated as "job accepted by the pool" */
+ zcs->nextJobID++;
+ zcs->jobReady = 0; /* the prepared job has been handed over */
+ } else {
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
+ zcs->jobReady = 1; /* keep the prepared job : nextJobID is unchanged and the preparation step is skipped on the next call */
+ }
return 0;
}
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
- DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
+ DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
if (zcs->doneJobID == zcs->nextJobID) {
- DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !")
+ DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
+             zcs->doneJobID, zcs->nextJobID);   /* statement explicitly terminated, so it does not depend on how DEBUGLOG expands */
return 0; /* all flushed ! */
}
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
if (zcs->jobs[wJobID].jobCompleted==1) break;
- DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+ DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
}
}
assert(job.cSize >= job.dstFlushed);
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
+ (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
}
if ( job.jobCompleted
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
- DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID);
+ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
+ zcs->doneJobID, (U32)job.dstFlushed);
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */
+ if (zcs->jobReady) return 1; /* some more work to do ! */
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
return 0; /* everything flushed */
} }
}
/* fill input buffer */
- if (input->size > input->pos) { /* support NULL input */
+ if ( (!mtctx->jobReady)
+ && (input->size > input->pos) ) { /* support NULL input */
if (mtctx->inBuff.buffer.start == NULL) {
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */
mtctx->inBuff.filled = 0;
} }
if (mtctx->inBuff.buffer.start != NULL) {
size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
- DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
+ DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
+ (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize);
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad;
mtctx->inBuff.filled += toLoad;
forwardInputProgress = toLoad>0;
} }
- if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
- && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */
+ if ( (mtctx->jobReady)
+ || ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
+ && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) { /* avoid overwriting job round buffer */
CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
}
/* check for potential compressed data ready to be flushed */
- CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */
-
- if (input->pos < input->size) /* input not consumed : do not flush yet */
- endOp = ZSTD_e_continue;
-
- switch(endOp)
- {
- case ZSTD_e_flush:
- return ZSTDMT_flushStream(mtctx, output);
- case ZSTD_e_end:
- return ZSTDMT_endStream(mtctx, output);
- case ZSTD_e_continue:
- return 1;
- default:
- return ERROR(GENERIC); /* invalid endDirective */
+ {   size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
+     if (ZSTD_isError(remainingToFlush)) return remainingToFlush;   /* propagate flush errors on every path, including ZSTD_e_continue */
+     if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
+     if (mtctx->jobReady) return MAX(remainingToFlush, 1); /* a prepared job is still waiting for a worker : never report 0 (== nothing left) */
+     switch(endOp)
+     {
+     case ZSTD_e_flush:
+         return ZSTDMT_flushStream(mtctx, output);
+     case ZSTD_e_end:
+         return ZSTDMT_endStream(mtctx, output);
+     case ZSTD_e_continue:
+         return 1;
+     default:
+         return ERROR(GENERIC); /* invalid endDirective */
+     }
+ }
}