typedef struct {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
- size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
- ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */
- ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */
- ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */
- ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */
- buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
+ size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
+ ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
+ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */
- const void* prefixStart; /* set by mtctx, then read by worker => no barrier */
+ const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
- size_t srcSize; /* set by mtctx, then read by worker => no barrier */
+ size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */
unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
- if (dstBuff.start == NULL) {
+ if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */
dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
- ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
- ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* init */
if ( (srcSize == 0)
&& (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
mtctx->nextJobID++;
}
}
- DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",
mtctx->nextJobID,
(U32)mtctx->jobs[jobID].srcSize,
mtctx->jobs[jobID].lastChunk,
- mtctx->doneJobID,
- mtctx->doneJobID & mtctx->jobIDMask);
+ mtctx->nextJobID,
+ jobID);
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
- DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
+ DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",
+ blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
assert(output->size >= output->pos);
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
- if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
+ if ( blockToFlush
+ && (mtctx->doneJobID < mtctx->nextJobID) ) {
assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
- while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
+ while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */
if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
- DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
+ DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
break;
}
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
- /* some output is available to be flushed */
- { ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
+ /* try to flush something */
+ { size_t cSize = mtctx->jobs[wJobID].cSize;
+ size_t const srcConsumed = mtctx->jobs[wJobID].consumed;
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
- if (ZSTD_isError(job.cSize)) {
+ if (ZSTD_isError(cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
- mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
+ mtctx->doneJobID, ZSTD_getErrorName(cSize));
ZSTDMT_waitForAllJobsCompleted(mtctx);
ZSTDMT_releaseAllJobResources(mtctx);
- return job.cSize;
+ return cSize;
}
/* add frame checksum if necessary (can only happen once) */
- assert(job.consumed <= job.srcSize);
- if ( (job.consumed == job.srcSize)
- && job.frameChecksumNeeded ) {
+ assert(srcConsumed <= mtctx->jobs[wJobID].srcSize);
+ if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */
+ && mtctx->jobs[wJobID].frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
- MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
- job.cSize += 4;
- mtctx->jobs[wJobID].cSize += 4;
+ MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
+ cSize += 4;
+ mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */
mtctx->jobs[wJobID].frameChecksumNeeded = 0;
}
- assert(job.cSize >= job.dstFlushed);
- if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */
- size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
- DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
- (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
- memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+ if (cSize > 0) { /* compression is ongoing or completed */
+ size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
+ DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)",
+ (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize);
+ assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
+ memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite);
output->pos += toWrite;
- job.dstFlushed += toWrite;
+ mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */
- if ( (job.consumed == job.srcSize) /* job completed */
- && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */
+ if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */
+ && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
- mtctx->doneJobID, (U32)job.dstFlushed);
- assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */
- ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
+ mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+ assert(mtctx->jobs[wJobID].srcBuff.start == NULL); /* srcBuff supposed already released */
+ ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
- mtctx->consumed += job.srcSize;
- mtctx->produced += job.cSize;
+ mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */
+ mtctx->consumed += mtctx->jobs[wJobID].srcSize;
+ mtctx->produced += cSize;
mtctx->doneJobID++;
- } else {
- mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
} }
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
- if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
- if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
+ if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);
+ if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */
}
- if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */
- if (mtctx->jobReady) return 1; /* one job is ready and queued! */
- if (mtctx->inBuff.filled > 0) return 1; /* input not empty */
- mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */
- if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
- return 0; /* everything flushed */
+ if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */
+ if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */
+ if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */
+ mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */
+ if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
+ return 0; /* internal buffers fully flushed */
}
}
/* single-pass shortcut (note : synchronous-mode) */
- if ( (mtctx->nextJobID == 0) /* just started */
- && (mtctx->inBuff.filled == 0) /* nothing buffered */
- && (!mtctx->jobReady) /* no job already created */
- && (endOp == ZSTD_e_end) /* end order */
+ if ( (mtctx->nextJobID == 0) /* just started */
+ && (mtctx->inBuff.filled == 0) /* nothing buffered */
+ && (!mtctx->jobReady) /* no job already created */
+ && (endOp == ZSTD_e_end) /* end order */
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,