unsigned lastChunk;
unsigned jobCompleted;
unsigned frameChecksumNeeded;
- ZSTD_pthread_mutex_t* jobCompleted_mutex;
- ZSTD_pthread_cond_t* jobCompleted_cond;
+ ZSTD_pthread_mutex_t* mtctx_mutex;   /* points to the single mutex owned by the ZSTDMT_CCtx, shared by all jobs */
+ ZSTD_pthread_cond_t* mtctx_cond;     /* points to the condition variable owned by the ZSTDMT_CCtx */
ZSTD_CCtx_params params;
const ZSTD_CDict* cdict;
ZSTDMT_CCtxPool* cctxPool;
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
- ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->dstBuff = dstBuff;
- ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* init */
ip += ZSTD_BLOCKSIZE_MAX;
op += cSize; assert(op < oend);
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
- ZSTD_pthread_cond_signal(job->jobCompleted_cond);
- ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ ZSTD_pthread_cond_signal(job->mtctx_cond);
+ ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* last block */
if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
size_t const cSize = (job->lastChunk) ?
     ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
     ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->cSize += cSize;
job->consumed = job->srcSize;
- ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
}
#endif
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
- ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1;
- ZSTD_pthread_cond_signal(job->jobCompleted_cond);
- ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ ZSTD_pthread_cond_signal(job->mtctx_cond);
+ ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool;
- ZSTD_pthread_mutex_t jobCompleted_mutex;
- ZSTD_pthread_cond_t jobCompleted_cond;
+ ZSTD_pthread_mutex_t mtctx_mutex;
+ ZSTD_pthread_cond_t mtctx_cond;
ZSTD_CCtx_params params;
size_t targetSectionSize;
size_t inBuffSize;
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
- if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
+ if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
- if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
+ if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
while (zcs->doneJobID < zcs->nextJobID) {
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
- ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
while (zcs->jobs[jobID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
- ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+ ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
}
- ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
zcs->doneJobID++;
}
}
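/* The wait/signal handshake above is the standard condition-variable pattern :
 * the predicate (jobs[jobID].jobCompleted) is written and signalled while holding
 * the same mutex, and the waiter re-checks it in a `while` loop because
 * pthread_cond_wait() can wake up spuriously. Below is a minimal standalone sketch
 * of that pattern with plain pthreads; `toyJob` and `toyWorker` are illustrative
 * names, not zstd code. Compile with -lpthread. */
#include <pthread.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int completed;                         /* the predicate, like job->jobCompleted */
} toyJob;

static toyJob g_job = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };

static void* toyWorker(void* arg)
{
    toyJob* const job = (toyJob*)arg;
    /* ... real work would happen here ... */
    pthread_mutex_lock(&job->mutex);
    job->completed = 1;                    /* update the predicate under the mutex */
    pthread_cond_signal(&job->cond);       /* then wake the waiter */
    pthread_mutex_unlock(&job->mutex);
    return NULL;
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, toyWorker, &g_job);
    pthread_mutex_lock(&g_job.mutex);
    while (g_job.completed == 0)                        /* `while`, not `if` : spurious wake-ups */
        pthread_cond_wait(&g_job.cond, &g_job.mutex);   /* atomically unlocks, sleeps, re-locks */
    pthread_mutex_unlock(&g_job.mutex);
    pthread_join(t, NULL);
    printf("job completed\n");
    return 0;
}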
ZSTDMT_freeBufferPool(mtctx->bufPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdictLocal);
- ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
- ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond);
+ ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
+ ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
ZSTD_free(mtctx, mtctx->cMem);
return 0;
}
{
ZSTD_frameProgression fs;
DEBUGLOG(6, "ZSTDMT_getFrameProgression");
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
fs.consumed = mtctx->consumed;
fs.produced = mtctx->produced;
assert(mtctx->inBuff.filled >= mtctx->prefixSize);
fs.produced += produced;
}
}
- ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
return fs;
}
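/* ZSTDMT_getFrameProgression() backs the experimental ZSTD_getFrameProgression()
 * exposed in zstd.h under ZSTD_STATIC_LINKING_ONLY. A hedged usage sketch, using
 * today's public API names (ZSTD_compressStream2, ZSTD_c_nbWorkers), which may differ
 * from the names in use at the time of this patch. `compressWithProgress` is an
 * illustrative helper, not part of zstd; it assumes dstCapacity >=
 * ZSTD_compressBound(srcSize) and trims error handling. */
#define ZSTD_STATIC_LINKING_ONLY   /* for ZSTD_getFrameProgression() */
#include <zstd.h>
#include <stdio.h>

static size_t compressWithProgress(void* dst, size_t dstCapacity,
                                   const void* src, size_t srcSize)
{
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    ZSTD_inBuffer  in  = { src, srcSize, 0 };
    ZSTD_outBuffer out = { dst, dstCapacity, 0 };
    size_t remaining = 1;
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 4);   /* returns an error (ignored here) if built single-threaded */
    while (remaining != 0) {
        remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
        if (ZSTD_isError(remaining)) break;
        {   ZSTD_frameProgression const fp = ZSTD_getFrameProgression(cctx);
            printf("ingested %llu, consumed %llu, produced %llu\n",
                   fp.ingested, fp.consumed, fp.produced);
        }
    }
    ZSTD_freeCCtx(cctx);
    return ZSTD_isError(remaining) ? remaining : out.pos;   /* compressed size on success */
}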
mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1);
mtctx->jobs[u].jobCompleted = 0;
- mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
- mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
+ mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
+ mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
if (params.fParams.checksumFlag) {
XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
unsigned chunkID;
for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
- ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
while (mtctx->jobs[chunkID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
- ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
+ ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
- ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
mtctx->jobs[chunkID].srcStart = NULL;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
zcs->jobs[jobID].dstFlushed = 0;
- zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
- zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+ zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
+ zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
if (zcs->params.fParams.checksumFlag)
XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
zcs->params.fParams.checksumFlag = 0;
} } }
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk,
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
assert(output->size >= output->pos);
- ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (zcs->jobs[wJobID].jobCompleted==1) break;
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
- ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
+ ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
- ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
/* single-pass shortcut (note : synchronous-mode) */
if ( (mtctx->nextJobID == 0) /* just started */
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
+ && (!mtctx->jobReady) /* no job already created */
&& (endOp == ZSTD_e_end) /* end order */
- && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
+ && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,
(const char*)input->src + input->pos, input->size - input->pos,
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
- if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
+ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
return remainingToFlush;
}
}
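/* A usage sketch of the single-pass shortcut above : when the whole input is handed
 * over with ZSTD_e_end, nothing has been buffered yet, and dst can hold
 * ZSTD_compressBound(srcSize) bytes, the first call completes the frame and returns 0.
 * Today's public API names again; `onePassCompress` is an illustrative helper, with
 * error handling reduced to asserts. */
#include <assert.h>
#include <zstd.h>

static size_t onePassCompress(void* dst, size_t dstCapacity,
                              const void* src, size_t srcSize)
{
    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
    ZSTD_inBuffer  in  = { src, srcSize, 0 };
    ZSTD_outBuffer out = { dst, dstCapacity, 0 };
    size_t r;
    assert(dstCapacity >= ZSTD_compressBound(srcSize));   /* the "enough space in dst" condition */
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 2);    /* take the ZSTDMT path when available */
    r = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
    assert(!ZSTD_isError(r));
    assert(r == 0);               /* frame fully written by this single call */
    assert(in.pos == in.size);    /* all input consumed */
    ZSTD_freeCCtx(cctx);
    return out.pos;               /* compressed size */
}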