/* ====== Debug ====== */
-#if defined(ZSTDMT_DEBUG)
+#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=1)
+# include <assert.h>
+#else
+# define assert(condition) ((void)0)
+#endif
+
+#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=2)
# include <stdio.h>
# include <unistd.h>
size_t sectionSize;
ZSTD_customMem cMem;
ZSTD_CDict* cdict;
- ZSTD_CStream* cstream;
};
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
- if (nbThreads==1) {
- mtctx->cstream = ZSTD_createCStream_advanced(cMem);
- if (!mtctx->cstream) {
- ZSTDMT_freeCCtx(mtctx); return NULL;
- } }
pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL); /* Todo : check init function return */
pthread_cond_init(&mtctx->jobCompleted_cond, NULL);
DEBUGLOG(4, "mt_cctx created, for %u threads", nbThreads);
ZSTD_free(mtctx->jobs, mtctx->cMem);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdict);
- ZSTD_freeCStream(mtctx->cstream);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
ZSTD_free(mtctx, mtctx->cMem);
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_customMem const cmem = { NULL, NULL, NULL };
- DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
- if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
+ DEBUGLOG(3, "Started new compression, with windowLog : %u",
+ params.cParams.windowLog);
+ if (zcs->nbThreads==1)
+ return ZSTD_initCStream_advanced(zcs->cctxPool->cctx[0],
+ dict, dictSize,
+ params, pledgedSrcSize);
if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
* pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
- if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
+ if (zcs->nbThreads==1)
+ return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
}
return ERROR(memory_allocation);
}
- DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
+ DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
+ zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
- zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */
+ zcs->jobs[jobID].dictSize = zcs->dictSize;
+ assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
zcs->jobs[jobID].params = zcs->params;
- if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
+ /* do not calculate checksum within sections, but write it in header for first section */
+ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer;
}
DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled);
zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
- DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize));
- memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled);
+ DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src",
+ (U32)zcs->inBuff.filled, (U32)newDictSize,
+ (U32)(zcs->inBuff.filled - newDictSize));
+ memmove(zcs->inBuff.buffer.start,
+ (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
+ zcs->inBuff.filled);
DEBUGLOG(5, "new inBuff pre-filled");
zcs->dictSize = newDictSize;
} else {
zcs->dictSize = 0;
zcs->frameEnded = 1;
if (zcs->nextJobID == 0)
- zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
+ /* single chunk exception : checksum is calculated directly within worker thread */
+ zcs->params.fParams.checksumFlag = 0;
}
- DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+ DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
+ zcs->nextJobID,
+ (U32)zcs->jobs[jobID].srcSize,
+ zcs->jobs[jobID].lastChunk,
+ zcs->doneJobID,
+ zcs->doneJobID & zcs->jobIDMask);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
zcs->nextJobID++;
return 0;
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
- if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
- if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
+ if (zcs->frameEnded)
+ /* current frame being ended. Only flush is allowed. Restart with init */
+ return ERROR(stage_wrong);
+ if (zcs->nbThreads==1)
+ return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
- if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output);
+ if (zcs->nbThreads==1)
+ return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 0);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
- if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output);
+ if (zcs->nbThreads==1)
+ return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 1);
}