typedef struct {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
- ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */
- ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
+ ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
ip += blockSize;
op += cSize; assert(op < oend);
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = blockSize * blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
- ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */
- ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+ ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
/* last block */
assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->cSize += cSize;
- ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
} }
_endJob:
ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
/* report */
- ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->consumed = job->srcSize;
- ZSTD_pthread_cond_signal(job->mtctx_cond);
- ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+ ZSTD_pthread_cond_signal(&job->job_cond);
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool;
- ZSTD_pthread_mutex_t mtctx_mutex;
- ZSTD_pthread_cond_t mtctx_cond;
ZSTD_CCtx_params params;
size_t targetSectionSize;
size_t targetPrefixSize;
};
/* ZSTDMT_allocJobsTable()
- * allocate, and just init to zero, a job table.
+ * allocate and init a job table.
* update *nbJobsPtr to next power of 2 value, as size of table
* No reverse free() function is provided : just use ZSTD_free() */
-static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
+static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
U32 const nbJobs = 1 << nbJobsLog2;
- *nbJobsPtr = nbJobs;
- return (ZSTDMT_jobDescription*) ZSTD_calloc(
+ ZSTDMT_jobDescription* const jobTable = ZSTD_calloc(
nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
+ U32 jobNb;
+ if (jobTable==NULL) return NULL;
+ *nbJobsPtr = nbJobs;
+ for (jobNb=0; jobNb<nbJobs; jobNb++) {
+ ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL); /* <======== should check init result */
+ ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL); /* <======== should check init result */
+ }
+ return jobTable;
+}
+
+static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
+{
+ U32 jobNb;
+ if (jobTable == NULL) return;
+ for (jobNb=0; jobNb<nbJobs; jobNb++) {
+ ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
+ ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
+ }
+ ZSTD_free(jobTable, cMem);
}
/* ZSTDMT_CCtxParam_setNbThreads():
mtctx->cMem = cMem;
mtctx->allJobsCompleted = 1;
mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
- mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+ mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
- if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
- ZSTDMT_freeCCtx(mtctx);
- return NULL;
- }
- if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
- ZSTDMT_freeCCtx(mtctx);
- return NULL;
- }
DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
return mtctx;
}
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
while (mtctx->doneJobID < mtctx->nextJobID) {
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
- ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+ ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
- ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
mtctx->doneJobID++;
}
}
if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory); /* stop and free worker threads */
ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */
- ZSTD_free(mtctx->jobs, mtctx->cMem);
+ ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
ZSTDMT_freeBufferPool(mtctx->bufPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdictLocal);
- ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
- ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
ZSTD_free(mtctx, mtctx->cMem);
return 0;
}
{
ZSTD_frameProgression fps;
DEBUGLOG(6, "ZSTDMT_getFrameProgression");
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
fps.consumed = mtctx->consumed;
fps.produced = mtctx->produced;
assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
mtctx->doneJobID, lastJobNb, mtctx->jobReady)
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
- size_t const cResult = mtctx->jobs[wJobID].cSize;
- size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
- fps.consumed += mtctx->jobs[wJobID].consumed;
- fps.ingested += mtctx->jobs[wJobID].srcSize;
- fps.produced += produced;
+ ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
+ { size_t const cResult = mtctx->jobs[wJobID].cSize;
+ size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+ fps.consumed += mtctx->jobs[wJobID].consumed;
+ fps.ingested += mtctx->jobs[wJobID].srcSize;
+ fps.produced += produced;
+ }
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}
}
- ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
return fps;
}
if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
U32 nbJobs = nbChunks;
- ZSTD_free(mtctx->jobs, mtctx->cMem);
+ ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
mtctx->jobIDMask = 0;
- mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
+ mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
if (mtctx->jobs==NULL) return ERROR(memory_allocation);
assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1;
mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1);
- mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
- mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
if (params.fParams.checksumFlag) {
XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
unsigned chunkID;
for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
- ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+ ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
}
- ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
mtctx->jobs[chunkID].prefixStart = NULL;
mtctx->jobs[jobID].lastChunk = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
mtctx->jobs[jobID].dstFlushed = 0;
- mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
- mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
if (mtctx->params.fParams.checksumFlag)
XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
assert(output->size >= output->pos);
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
if ( blockToFlush
&& (mtctx->doneJobID < mtctx->nextJobID) ) {
assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
}
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
- ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */
+ ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */
} }
/* try to flush something */
{ size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */
size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */
size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */
- ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
if (ZSTD_isError(cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
mtctx->doneJobID, ZSTD_getErrorName(cSize));