* update *nbJobsPtr to next power of 2 value, as size of table */
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
- U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
+ U32 const margin = MAX(4, *nbJobsPtr / 2);
+ U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr + margin) + 1;
U32 const nbJobs = 1 << nbJobsLog2;
U32 jobNb;
ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*)
}
static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
- U32 nbJobs = nbWorkers + 2;
- if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */
+ U32 const margin = MAX(4, nbWorkers);
+ U32 nbJobs = nbWorkers + margin;
+ if (nbJobs >= mtctx->jobIDMask) { /* need more job capacity */
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
mtctx->jobIDMask = 0;
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
}
-MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
+MEM_STATIC ZSTDMT_CCtx*
+ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{
ZSTDMT_CCtx* mtctx;
U32 nbJobs = nbWorkers + 2;
assert(job->consumed == 0);
}
+/* @returns 1 if there is anything ready to flush */
+static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx)
+{
+ unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
+ int r = 0;
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
+ r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize;
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+ return r;
+}
+
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
mtctx->jobs[jobID].lastJob,
mtctx->nextJobID,
jobID);
- if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
+
+ if (ZSTDMT_anythingToFlush(mtctx)) {
+ if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
+ mtctx->nextJobID++;
+ mtctx->jobReady = 0;
+ } else {
+ DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+ mtctx->jobReady = 1;
+ }
+ } else {
+ /* block here, wait for next available job */
+ POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]);
mtctx->nextJobID++;
mtctx->jobReady = 0;
- } else {
- DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
- mtctx->jobReady = 1;
}
+
return 0;
}