static const Range kNullRange = { NULL, 0 };
typedef struct {
- size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
- size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
- ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
- ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
- ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
- ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
- ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
+ size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
+ size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
+ ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */
+ ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
+ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
+ ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
SerialState* serial; /* Thread-safe - used by mtctx and (all) workers */
Buffer dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
Range prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
Range src; /* set by mtctx, then read by worker & mtctx => no barrier */
- unsigned jobID; /* set by mtctx, then read by worker => no barrier */
- unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
- unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
- ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
- const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
- unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
- size_t dstFlushed; /* used only by mtctx */
- unsigned frameChecksumNeeded; /* used only by mtctx */
+ unsigned jobID; /* set by mtctx, then read by worker => no barrier */
+ unsigned firstJob; /* set by mtctx, then read by worker => no barrier */
+ unsigned lastJob; /* set by mtctx, then read by worker => no barrier */
+ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
+ const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
+ unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
+ size_t dstFlushed; /* used only by mtctx */
+ unsigned frameChecksumNeeded; /* used only by mtctx */
} ZSTDMT_jobDescription;
#define JOB_ERROR(e) \
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
assert(job->firstJob); /* only allowed for first job */
if (ZSTD_isError(initError)) JOB_ERROR(initError);
- } else { /* srcStart points at reloaded section */
+ } else {
U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
{ size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
ZSTD_invalidateRepCodes(cctx);
}
- /* compress */
+ /* compress the entire job by smaller chunks, for better granularity */
{ size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);
const BYTE* ip = (const BYTE*) job->src.start;
mtctx->params = params;
mtctx->frameContentSize = pledgedSrcSize;
- ZSTD_freeCDict(mtctx->cdictLocal);
- if (dict) {
- mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
- dictLoadMethod, dictContentType, /* note : a loadPrefix becomes an internal CDict */
- params.cParams, mtctx->cMem);
- mtctx->cdict = mtctx->cdictLocal;
- if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);
- } else {
- mtctx->cdictLocal = NULL;
- mtctx->cdict = cdict;
- }
-
mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(¶ms);
DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));
mtctx->targetSectionSize = params.jobSize;
mtctx->allJobsCompleted = 0;
mtctx->consumed = 0;
mtctx->produced = 0;
+
+ /* update dictionary */
+ ZSTD_freeCDict(mtctx->cdictLocal);
+ mtctx->cdictLocal = NULL;
+ if (dict) {
+ if (dictContentType == ZSTD_dct_rawContent) {
+ mtctx->inBuff.prefix.start = (const BYTE*)dict;
+ mtctx->inBuff.prefix.size = dictSize;
+ } else {
+ /* note : a loadPrefix becomes an internal CDict */
+ mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
+ dictLoadMethod, dictContentType,
+ params.cParams, mtctx->cMem);
+ mtctx->cdict = mtctx->cdictLocal;
+ if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);
+ }
+ } else {
+ mtctx->cdict = cdict;
+ }
+
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
dict, dictSize, dictContentType))
return ERROR(memory_allocation);
+
+
return 0;
}
unsigned const lastJobID = mtctx->nextJobID;
unsigned jobID;
+ /* no need to check during first round */
+ size_t roundBuffCapacity = mtctx->roundBuff.capacity;
+ size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize;
+ if (lastJobID < nbJobs1stRoundMin) return kNullRange;
+
for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
unsigned const wJobID = jobID & mtctx->jobIDMask;
size_t consumed;
{
Range const inUse = ZSTDMT_getInputDataInUse(mtctx);
size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
- size_t const target = mtctx->targetSectionSize;
+ size_t const spaceNeeded = mtctx->targetSectionSize;
Buffer buffer;
DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
assert(mtctx->inBuff.buffer.start == NULL);
- assert(mtctx->roundBuff.capacity >= target);
+ assert(mtctx->roundBuff.capacity >= spaceNeeded);
- if (spaceLeft < target) {
+ if (spaceLeft < spaceNeeded) {
/* ZSTD_invalidateRepCodes() doesn't work for extDict variants.
* Simply copy the prefix to the beginning in that case.
*/
mtctx->roundBuff.pos = prefixSize;
}
buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos;
- buffer.capacity = target;
+ buffer.capacity = spaceNeeded;
if (ZSTDMT_isOverlapped(buffer, inUse)) {
DEBUGLOG(5, "Waiting for buffer...");