do { \
if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
+ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "acquiring mutex %s", #mutex); \
ZSTD_pthread_mutex_lock(mutex); \
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
unsigned long long const elapsedTime = (afterTime-beforeTime); \
+ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "mutex %s acquired after %llu microseconds", #mutex, elapsedTime); \
} \
+ } else { \
+ ZSTD_pthread_mutex_lock(mutex); \
+ } \
} while (0)
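+/* ZSTD_PTHREAD_COND_WAIT() : same as ZSTD_pthread_cond_wait(),
+ * but logs entry to and exit from the wait when DEBUGLEVEL >= COND_WAIT_DLEVEL */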
+#define COND_WAIT_DLEVEL 6
+#define ZSTD_PTHREAD_COND_WAIT(_cond, _mutex) \
+ do { \
+ if (DEBUGLEVEL >= COND_WAIT_DLEVEL) { \
+ DEBUGLOG(COND_WAIT_DLEVEL, "waiting on condition %s", #_cond); \
+ ZSTD_pthread_cond_wait(_cond,_mutex); \
+ DEBUGLOG(COND_WAIT_DLEVEL, "condition %s triggered", #_cond); \
+ } else { \
+ ZSTD_pthread_cond_wait(_cond,_mutex); \
+ } \
+ } while (0)
+
#else
# define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
+# define ZSTD_PTHREAD_COND_WAIT(c,m) ZSTD_pthread_cond_wait(c,m)
# define DEBUG_PRINTHEX(l,p,n) do { } while (0)
#endif
size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer);
unsigned u;
size_t totalBufferSize = 0;
- ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
for (u=0; u<bufPool->totalBuffers; u++)
totalBufferSize += bufPool->buffers[u].capacity;
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
* as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{
- ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
bufPool->bufferSize = bSize;
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
{
size_t const bSize = bufPool->bufferSize;
DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
- ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
if (bufPool->nbBuffers) { /* try to use an existing buffer */
Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)];
size_t const availBufferSize = buf.capacity;
{
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* compatible with release on NULL */
- ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&bufPool->poolMutex);
if (bufPool->nbBuffers < bufPool->totalBuffers) {
bufPool->buffers[bufPool->nbBuffers++] = buf; /* stored for later use */
DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
- ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
{ unsigned const nbWorkers = cctxPool->totalCCtx;
size_t const poolSize = sizeof(*cctxPool);
size_t const arraySize = cctxPool->totalCCtx * sizeof(ZSTD_CCtx*);
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
DEBUGLOG(5, "ZSTDMT_getCCtx");
- ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&cctxPool->poolMutex);
if (cctxPool->availCCtx) {
cctxPool->availCCtx--;
{ ZSTD_CCtx* const cctx = cctxPool->cctxs[cctxPool->availCCtx];
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
if (cctx==NULL) return; /* compatibility with release on NULL */
- ZSTD_pthread_mutex_lock(&pool->poolMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&pool->poolMutex);
if (pool->availCCtx < pool->totalCCtx)
pool->cctxs[pool->availCCtx++] = cctx;
else {
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
while (serialState->nextJobID < jobID) {
DEBUGLOG(5, "wait for serialState->cond");
- ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
+ ZSTD_PTHREAD_COND_WAIT(&serialState->cond, &serialState->mutex);
}
/* A future job may error and skip our job */
if (serialState->nextJobID == jobID) {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
- ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
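+ /* flush_mutex/flush_cond are NULL while nobody waits on this job;
+  * the consumer installs them (under job_mutex) before blocking, and the
+  * worker then signals *flush_cond whenever more compressed data is ready. */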
+ ZSTD_pthread_mutex_t* flush_mutex; /* Thread-safe - used by mtctx and worker */
+ ZSTD_pthread_cond_t* flush_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
job->consumed = chunkSize * chunkNb;
DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
- ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */
- ZSTD_pthread_mutex_unlock(&job->job_mutex);
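+ /* note : job_mutex is released before taking flush_mutex, so the two are never held simultaneously */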
+ if (job->flush_mutex != NULL) {
+ ZSTD_pthread_mutex_t* const flushMutex = job->flush_mutex; /* cached under job_mutex, as the consumer may clear the field asynchronously */
+ ZSTD_pthread_cond_t* const flushCond = job->flush_cond;
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(flushMutex);
+ ZSTD_pthread_cond_signal(flushCond); /* warns some more data is ready to be flushed */
+ ZSTD_pthread_mutex_unlock(flushMutex);
+ } else {
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
+ }
}
/* last block */
assert(chunkSize > 0);
if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
job->cSize += lastCBlockSize;
job->consumed = job->src.size; /* when job->consumed == job->src.size , compression job is presumed completed */
- ZSTD_pthread_cond_signal(&job->job_cond);
+ if (job->flush_mutex != NULL) {
+ ZSTD_pthread_mutex_t* const flushMutex = job->flush_mutex; /* cached under job_mutex : once consumed == src.size is visible, the consumer may clear these fields */
+ ZSTD_pthread_cond_t* const flushCond = job->flush_cond;
+ ZSTD_pthread_mutex_unlock(&job->job_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(flushMutex);
+ ZSTD_pthread_cond_signal(flushCond); /* warns some more data is ready to be flushed */
+ ZSTD_pthread_mutex_unlock(flushMutex);
+ ZSTD_pthread_mutex_lock(&job->job_mutex);
+ }
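+ /* job completed : detach it from the shared flush condition */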
+ job->flush_mutex = NULL;
+ job->flush_cond = NULL;
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
ZSTDMT_CCtxPool* cctxPool;
ZSTDMT_seqPool* seqPool;
ZSTD_CCtx_params params;
- size_t targetSectionSize;
+ size_t targetJobSize;
size_t targetPrefixSize;
int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
InBuff_t inBuff;
RoundBuff_t roundBuff;
SerialState serial;
RSyncState_t rsync;
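+ /* flushMutex/flushCond form a single flush wait point shared by all jobs :
+  * they are lent to individual jobs through jobDescription.flush_mutex/flush_cond */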
+ ZSTD_pthread_mutex_t flushMutex;
+ ZSTD_pthread_cond_t flushCond;
unsigned jobIDMask;
unsigned doneJobID;
unsigned nextJobID;
if (jobTable == NULL) return;
for (jobNb=0; jobNb<nbJobs; jobNb++) {
ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
- ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
}
ZSTD_customFree(jobTable, cMem);
}
*nbJobsPtr = nbJobs;
for (jobNb=0; jobNb<nbJobs; jobNb++) {
initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);
- initError |= ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);
}
if (initError != 0) {
ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
+ initError |= ZSTD_pthread_mutex_init(&mtctx->flushMutex, NULL);
+ initError |= ZSTD_pthread_cond_init(&mtctx->flushCond, NULL);
mtctx->roundBuff = kNullRoundBuff;
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
ZSTDMT_freeCCtx(mtctx);
for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
/* Copy the mutex out */
ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
- ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
/* Clear the job description, but keep the mutex */
ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
mtctx->jobs[jobID].job_mutex = mutex;
- mtctx->jobs[jobID].job_cond = cond;
}
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
- DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
- ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
+ DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);
+ /* we want to block and wait for data to flush */
+ if (mtctx->jobs[jobID].flush_mutex == NULL) {
+ mtctx->jobs[jobID].flush_mutex = &mtctx->flushMutex;
+ mtctx->jobs[jobID].flush_cond = &mtctx->flushCond;
+ }
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->flushMutex); /* acquire flushMutex before releasing job_mutex, so the worker's signal cannot be missed */
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
+ DEBUGLOG(5, "ZSTDMT_waitForAllJobsCompleted: let's wait for job progress");
+ ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);
+ DEBUGLOG(5, "ZSTDMT_waitForAllJobsCompleted: waiting completed");
+ ZSTD_pthread_mutex_unlock(&mtctx->flushMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
mtctx->doneJobID++;
ZSTD_freeCDict(mtctx->cdictLocal);
if (mtctx->roundBuff.buffer)
ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);
+ ZSTD_pthread_mutex_destroy(&mtctx->flushMutex);
+ ZSTD_pthread_cond_destroy(&mtctx->flushCond);
ZSTD_customFree(mtctx, mtctx->cMem);
return 0;
}
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
- ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
/* look into oldest non-fully-flushed job */
{ unsigned const wJobID = jobID & mtctx->jobIDMask;
ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
- ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
mtctx->frameContentSize = pledgedSrcSize;
mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params);
DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
- mtctx->targetSectionSize = params.jobSize;
- if (mtctx->targetSectionSize == 0) {
- mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
+ mtctx->targetJobSize = params.jobSize;
+ if (mtctx->targetJobSize == 0) {
+ mtctx->targetJobSize = 1ULL << ZSTDMT_computeTargetJobLog(&params);
}
- assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
+ assert(mtctx->targetJobSize <= (size_t)ZSTDMT_JOBSIZE_MAX);
if (params.rsyncable) {
/* Aim for targetJobSize as the average job size. */
- U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
+ U32 const jobSizeKB = (U32)(mtctx->targetJobSize >> 10);
U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
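+ /* 2^rsyncBits ~= jobSizeKB * 2^10 : a rolling-hash hit then occurs on average once every targetJobSize bytes */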
/* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
* expected job size is at least 4x larger. */
mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
}
- if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
- DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
- DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
- ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
+ if (mtctx->targetJobSize < mtctx->targetPrefixSize) mtctx->targetJobSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
+ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetJobSize>>10), (U32)params.jobSize);
+ DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetJobSize>>10));
+ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetJobSize));
{
/* If ldm is enabled we need windowSize space. */
size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap
* This is the minimum slack that LDM works with. One extra because
- * flush might waste up to targetSectionSize-1 bytes. Another extra
+ * flush might waste up to targetJobSize-1 bytes. Another extra
* for the overlap (if > 0), then one to fill which doesn't overlap
* with the LDM window.
*/
size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
- size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
+ size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
/* Compute the total size, and always have enough slack */
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
- size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
+ size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
if (mtctx->roundBuff.capacity < capacity) {
if (mtctx->roundBuff.buffer)
mtctx->cdict = cdict;
}
- if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
+ if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetJobSize,
dict, dictSize, dictContentType))
return ERROR(memory_allocation);
mtctx->jobs[jobID].lastJob = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
mtctx->jobs[jobID].dstFlushed = 0;
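+ /* no consumer waits on this job yet : flush_mutex/flush_cond stay NULL until someone needs to block */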
+ mtctx->jobs[jobID].flush_mutex = NULL;
+ mtctx->jobs[jobID].flush_cond = NULL;
/* Update the round buffer pos and clear the input buffer to be reset */
mtctx->roundBuff.pos += srcSize;
} }
if ( (srcSize == 0)
- && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
+ && (mtctx->nextJobID>0) /*single job must also write frame header*/ ) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
mtctx->nextJobID++;
- return 0;
+ return 1;
}
}
mtctx->nextJobID,
jobID);
- if (ZSTDMT_anythingToFlush(mtctx)) {
+ if (1 || ZSTDMT_anythingToFlush(mtctx)) { /* condition currently forced to true : always attempt to post the job */
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
+ return 1;
} else {
- DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
mtctx->jobReady = 1;
+ return 0;
}
} else {
/* block here, wait for next available job */
mtctx->nextJobID++;
mtctx->jobReady = 0;
}
-
- return 0;
+ return 1;
}
}
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
- ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */
+ if (mtctx->jobs[wJobID].flush_mutex == NULL) {
+ mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
+ mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
+ }
+ DEBUGLOG(6, "waiting to flush something (%zu left)", mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->flushMutex); /* acquire flushMutex before releasing job_mutex, so the worker's signal cannot be missed */
+ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+ ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex); /* block, waiting for something to flush */
+ ZSTD_pthread_mutex_unlock(&mtctx->flushMutex);
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
+ DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
} }
/* try to flush something */
DEBUGLOG(5, "dstBuffer released");
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */
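+ /* job fully flushed : detach it from the shared flush condition */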
+ mtctx->jobs[wJobID].flush_mutex = NULL;
+ mtctx->jobs[wJobID].flush_cond = NULL;
mtctx->consumed += srcSize;
mtctx->produced += cSize;
mtctx->doneJobID++;
/* no need to check during first round */
size_t roundBuffCapacity = mtctx->roundBuff.capacity;
- size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize;
+ size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetJobSize;
if (lastJobID < nbJobs1stRoundMin) return kNullRange;
for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
ZSTD_PTHREAD_MUTEX_LOCK(mutex);
while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
DEBUGLOG(5, "Waiting for LDM to finish...");
- ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
+ ZSTD_PTHREAD_COND_WAIT(&mtctx->serial.ldmWindowCond, mutex);
}
DEBUGLOG(6, "Done waiting for LDM to finish");
ZSTD_pthread_mutex_unlock(mutex);
{
Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
- size_t const spaceNeeded = mtctx->targetSectionSize;
+ size_t const spaceNeeded = mtctx->targetJobSize;
Buffer buffer;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
BYTE const* prev;
size_t pos;
- syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
+ syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetJobSize - mtctx->inBuff.filled);
syncPoint.flush = 0;
if (!mtctx->params.rsyncable)
/* Rsync is disabled. */
* window. However, since it depends only in the internal buffers, if the
* state is already synchronized, we will remain synchronized.
* Additionally, the probability that we miss a synchronization point is
- * low: RSYNC_LENGTH / targetSectionSize.
+ * low: RSYNC_LENGTH / targetJobSize.
*/
return syncPoint;
/* Initialize the loop variables. */
* through the input. If we hit a synchronization point, then cut the
* job off, and tell the compressor to flush the job. Otherwise, load
* all the bytes and continue as normal.
- * If we go too long without a synchronization point (targetSectionSize)
+ * If we go too long without a synchronization point (targetJobSize)
* then a block will be emitted anyways, but this is okay, since if we
* are already synchronized we will remain synchronized.
*/
size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
{
- size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
- if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+ size_t hintInSize = mtctx->targetJobSize - mtctx->inBuff.filled;
+ if (hintInSize==0) hintInSize = mtctx->targetJobSize;
return hintInSize;
}
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp)
{
- unsigned forwardInputProgress = 0;
+ unsigned forwardProgress = 0;
DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
(U32)endOp, (U32)(input->size - input->pos));
assert(output->pos <= output->size);
if (syncPoint.flush && endOp == ZSTD_e_continue) {
endOp = ZSTD_e_flush;
}
- assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
+ assert(mtctx->inBuff.buffer.capacity >= mtctx->targetJobSize);
DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
- (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
+ (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetJobSize);
ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
input->pos += syncPoint.toLoad;
mtctx->inBuff.filled += syncPoint.toLoad;
- forwardInputProgress = syncPoint.toLoad>0;
+ forwardProgress = syncPoint.toLoad>0;
}
}
if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {
* - We filled the input buffer: flush this job but don't end the frame.
* - We hit a synchronization point: flush this job but don't end the frame.
*/
- assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);
+ assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetJobSize || mtctx->params.rsyncable);
endOp = ZSTD_e_flush;
}
if ( (mtctx->jobReady)
- || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */
+ || (mtctx->inBuff.filled >= mtctx->targetJobSize) /* filled enough : let's compress */
|| ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */
|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */
size_t const jobSize = mtctx->inBuff.filled;
- assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
- FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
+ assert(mtctx->inBuff.filled <= mtctx->targetJobSize);
+ { size_t const jobPosted = ZSTDMT_createCompressionJob(mtctx, jobSize, endOp);
+ FORWARD_IF_ERROR(jobPosted, "");
+ if (jobPosted) forwardProgress = 1; }
}
/* check for potential compressed data ready to be flushed */
- { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
+ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardProgress, endOp); /* block if there was no forward progress (no input consumed, no job posted) */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
return remainingToFlush;