ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);
}
-static void ZSTDMT_serialState_update(SerialState* serialState,
- ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore,
- Range src, unsigned jobID)
+static void
+ZSTDMT_serialState_genSequences(SerialState* serialState,
+ RawSeqStore_t* seqStore,
+ Range src, unsigned jobID)
{
/* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
/* It is now our turn, do any processing necessary */
if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) {
size_t error;
- assert(seqStore.seq != NULL && seqStore.pos == 0 &&
- seqStore.size == 0 && seqStore.capacity > 0);
+ DEBUGLOG(6, "ZSTDMT_serialState_genSequences: LDM update");
+ assert(seqStore->seq != NULL && seqStore->pos == 0 &&
+ seqStore->size == 0 && seqStore->capacity > 0);
assert(src.size <= serialState->params.jobSize);
ZSTD_window_update(&serialState->ldmState.window, src.start, src.size, /* forceNonContiguous */ 0);
error = ZSTD_ldm_generateSequences(
- &serialState->ldmState, &seqStore,
+ &serialState->ldmState, seqStore,
&serialState->params.ldmParams, src.start, src.size);
/* We provide a large enough buffer to never fail. */
assert(!ZSTD_isError(error)); (void)error;
serialState->nextJobID++;
ZSTD_pthread_cond_broadcast(&serialState->cond);
ZSTD_pthread_mutex_unlock(&serialState->mutex);
+}
- if (seqStore.size > 0) {
- ZSTD_referenceExternalSequences(jobCCtx, seqStore.seq, seqStore.size);
- assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable);
+static void
+ZSTDMT_serialState_applySequences(const SerialState* serialState, /* just for an assert() check */
+ ZSTD_CCtx* jobCCtx,
+ const RawSeqStore_t* seqStore)
+{
+ if (seqStore->size > 0) {
+ DEBUGLOG(5, "ZSTDMT_serialState_applySequences: uploading %u external sequences", (unsigned)seqStore->size);
+ assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); (void)serialState;
+ assert(jobCCtx);
+ ZSTD_referenceExternalSequences(jobCCtx, seqStore->seq, seqStore->size);
}
}
Buffer dstBuff = job->dstBuff;
size_t lastCBlockSize = 0;
+ DEBUGLOG(5, "ZSTDMT_compressionJob: job %u", job->jobID);
/* resources */
if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */
/* init */
+
+ /* Perform serial step as early as possible */
+ ZSTDMT_serialState_genSequences(job->serial, &rawSeqStore, job->src, job->jobID);
+
if (job->cdict) {
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
assert(job->firstJob); /* only allowed for first job */
size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0);
if (ZSTD_isError(err)) JOB_ERROR(err);
}
+ DEBUGLOG(6, "ZSTDMT_compressionJob: job %u: loading prefix of size %zu", job->jobID, job->prefix.size);
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
- job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+ job->prefix.start, job->prefix.size, ZSTD_dct_rawContent,
ZSTD_dtlm_fast,
NULL, /*cdict*/
&jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} }
- /* Perform serial step as early as possible, but after CCtx initialization */
- ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
+ /* External Sequences can only be applied after CCtx initialization */
+ ZSTDMT_serialState_applySequences(job->serial, cctx, &rawSeqStore);
if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue_public(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);