size_t dstFlushed;
unsigned firstChunk;
unsigned lastChunk;
- unsigned jobCompleted;
unsigned frameChecksumNeeded;
ZSTD_pthread_mutex_t* mtctx_mutex;
ZSTD_pthread_cond_t* mtctx_cond;
goto _endJob;
}
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
- job->dstBuff = dstBuff;
+ job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flushProduced(), when it copies the whole job */
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
if (ZSTD_isError(initError)) {
job->cSize = initError;
goto _endJob;
- } }
- }
+ } } }
if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
}
/* compress */
-#if 0
- job->cSize = (job->lastChunk) ?
- ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
- ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
-#else
- if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
+ if (sizeof(size_t) > sizeof(int))
+ assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
+
{ int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
const BYTE* ip = (const BYTE*) src;
BYTE* const ostart = (BYTE*)dstBuff.start;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
- ZSTD_pthread_cond_signal(job->mtctx_cond);
+ ZSTD_pthread_cond_signal(job->mtctx_cond); /* signal that some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* last block */
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
- ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->cSize += cSize;
- job->consumed = job->srcSize;
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
- }
- }
-#endif
+ } }
_endJob:
- /* release */
+ /* release resources */
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
- job->jobCompleted = 1;
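+ /* consumed == srcSize, set just above, is what marks this job as completed */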
ZSTD_pthread_cond_signal(job->mtctx_cond);
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
mtctx->allJobsCompleted = 1;
}
-static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
+static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
- while (zcs->doneJobID < zcs->nextJobID) {
- unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
- ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
- while (zcs->jobs[jobID].jobCompleted==0) {
- DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
- ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
+ while (mtctx->doneJobID < mtctx->nextJobID) {
+ unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
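+ /* job completion is detected through (consumed == srcSize), set by the worker when it finishes */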
+ while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
+ DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
+ ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
- ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
- zcs->doneJobID++;
+ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+ mtctx->doneJobID++;
}
}
mtctx->jobs[u].src = g_nullBuffer;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
- mtctx->jobs[u].srcSize = chunkSize;
+ mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
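+ /* note : a job with srcSize == 0 would look already completed, since consumed (== 0) would equal srcSize */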
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1);
- mtctx->jobs[u].jobCompleted = 0;
mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
- while (mtctx->jobs[chunkID].jobCompleted==0) {
+ while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
/* ====================================== */
size_t ZSTDMT_initCStream_internal(
- ZSTDMT_CCtx* zcs,
+ ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
/* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
- assert(zcs->cctxPool->totalCCtx == params.nbThreads);
- zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
+ assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
+ mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (params.jobSize == 0) {
if (params.cParams.windowLog >= 29)
params.jobSize = ZSTDMT_JOBSIZE_MAX;
}
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
- if (zcs->singleBlockingThread) {
+ if (mtctx->singleBlockingThread) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbThreads == 0);
- return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
+ return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict,
singleThreadParams, pledgedSrcSize);
}
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads);
- if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
- zcs->allJobsCompleted = 1;
+ if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
+ ZSTDMT_waitForAllJobsCompleted(mtctx);
+ ZSTDMT_releaseAllJobResources(mtctx);
+ mtctx->allJobsCompleted = 1;
}
- zcs->params = params;
- zcs->frameContentSize = pledgedSrcSize;
+ mtctx->params = params;
+ mtctx->frameContentSize = pledgedSrcSize;
if (dict) {
- ZSTD_freeCDict(zcs->cdictLocal);
- zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
+ ZSTD_freeCDict(mtctx->cdictLocal);
+ mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */
- params.cParams, zcs->cMem);
- zcs->cdict = zcs->cdictLocal;
- if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
+ params.cParams, mtctx->cMem);
+ mtctx->cdict = mtctx->cdictLocal;
+ if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);
} else {
- ZSTD_freeCDict(zcs->cdictLocal);
- zcs->cdictLocal = NULL;
- zcs->cdict = cdict;
+ ZSTD_freeCDict(mtctx->cdictLocal);
+ mtctx->cdictLocal = NULL;
+ mtctx->cdict = cdict;
}
assert(params.overlapSizeLog <= 9);
- zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
- DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10));
- zcs->targetSectionSize = params.jobSize;
- if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
- if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */
- DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);
- zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize;
- DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10));
- ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
- zcs->inBuff.buffer = g_nullBuffer;
- zcs->prefixSize = 0;
- zcs->doneJobID = 0;
- zcs->nextJobID = 0;
- zcs->frameEnded = 0;
- zcs->allJobsCompleted = 0;
- zcs->consumed = 0;
- zcs->produced = 0;
- if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
+ mtctx->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
+ DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
+ mtctx->targetSectionSize = params.jobSize;
+ if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+ if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
+ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
+ mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize;
+ DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10));
+ ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) );
+ mtctx->inBuff.buffer = g_nullBuffer;
+ mtctx->prefixSize = 0;
+ mtctx->doneJobID = 0;
+ mtctx->nextJobID = 0;
+ mtctx->frameEnded = 0;
+ mtctx->allJobsCompleted = 0;
+ mtctx->consumed = 0;
+ mtctx->produced = 0;
+ if (params.fParams.checksumFlag) XXH64_reset(&mtctx->xxhState, 0);
return 0;
}
* pledgedSrcSize can be zero == unknown (for the time being)
* prefer using ZSTD_CONTENTSIZE_UNKNOWN,
* as `0` might mean "empty" in the future */
-size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
+size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{
if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
- if (zcs->params.nbThreads==1)
- return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
- return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params,
+ if (mtctx->params.nbThreads==1)
+ return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
+ return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
pledgedSrcSize);
}
-size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0);
- ZSTD_CCtx_params cctxParams = zcs->params; /* retrieve sticky params */
+ ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */
DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel);
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
- return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
+ return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
}
-static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD_EndDirective endOp)
+/* ZSTDMT_writeLastEmptyBlock()
+ * Write a single empty block, with the end-of-frame flag set,
+ * to finish the current frame.
+ * Completed synchronously, within the calling thread.
+ * @return : 0, or an error code (can fail due to memory allocation)
+ */
+static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
+{
+ assert(job->srcSize == 0);
+ assert(job->lastChunk == 1);
+ assert(job->firstChunk == 0); /* the first chunk must also write the frame header, which this function does not do */
+ assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */
+ { buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool);
+ if (dstBuff.start==NULL) return ERROR(memory_allocation);
+ job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */
+ assert(dstBuff.size >= ZSTD_blockHeaderSize);
+ job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size);
+ assert(!ZSTD_isError(job->cSize));
+ assert(job->consumed == 0);
+ }
+ return 0;
+}
+
+static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
- unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+ unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
int const endFrame = (endOp == ZSTD_e_end);
- if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
+ if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
- assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
+ assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));
return 0;
}
- if (!zcs->jobReady) {
+ if (!mtctx->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
- zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
- zcs->jobs[jobID].src = zcs->inBuff.buffer;
- zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
- zcs->jobs[jobID].srcSize = srcSize;
- zcs->jobs[jobID].consumed = 0;
- zcs->jobs[jobID].cSize = 0;
- zcs->jobs[jobID].prefixSize = zcs->prefixSize;
- assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
- zcs->jobs[jobID].params = zcs->params;
+ mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
+ mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
+ mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
+ mtctx->jobs[jobID].srcSize = srcSize;
+ mtctx->jobs[jobID].consumed = 0;
+ mtctx->jobs[jobID].cSize = 0;
+ mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
+ assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
+ mtctx->jobs[jobID].params = mtctx->params;
/* do not calculate checksum within sections, but write it in header for first section */
- if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
- zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
- zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
- zcs->jobs[jobID].dstBuff = g_nullBuffer;
- zcs->jobs[jobID].cctxPool = zcs->cctxPool;
- zcs->jobs[jobID].bufPool = zcs->bufPool;
- zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
- zcs->jobs[jobID].lastChunk = endFrame;
- zcs->jobs[jobID].jobCompleted = 0;
- zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
- zcs->jobs[jobID].dstFlushed = 0;
- zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
- zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
-
- if (zcs->params.fParams.checksumFlag)
- XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
+ if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0;
+ mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;
+ mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;
+ mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+ mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
+ mtctx->jobs[jobID].bufPool = mtctx->bufPool;
+ mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0);
+ mtctx->jobs[jobID].lastChunk = endFrame;
+ mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+ mtctx->jobs[jobID].dstFlushed = 0;
+ mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
+ mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
+
+ if (mtctx->params.fParams.checksumFlag)
+ XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize);
/* get a new buffer for next input */
if (!endFrame) {
- size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
- zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
- if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
- zcs->jobs[jobID].jobCompleted = 1;
- zcs->nextJobID++;
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
+ size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize);
+ mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
+ if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
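+ /* setting consumed == srcSize (both 0) marks this job as already completed, so the cleanup below will not wait on it */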
+ mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0;
+ mtctx->nextJobID++;
+ ZSTDMT_waitForAllJobsCompleted(mtctx);
+ ZSTDMT_releaseAllJobResources(mtctx);
return ERROR(memory_allocation);
}
- zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
- memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
- (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
- zcs->inBuff.filled);
- zcs->prefixSize = newPrefixSize;
+ mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
+ memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
+ (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
+ mtctx->inBuff.filled);
+ mtctx->prefixSize = newPrefixSize;
} else { /* endFrame==1 => no need for another input buffer */
- zcs->inBuff.buffer = g_nullBuffer;
- zcs->inBuff.filled = 0;
- zcs->prefixSize = 0;
- zcs->frameEnded = endFrame;
- if (zcs->nextJobID == 0) {
- /* single chunk exception : checksum is calculated directly within worker thread */
- zcs->params.fParams.checksumFlag = 0;
- } } }
+ mtctx->inBuff.buffer = g_nullBuffer;
+ mtctx->inBuff.filled = 0;
+ mtctx->prefixSize = 0;
+ mtctx->frameEnded = endFrame;
+ if (mtctx->nextJobID == 0) {
+ /* single chunk exception : checksum is already calculated directly within worker thread */
+ mtctx->params.fParams.checksumFlag = 0;
+ } }
- DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
- zcs->nextJobID,
- (U32)zcs->jobs[jobID].srcSize,
- zcs->jobs[jobID].lastChunk,
- zcs->doneJobID,
- zcs->doneJobID & zcs->jobIDMask);
- if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) {
- zcs->nextJobID++;
- zcs->jobReady = 0;
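+ /* empty job, only needed to end the frame : handled synchronously, without posting work to the thread pool */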
+ if ( (srcSize == 0)
+ && (mtctx->nextJobID>0) /* a single chunk must also write the frame header, so it cannot take this shortcut */ ) {
+ assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
+ CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) );
+ mtctx->nextJobID++;
+ return 0;
+ }
+ }
+
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
+ mtctx->nextJobID,
+ (U32)mtctx->jobs[jobID].srcSize,
+ mtctx->jobs[jobID].lastChunk,
+ mtctx->doneJobID,
+ mtctx->doneJobID & mtctx->jobIDMask);
+ if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
+ mtctx->nextJobID++;
+ mtctx->jobReady = 0;
} else {
- DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
- zcs->jobReady = 1;
+ DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
+ mtctx->jobReady = 1;
}
return 0;
}
* `output` : `pos` will be updated with amount of data flushed .
* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
-static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
+static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
- unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+ unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
assert(output->size >= output->pos);
- ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
- if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
- while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
- if (zcs->jobs[wJobID].jobCompleted==1) break;
+ ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+ if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
+ assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
+ while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
+ if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
+ DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
+ mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
+ break;
+ }
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
- zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
- ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */
+ mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+ ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
- { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
- ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
+ { ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
+ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
- zcs->doneJobID, ZSTD_getErrorName(job.cSize));
- ZSTDMT_waitForAllJobsCompleted(zcs);
- ZSTDMT_releaseAllJobResources(zcs);
+ mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
+ ZSTDMT_waitForAllJobsCompleted(mtctx);
+ ZSTDMT_releaseAllJobResources(mtctx);
return job.cSize;
}
/* add frame checksum if necessary (can only happen once) */
- if ( job.jobCompleted
+ assert(job.consumed <= job.srcSize);
+ if ( (job.consumed == job.srcSize)
&& job.frameChecksumNeeded ) {
- U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+ U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
- zcs->jobs[wJobID].cSize += 4;
- zcs->jobs[wJobID].frameChecksumNeeded = 0;
+ mtctx->jobs[wJobID].cSize += 4;
+ mtctx->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
- if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */
+ if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
- (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
+ (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
- if ( job.jobCompleted
+ if ( (job.consumed == job.srcSize)
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
- zcs->doneJobID, (U32)job.dstFlushed);
- ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
- zcs->jobs[wJobID].dstBuff = g_nullBuffer;
- zcs->jobs[wJobID].jobCompleted = 0;
- zcs->consumed += job.srcSize;
- zcs->produced += job.cSize;
- zcs->doneJobID++;
+ mtctx->doneJobID, (U32)job.dstFlushed);
+ ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
+ mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
+ mtctx->consumed += job.srcSize;
+ mtctx->produced += job.cSize;
+ mtctx->doneJobID++;
} else {
- zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
+ mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
} }
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
}
- if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */
- if (zcs->jobReady) return 1; /* one job is ready and queued! */
- if (zcs->inBuff.filled > 0) return 1; /* input not empty */
- zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
- if (end == ZSTD_e_end) return !zcs->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
+ if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */
+ if (mtctx->jobReady) return 1; /* one job is ready and queued! */
+ if (mtctx->inBuff.filled > 0) return 1; /* input not empty */
+ mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */
+ if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
return 0; /* everything flushed */
}
}
-size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
+size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
- CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );
+ CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
/* recommended next input size : fill current input buffer */
- return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
+ return mtctx->inBuffSize - mtctx->inBuff.filled; /* note : could be zero, when the input buffer is full and no new job can be created yet */
}