From: Yann Collet Date: Sat, 27 Jan 2018 01:08:58 +0000 (-0800) Subject: zstdmt: minor code refactor for clarity X-Git-Tag: v1.3.4~1^2~67^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=77e36273de31ded4fc35703514175af07bdf2fb5;p=thirdparty%2Fzstd.git zstdmt: minor code refactor for clarity --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d78257b2e..e04864507 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -379,35 +379,35 @@ void ZSTDMT_compressChunk(void* jobDescription) } /* compress */ - if (sizeof(size_t) > sizeof(int)) - assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ - - { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); + { size_t const blockSize = ZSTD_BLOCKSIZE_MAX; + int const nbBlocks = (int)((job->srcSize + (blockSize-1)) / blockSize); const BYTE* ip = (const BYTE*) src; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; BYTE* oend = op + dstBuff.capacity; int blockNb; + if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * blockSize); /* check overflow */ DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); assert(job->cSize == 0); for (blockNb = 1; blockNb < nbBlocks; blockNb++) { - size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, blockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } - ip += ZSTD_BLOCKSIZE_MAX; + ip += blockSize; op += cSize; assert(op < oend); /* stats */ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; - job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; + job->consumed = blockSize * blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */ ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* last block */ + assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0); /* blockSize must be power of 2 for mask==(blockSize-1) to work */ if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) { - size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); - size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; + size_t const lastBlockSize1 = job->srcSize & (blockSize-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=blockSize)) ? blockSize : lastBlockSize1; size_t const cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); @@ -469,21 +469,10 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero. */ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) -{ - ZSTD_CCtx_params jobParams; - memset(&jobParams, 0, sizeof(jobParams)); - - jobParams.cParams = params.cParams; - jobParams.fParams = params.fParams; - jobParams.compressionLevel = params.compressionLevel; - - jobParams.ldmParams = params.ldmParams; - return jobParams; -} - +/* ZSTDMT_allocJobsTable() + * allocate, and just init to zero, a job table. + * update *nbJobsPtr to next power of 2 value, as size of table + * No reverse free() function is provided : just use ZSTD_free() */ static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; @@ -524,6 +513,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) mtctx->allJobsCompleted = 1; mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); + assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); @@ -649,6 +639,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, } } +/* Sets parameters relevant to the compression job, initializing others to + * default values. Notably, nbThreads should probably be zero. */ +static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) +{ + ZSTD_CCtx_params jobParams; + memset(&jobParams, 0, sizeof(jobParams)); + + jobParams.cParams = params.cParams; + jobParams.fParams = params.fParams; + jobParams.compressionLevel = params.compressionLevel; + + jobParams.ldmParams = params.ldmParams; + return jobParams; +} + /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ @@ -1139,8 +1144,9 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u } } /* try to flush something */ - { size_t cSize = mtctx->jobs[wJobID].cSize; - size_t const srcConsumed = mtctx->jobs[wJobID].consumed; + { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ + size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ + size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", @@ -1150,8 +1156,8 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u return cSize; } /* add frame checksum if necessary (can only happen once) */ - assert(srcConsumed <= mtctx->jobs[wJobID].srcSize); - if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */ + assert(srcConsumed <= srcSize); + if ( (srcConsumed == srcSize) /* job completed -> worker no longer active */ && mtctx->jobs[wJobID].frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); @@ -1161,17 +1167,19 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u mtctx->jobs[wJobID].frameChecksumNeeded = 0; } if (cSize > 0) { /* compression is ongoing or completed */ - size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); + size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", - (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize); + (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); assert(mtctx->jobs[wJobID].dstBuff.start != NULL); - memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); - output->pos += toWrite; - mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ + memcpy((char*)output->dst + output->pos, + (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, + toFlush); + output->pos += toFlush; + mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ - if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */ + if ( (srcConsumed == srcSize) /* job completed */ && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); @@ -1179,14 +1187,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ - mtctx->consumed += mtctx->jobs[wJobID].srcSize; + mtctx->consumed += srcSize; mtctx->produced += cSize; mtctx->doneJobID++; } } /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); - if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */ + if (srcSize > srcConsumed) return 1; /* current job not completely compressed */ } if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */