const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */
- unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
- unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
+ unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
+ unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
unsigned frameChecksumNeeded; /* used only by mtctx */
} ZSTDMT_jobDescription;
-/* ZSTDMT_compressChunk() is a POOL_function type */
-void ZSTDMT_compressChunk(void* jobDescription)
+/* ZSTDMT_compressionJob() is a POOL_function type */
+void ZSTDMT_compressionJob(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
/* init */
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
- assert(job->firstChunk); /* only allowed for first job */
+ assert(job->firstJob); /* only allowed for first job */
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */
- U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : job->srcSize;
+ U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->srcSize;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
- { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
+ { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
if (ZSTD_isError(forceWindowError)) {
job->cSize = forceWindowError;
goto _endJob;
job->cSize = initError;
goto _endJob;
} } }
- if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */
+ if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
- DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize);
+ DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
ZSTD_invalidateRepCodes(cctx);
}
/* compress */
- { size_t const blockSize = ZSTD_BLOCKSIZE_MAX;
- int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize);
+ { size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
+ int const nbChunks = (int)((job->srcSize + (chunkSize-1)) / chunkSize);
const BYTE* ip = (const BYTE*) src;
BYTE* const ostart = (BYTE*)dstBuff.start;
BYTE* op = ostart;
BYTE* oend = op + dstBuff.capacity;
- int blockNb;
- if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize); /* check overflow */
- DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
+ int chunkNb;
+ if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * chunkSize); /* check overflow */
+ DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->srcSize, nbChunks);
assert(job->cSize == 0);
- for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
- size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize);
+ for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
+ size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
- ip += blockSize;
+ ip += chunkSize;
op += cSize; assert(op < oend);
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->cSize += cSize;
- job->consumed = blockSize * blockNb;
- DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
+ job->consumed = chunkSize * chunkNb;
+ DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
/* last block */
- assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */
- if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
- size_t const lastBlockSize1 = job->srcSize & (blockSize-1);
- size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1;
- size_t const cSize = (job->lastChunk) ?
+ assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+ if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
+ size_t const lastBlockSize1 = job->srcSize & (chunkSize-1);
+ size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=chunkSize)) ? chunkSize : lastBlockSize1;
+ size_t const cSize = (job->lastJob) ?
ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) :
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
- DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
/* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
-static unsigned ZSTDMT_computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
+static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
assert(nbThreads>0);
- { size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2);
- size_t const chunkMaxSize = chunkSizeTarget << 2;
- size_t const passSizeMax = chunkMaxSize * nbThreads;
+ { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);
+ size_t const jobMaxSize = jobSizeTarget << 2;
+ size_t const passSizeMax = jobMaxSize * nbThreads;
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
- unsigned const nbChunksLarge = multiplier * nbThreads;
- unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
- unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads);
- return (multiplier>1) ? nbChunksLarge : nbChunksSmall;
+ unsigned const nbJobsLarge = multiplier * nbThreads;
+ unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
+ unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads);
+ return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
} }
/* ZSTDMT_compress_advanced_internal() :
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
- unsigned nbChunks = ZSTDMT_computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
- size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
- size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
+ unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads);
+ size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
+ size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src;
size_t remainingSrcSize = srcSize;
- unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
+ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64;
assert(jobParams.nbThreads == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
- DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ",
- nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize);
+ DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
+ nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
- if ((nbChunks==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
+ if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
}
- assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
- ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
+ assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
+ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
XXH64_reset(&xxh64, 0);
- if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
- U32 nbJobs = nbChunks;
+ if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */
+ U32 jobsTableSize = nbJobs;
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
mtctx->jobIDMask = 0;
- mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+ mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
if (mtctx->jobs==NULL) return ERROR(memory_allocation);
- assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */
- mtctx->jobIDMask = nbJobs - 1;
+ assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0)); /* ensure jobsTableSize is a power of 2 */
+ mtctx->jobIDMask = jobsTableSize - 1;
}
{ unsigned u;
- for (u=0; u<nbChunks; u++) {
- size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
- size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
+ for (u=0; u<nbJobs; u++) {
+ size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
+ size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
mtctx->jobs[u].srcBuff = g_nullBuffer;
mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
- mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
+ mtctx->jobs[u].srcSize = jobSize; assert(jobSize > 0); /* avoid job.srcSize == 0 */
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool;
- mtctx->jobs[u].firstChunk = (u==0);
- mtctx->jobs[u].lastChunk = (u==nbChunks-1);
+ mtctx->jobs[u].firstJob = (u==0);
+ mtctx->jobs[u].lastJob = (u==nbJobs-1);
if (params.fParams.checksumFlag) {
- XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
+ XXH64_update(&xxh64, srcStart + frameStartPos, jobSize);
}
- DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize);
+ DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize);
DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12);
- POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
+ POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]);
- frameStartPos += chunkSize;
+ frameStartPos += jobSize;
dstBufferPos += dstBufferCapacity;
- remainingSrcSize -= chunkSize;
+ remainingSrcSize -= jobSize;
} }
/* collect result */
{ size_t error = 0, dstPos = 0;
- unsigned chunkID;
- for (chunkID=0; chunkID<nbChunks; chunkID++) {
- DEBUGLOG(5, "waiting for chunk %u ", chunkID);
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
- while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
- DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
- ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
+ unsigned jobID;
+ for (jobID=0; jobID<nbJobs; jobID++) {
+ DEBUGLOG(5, "waiting for job %u ", jobID);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
+ while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID);
+ ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
- ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
- DEBUGLOG(5, "ready to write chunk %u ", chunkID);
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
+ DEBUGLOG(5, "ready to write job %u ", jobID);
- mtctx->jobs[chunkID].prefixStart = NULL;
- { size_t const cSize = mtctx->jobs[chunkID].cSize;
+ mtctx->jobs[jobID].prefixStart = NULL;
+ { size_t const cSize = mtctx->jobs[jobID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
- if (chunkID) { /* note : chunk 0 is written directly at dst, which is correct position */
+ if (jobID) { /* note : job 0 is written directly at dst, which is correct position */
if (!error)
- memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */
- if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */
- DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
- ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
+ memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */
+ if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */
+ DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst);
+ ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
} }
- mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
- mtctx->jobs[chunkID].cSize = 0;
+ mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+ mtctx->jobs[jobID].cSize = 0;
dstPos += cSize ;
}
- } /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
+ } /* for (jobID=0; jobID<nbJobs; jobID++) */
DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
if (params.fParams.checksumFlag) {
*/
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
- assert(job->lastChunk == 1);
- assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */
- assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */
+ assert(job->lastJob == 1);
+ assert(job->srcSize == 0); /* last job is empty -> will be simplified into a last empty block */
+ assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */
/* A job created by streaming variant starts with a src buffer, but no dst buffer.
* It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
* When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
- mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0);
- mtctx->jobs[jobID].lastChunk = endFrame;
+ mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
+ mtctx->jobs[jobID].lastJob = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
mtctx->jobs[jobID].dstFlushed = 0;
mtctx->inBuff.prefixSize = 0;
mtctx->frameEnded = endFrame;
if (mtctx->nextJobID == 0) {
- /* single chunk exception : checksum is already calculated directly within worker thread */
+ /* single job exception : checksum is already calculated directly within worker thread */
mtctx->params.fParams.checksumFlag = 0;
} }
if ( (srcSize == 0)
- && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+ && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",
mtctx->nextJobID,
(U32)mtctx->jobs[jobID].srcSize,
- mtctx->jobs[jobID].lastChunk,
+ mtctx->jobs[jobID].lastJob,
mtctx->nextJobID,
jobID);
- if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
+ if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
} else {
if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */
if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */
if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */
- mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */
+ mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is last one, frame is completed */
if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
return 0; /* internal buffers fully flushed */
}