From 58ecf13e025c3601391d0c001ef2369eaa0d747c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 13 Jan 2018 13:18:57 -0800 Subject: [PATCH] zstdmt : can compress at block granularity offering perspective of more accurate progression report. --- lib/compress/zstdmt_compress.c | 101 +++++++++++++++++++++------------ programs/fileio.c | 2 +- 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index e51edf124..3a5b58a72 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -22,6 +22,7 @@ /* ====== Dependencies ====== */ #include <string.h> /* memcpy, memset */ +#include <limits.h> /* INT_MAX */ #include "pool.h" /* threadpool */ #include "threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ @@ -129,7 +130,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) { size_t const poolSize = sizeof(*bufPool) - + (bufPool->totalBuffers - 1) * sizeof(buffer_t); + + (bufPool->totalBuffers - 1) * sizeof(buffer_t); unsigned u; size_t totalBufferSize = 0; ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -201,20 +202,6 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) ZSTD_free(buf.start, bufPool->cMem); } -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero.
*/ -static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params) -{ - ZSTD_CCtx_params jobParams; - memset(&jobParams, 0, sizeof(jobParams)); - - jobParams.cParams = params.cParams; - jobParams.fParams = params.fParams; - jobParams.compressionLevel = params.compressionLevel; - - jobParams.ldmParams = params.ldmParams; - return jobParams; -} /* ===== CCtx Pool ===== */ /* a single CCtx Pool can be invoked from multiple threads in parallel */ @@ -305,13 +292,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) } -/* ===== Thread worker ===== */ +/* ------------------------------------------ */ +/* ===== Thread worker ===== */ +/* ------------------------------------------ */ typedef struct { buffer_t src; const void* srcStart; size_t prefixSize; size_t srcSize; + size_t readSize; buffer_t dstBuff; size_t cSize; size_t dstFlushed; @@ -328,21 +318,19 @@ typedef struct { unsigned long long fullFrameSize; } ZSTDMT_jobDescription; -/* ZSTDMT_compressChunk() : POOL_function type */ +/* ZSTDMT_compressChunk() is a POOL_function type */ void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); const void* const src = (const char*)job->srcStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; - DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ", - job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize); + /* resources */ if (cctx==NULL) { job->cSize = ERROR(memory_allocation); goto _endJob; } - if (dstBuff.start == NULL) { dstBuff = ZSTDMT_getBuffer(job->bufPool); if (dstBuff.start==NULL) { @@ -350,30 +338,26 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } job->dstBuff = dstBuff; - DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size); } + /* init */ if (job->cdict) { size_t const initError =
ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize); - DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog); assert(job->firstChunk); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ - size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); - if (ZSTD_isError(forceWindowError)) { - DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError)); - job->cSize = forceWindowError; - goto _endJob; - } - DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog); + { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); + if (ZSTD_isError(forceWindowError)) { + job->cSize = forceWindowError; + goto _endJob; + } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { - DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError)); job->cSize = initError; goto _endJob; } } @@ -384,19 +368,50 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_invalidateRepCodes(cctx); } - DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size); - DEBUG_PRINTHEX(6, job->srcStart, 12); + /* compress */ +#if 1 job->cSize = (job->lastChunk) ? 
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); - DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u) ", - (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); - DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); +#else + if (sizeof(size_t) > sizeof(int)) + assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ + { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); + const BYTE* ip = (const BYTE*) src; + BYTE* const ostart = (BYTE*)dstBuff.start; + BYTE* op = ostart; + BYTE* oend = op + dstBuff.size; + int blockNb; + job->cSize = 0; + for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) { + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); + if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + ip += ZSTD_BLOCKSIZE_MAX; + op += cSize; assert(op < oend); + /* stats */ + job->cSize += cSize; + job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + } + /* last block */ + { size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); + size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; + size_t const cSize = (job->lastChunk) ? 
+ ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : + ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); + if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + /* stats */ + job->cSize += cSize; + job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + } + } +#endif _endJob: + /* release */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); ZSTDMT_releaseBuffer(job->bufPool, job->src); job->src = g_nullBuffer; job->srcStart = NULL; + /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; job->jobScanned = 0; @@ -440,6 +455,21 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; +/* Sets parameters relevant to the compression job, initializing others to + * default values. Notably, nbThreads should probably be zero. */ +static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params) +{ + ZSTD_CCtx_params jobParams; + memset(&jobParams, 0, sizeof(jobParams)); + + jobParams.cParams = params.cParams; + jobParams.fParams = params.fParams; + jobParams.compressionLevel = params.compressionLevel; + + jobParams.ldmParams = params.ldmParams; + return jobParams; +} + static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; @@ -908,6 +938,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].readSize = 0; zcs->jobs[jobID].prefixSize = zcs->dictSize; assert(zcs->inBuff.filled >= srcSize + zcs->dictSize); zcs->jobs[jobID].params = zcs->params; diff --git a/programs/fileio.c b/programs/fileio.c index 887cbeb37..3ae2d4057 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -828,7 +828,7 @@ finish: /* Status */ DISPLAYLEVEL(2, "\r%79s\r", ""); DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", srcFileName, - 
(double)compressedfilesize/(readsize+(!readsize) /* avoid div by zero */ )*100, + (double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100, (unsigned long long)readsize, (unsigned long long) compressedfilesize, dstFileName); -- 2.47.2