From: Yann Collet Date: Fri, 26 Jan 2018 01:35:49 +0000 (-0800) Subject: zstdmt: removed job->jobCompleted X-Git-Tag: v1.3.4~1^2~67^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a1d4041e69601b84c08082f866665a6a2eaaa8d4;p=thirdparty%2Fzstd.git zstdmt: removed job->jobCompleted replaced by equivalent signal job->consumer == job->srcSize. created additional functions ZSTD_writeLastEmptyBlock() and ZSTDMT_writeLastEmptyBlock() required when it's necessary to finish a frame with a last empty job, to create an "end of frame" marker. It avoids creating a job with srcSize==0. --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index a87c368ea..7b16d7364 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1824,7 +1824,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, void* dst, size_t dstCapacity, const void* src, size_t srcSize) { - DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u) (dictLimit=%u, nextToUpdate=%u)", + DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)", (U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate); if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) return 0; /* don't even attempt compression below a certain srcSize */ @@ -1837,9 +1837,9 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, if (current > zc->blockState.matchState.nextToUpdate + 384) zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384)); } - /* find and store sequences */ - { - U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit; + + /* select and store sequences */ + { U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit; size_t lastLLSize; { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; } if (zc->appliedParams.ldmParams.enableLdm) { @@ -1848,26 +1848,20 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params, void const* src, size_t srcSize); ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm; - lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize); - } else { + } else { /* not long range mode */ ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict); - lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize); } - { - const BYTE* const anchor = (const BYTE*)src + srcSize - lastLLSize; - ZSTD_storeLastLiterals(&zc->seqStore, anchor, lastLLSize); - } - } - /* encode */ - { - size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace); - if (ZSTD_isError(cSize) || cSize == 0) - return cSize; + { const BYTE* const lastLiterals = (const BYTE*)src + srcSize - lastLLSize; + ZSTD_storeLastLiterals(&zc->seqStore, lastLiterals, lastLLSize); + } } + + /* encode sequences and literals */ + { size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace); + if (ZSTD_isError(cSize) || cSize == 0) return cSize; /* confirm repcodes and entropy tables */ - { - ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock; + { ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock; zc->blockState.prevCBlock = zc->blockState.nextCBlock; zc->blockState.nextCBlock = tmp; } @@ -2030,6 +2024,19 @@ static size_t ZSTD_writeFrameHeader(void* dst, size_t dstCapacity, return pos; } +/* ZSTD_writeLastEmptyBlock() : + * output an empty Block with end-of-frame mark to complete a frame + * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h)) + * or an error code if `dstCapcity` is too small (mtctx_mutex); - job->dstBuff = dstBuff; + job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */ ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } @@ -369,8 +368,7 @@ void ZSTDMT_compressChunk(void* jobDescription) if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; - } } - } + } } } if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } @@ -379,12 +377,9 @@ void ZSTDMT_compressChunk(void* jobDescription) } /* compress */ -#if 0 - job->cSize = (job->lastChunk) ? - ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : - ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); -#else - if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ + if (sizeof(size_t) > sizeof(int)) + assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ + { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; @@ -404,7 +399,7 @@ void ZSTDMT_compressChunk(void* jobDescription) job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); - ZSTD_pthread_cond_signal(job->mtctx_cond); + ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */ ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* last block */ @@ -416,23 +411,19 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->cSize += cSize; - job->consumed = job->srcSize; ZSTD_pthread_mutex_unlock(job->mtctx_mutex); - } - } -#endif + } } _endJob: - /* release */ + /* release resources */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); ZSTDMT_releaseBuffer(job->bufPool, job->src); job->src = g_nullBuffer; job->srcStart = NULL; /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->consumed = job->srcSize; - job->jobCompleted = 1; ZSTD_pthread_cond_signal(job->mtctx_cond); ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } @@ -577,18 +568,18 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) mtctx->allJobsCompleted = 1; } -static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) +static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) { DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); - while (zcs->doneJobID < zcs->nextJobID) { - unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex); - while (zcs->jobs[jobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ - ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); + while (mtctx->doneJobID < mtctx->nextJobID) { + unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ + ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); } - ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex); - zcs->doneJobID++; + ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); + mtctx->doneJobID++; } } @@ -769,7 +760,7 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].src = g_nullBuffer; mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; - mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */ mtctx->jobs[u].consumed = 0; mtctx->jobs[u].cSize = 0; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; @@ -782,7 +773,6 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].bufPool = mtctx->bufPool; mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].lastChunk = (u==nbChunks-1); - mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex; mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond; @@ -805,7 +795,7 @@ static size_t ZSTDMT_compress_advanced_internal( for (chunkID=0; chunkIDmtctx_mutex); - while (mtctx->jobs[chunkID].jobCompleted==0) { + while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) { DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID); ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); } @@ -879,7 +869,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, /* ====================================== */ size_t ZSTDMT_initCStream_internal( - ZSTDMT_CCtx* zcs, + ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode, const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) @@ -888,8 +878,8 @@ size_t ZSTDMT_initCStream_internal( /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ - assert(zcs->cctxPool->totalCCtx == params.nbThreads); - zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ + assert(mtctx->cctxPool->totalCCtx == params.nbThreads); + mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ if (params.jobSize == 0) { if (params.cParams.windowLog >= 29) params.jobSize = ZSTDMT_JOBSIZE_MAX; @@ -898,56 +888,56 @@ size_t ZSTDMT_initCStream_internal( } if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; - if (zcs->singleBlockingThread) { + if (mtctx->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); assert(singleThreadParams.nbThreads == 0); - return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], + return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads); - if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - zcs->allJobsCompleted = 1; + if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); + mtctx->allJobsCompleted = 1; } - zcs->params = params; - zcs->frameContentSize = pledgedSrcSize; + mtctx->params = params; + mtctx->frameContentSize = pledgedSrcSize; if (dict) { - ZSTD_freeCDict(zcs->cdictLocal); - zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, + ZSTD_freeCDict(mtctx->cdictLocal); + mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */ - params.cParams, zcs->cMem); - zcs->cdict = zcs->cdictLocal; - if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); + params.cParams, mtctx->cMem); + mtctx->cdict = mtctx->cdictLocal; + if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); } else { - ZSTD_freeCDict(zcs->cdictLocal); - zcs->cdictLocal = NULL; - zcs->cdict = cdict; + ZSTD_freeCDict(mtctx->cdictLocal); + mtctx->cdictLocal = NULL; + mtctx->cdict = cdict; } assert(params.overlapSizeLog <= 9); - zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); - DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10)); - zcs->targetSectionSize = params.jobSize; - if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN; - if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */ - DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize); - zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize; - DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10)); - ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) ); - zcs->inBuff.buffer = g_nullBuffer; - zcs->prefixSize = 0; - zcs->doneJobID = 0; - zcs->nextJobID = 0; - zcs->frameEnded = 0; - zcs->allJobsCompleted = 0; - zcs->consumed = 0; - zcs->produced = 0; - if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); + mtctx->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); + DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10)); + mtctx->targetSectionSize = params.jobSize; + if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; + if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ + DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); + mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize; + DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10)); + ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) ); + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->prefixSize = 0; + mtctx->doneJobID = 0; + mtctx->nextJobID = 0; + mtctx->frameEnded = 0; + mtctx->allJobsCompleted = 0; + mtctx->consumed = 0; + mtctx->produced = 0; + if (params.fParams.checksumFlag) XXH64_reset(&mtctx->xxhState, 0); return 0; } @@ -982,103 +972,134 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * pledgedSrcSize can be zero == unknown (for the time being) * prefer using ZSTD_CONTENTSIZE_UNKNOWN, * as `0` might mean "empty" in the future */ -size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) +size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) { if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; - if (zcs->params.nbThreads==1) - return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize); - return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params, + if (mtctx->params.nbThreads==1) + return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize); + return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params, pledgedSrcSize); } -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); - ZSTD_CCtx_params cctxParams = zcs->params; /* retrieve sticky params */ + ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; - return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); + return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); } -static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD_EndDirective endOp) +/* ZSTDMT_writeLastEmptyBlock() + * Write a single empty block with an end-of-frame + * to finish a frame. + * Completed synchronously. + * @return : 0, or an error code (can fail due to memory allocation) + */ +static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) +{ + assert(job->srcSize == 0); + assert(job->lastChunk == 1); + assert(job->firstChunk == 0); /* first chunk needs to create frame header too */ + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */ + { buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool); + if (dstBuff.start==NULL) return ERROR(memory_allocation); + job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */ + assert(dstBuff.size >= ZSTD_blockHeaderSize); + job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size); + assert(!ZSTD_isError(job->cSize)); + assert(job->consumed == 0); + } + return 0; +} + +static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp) { - unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask; int const endFrame = (endOp == ZSTD_e_end); - if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) { + if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full"); - assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask)); + assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask)); return 0; } - if (!zcs->jobReady) { + if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); - zcs->jobs[jobID].src = zcs->inBuff.buffer; - zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; - zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].consumed = 0; - zcs->jobs[jobID].cSize = 0; - zcs->jobs[jobID].prefixSize = zcs->prefixSize; - assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); - zcs->jobs[jobID].params = zcs->params; + mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); + mtctx->jobs[jobID].src = mtctx->inBuff.buffer; + mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].srcSize = srcSize; + mtctx->jobs[jobID].consumed = 0; + mtctx->jobs[jobID].cSize = 0; + mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; + assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize); + mtctx->jobs[jobID].params = mtctx->params; /* do not calculate checksum within sections, but write it in header for first section */ - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; - zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; - zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; - zcs->jobs[jobID].dstBuff = g_nullBuffer; - zcs->jobs[jobID].cctxPool = zcs->cctxPool; - zcs->jobs[jobID].bufPool = zcs->bufPool; - zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); - zcs->jobs[jobID].lastChunk = endFrame; - zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; - zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex; - zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond; - - if (zcs->params.fParams.checksumFlag) - XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); + if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0; + mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL; + mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize; + mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; + mtctx->jobs[jobID].bufPool = mtctx->bufPool; + mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0); + mtctx->jobs[jobID].lastChunk = endFrame; + mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; + mtctx->jobs[jobID].dstFlushed = 0; + mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex; + mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond; + + if (mtctx->params.fParams.checksumFlag) + XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize); /* get a new buffer for next input */ if (!endFrame) { - size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); - zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); - if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ - zcs->jobs[jobID].jobCompleted = 1; - zcs->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); + size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize); + mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); + if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ + mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0; + mtctx->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); return ERROR(memory_allocation); } - zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; - memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ - (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize, - zcs->inBuff.filled); - zcs->prefixSize = newPrefixSize; + mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize; + memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ + (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize, + mtctx->inBuff.filled); + mtctx->prefixSize = newPrefixSize; } else { /* endFrame==1 => no need for another input buffer */ - zcs->inBuff.buffer = g_nullBuffer; - zcs->inBuff.filled = 0; - zcs->prefixSize = 0; - zcs->frameEnded = endFrame; - if (zcs->nextJobID == 0) { - /* single chunk exception : checksum is calculated directly within worker thread */ - zcs->params.fParams.checksumFlag = 0; - } } } + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->inBuff.filled = 0; + mtctx->prefixSize = 0; + mtctx->frameEnded = endFrame; + if (mtctx->nextJobID == 0) { + /* single chunk exception : checksum is already calculated directly within worker thread */ + mtctx->params.fParams.checksumFlag = 0; + } } - DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", - zcs->nextJobID, - (U32)zcs->jobs[jobID].srcSize, - zcs->jobs[jobID].lastChunk, - zcs->doneJobID, - zcs->doneJobID & zcs->jobIDMask); - if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) { - zcs->nextJobID++; - zcs->jobReady = 0; + if ( (srcSize == 0) + && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) { + assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ + CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) ); + mtctx->nextJobID++; + return 0; + } + } + + DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", + mtctx->nextJobID, + (U32)mtctx->jobs[jobID].srcSize, + mtctx->jobs[jobID].lastChunk, + mtctx->doneJobID, + mtctx->doneJobID & mtctx->jobIDMask); + if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) { + mtctx->nextJobID++; + mtctx->jobReady = 0; } else { - DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID); - zcs->jobReady = 1; + DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID); + mtctx->jobReady = 1; } return 0; } @@ -1088,73 +1109,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD * `output` : `pos` will be updated with amount of data flushed . * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ -static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) +static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) { - unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; + unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); assert(output->size >= output->pos); - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex); - if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) { - while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { - if (zcs->jobs[wJobID].jobCompleted==1) break; + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); + if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) { + assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); + while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { + if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) { + DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond", + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize); + break; + } DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", - zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); - ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */ + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */ } } /* some output is available to be flushed */ - { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex); + { ZSTDMT_jobDescription job = mtctx->jobs[wJobID]; + ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); if (ZSTD_isError(job.cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", - zcs->doneJobID, ZSTD_getErrorName(job.cSize)); - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); + mtctx->doneJobID, ZSTD_getErrorName(job.cSize)); + ZSTDMT_waitForAllJobsCompleted(mtctx); + ZSTDMT_releaseAllJobResources(mtctx); return job.cSize; } /* add frame checksum if necessary (can only happen once) */ - if ( job.jobCompleted + assert(job.consumed <= job.srcSize); + if ( (job.consumed == job.srcSize) && job.frameChecksumNeeded ) { - U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); + U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); job.cSize += 4; - zcs->jobs[wJobID].cSize += 4; - zcs->jobs[wJobID].frameChecksumNeeded = 0; + mtctx->jobs[wJobID].cSize += 4; + mtctx->jobs[wJobID].frameChecksumNeeded = 0; } assert(job.cSize >= job.dstFlushed); - if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */ + if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)", - (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100); + (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; - if ( job.jobCompleted + if ( (job.consumed == job.srcSize) && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", - zcs->doneJobID, (U32)job.dstFlushed); - ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); - zcs->jobs[wJobID].dstBuff = g_nullBuffer; - zcs->jobs[wJobID].jobCompleted = 0; - zcs->consumed += job.srcSize; - zcs->produced += job.cSize; - zcs->doneJobID++; + mtctx->doneJobID, (U32)job.dstFlushed); + ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff); + mtctx->jobs[wJobID].dstBuff = g_nullBuffer; + mtctx->consumed += job.srcSize; + mtctx->produced += job.cSize; + mtctx->doneJobID++; } else { - zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */ + mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */ } } /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */ } - if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */ - if (zcs->jobReady) return 1; /* one job is ready and queued! */ - if (zcs->inBuff.filled > 0) return 1; /* input not empty */ - zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ - if (end == ZSTD_e_end) return !zcs->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */ + if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */ + if (mtctx->jobReady) return 1; /* one job is ready and queued! */ + if (mtctx->inBuff.filled > 0) return 1; /* input not empty */ + mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */ + if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */ return 0; /* everything flushed */ } @@ -1241,12 +1267,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } -size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { - CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) ); + CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) ); /* recommended next input size : fill current input buffer */ - return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ + return mtctx->inBuffSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ }