From d5c046c609c77bd2ab8035d7b0e11d2084272a36 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 30 Jun 2017 14:51:01 -0700 Subject: [PATCH] implemented shortcut for zstd_compress_generic() in MT mode added ZSTDMT_compress_advanced() API --- lib/common/zstd_internal.h | 6 ++ lib/compress/zstd_compress.c | 8 +- lib/compress/zstdmt_compress.c | 146 ++++++++++++++++++++++++++------- lib/compress/zstdmt_compress.h | 8 ++ 4 files changed, 133 insertions(+), 35 deletions(-) diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index 59761b134..a0cc938a3 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -333,6 +333,12 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize); +/*! ZSTD_initCStream_internal() : + * Private use only. To be called from zstdmt_compress.c in single-thread mode. */ +size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective const flushMode); /*! ZSTD_getParamsFromCDict() : * as the name implies */ diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d4c029e90..f2afe625a 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3771,10 +3771,10 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, /** ZSTD_compressStream_generic(): * internal function for all *compressStream*() variants and *compress_generic() * @return : hint size for next input */ -static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, - ZSTD_outBuffer* output, - ZSTD_inBuffer* input, - ZSTD_EndDirective const flushMode) +size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective const flushMode) { const char* const istart = (const char*)input->src; const char* const iend = istart + input->size; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d5f08c76f..f7ee7502d 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -261,7 +261,7 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; const void* const src = (const char*)job->srcStart + job->dictSize; buffer_t const dstBuff = job->dstBuff; - DEBUGLOG(4, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", + DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); if (job->cdict) { /* should only happen for first segment */ size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); @@ -285,7 +285,7 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = (job->lastChunk) ? ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); - DEBUGLOG(4, "compressed %u bytes into %u bytes (first:%u) (last:%u)", + DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); @@ -374,10 +374,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads) } /* ZSTDMT_releaseAllJobResources() : - * Ensure all workers are killed first. */ + * note : ensure all workers are killed first ! */ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) { unsigned jobID; + DEBUGLOG(4, "ZSTDMT_releaseAllJobResources"); for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; @@ -439,14 +440,14 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, +size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, - int compressionLevel) + const ZSTD_CDict* cdict, + ZSTD_parameters const params, + unsigned overlapRLog) { - ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); - U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 0 : 3; - size_t const overlapSize = (size_t)1 << (params.cParams.windowLog - overlapLog); + size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + 1; unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); @@ -459,11 +460,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); - params.fParams.contentSizeFlag = 1; if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; - return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel); + return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); } { unsigned u; @@ -485,8 +485,11 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].dictSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].cdict = mtctx->nextJobID==0 ? cdict : NULL; mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].params = params; + /* do not calculate checksum within sections, but write it in header for first section */ + if (mtctx->nextJobID) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; mtctx->jobs[u].firstChunk = (u==0); @@ -495,8 +498,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; - DEBUGLOG(4, "posting job %u (%u bytes)", u, (U32)chunkSize); - DEBUG_PRINTHEX(5, mtctx->jobs[u].srcStart, 12); + DEBUGLOG(5, "posting job %u (%u bytes)", u, (U32)chunkSize); + DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); frameStartPos += chunkSize; @@ -526,8 +529,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, if (chunkID) { /* note : chunk 0 is already written directly into dst */ if (!error) memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap if chunk decompressed within dst */ - if (chunkID >= compressWithinDst) /* otherwise, it decompresses within dst */ + if (chunkID >= compressWithinDst) { /* otherwise, it decompresses within dst */ + DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); + } mtctx->jobs[chunkID].dstBuff = g_nullBuffer; } dstPos += cSize ; @@ -536,7 +541,18 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); return error ? error : dstPos; } +} + +size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + int compressionLevel) +{ + U32 const overlapRLog = (compressionLevel >= ZSTD_maxCLevel()) ? 0 : 3; + ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); + params.fParams.contentSizeFlag = 1; + return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapRLog); } @@ -546,6 +562,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { + DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); while (zcs->doneJobID < zcs->nextJobID) { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); @@ -559,17 +576,19 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) } +/** ZSTDMT_initCStream_internal() : + * internal usage only */ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize) { - DEBUGLOG(5, "ZSTDMT_initCStream_internal"); + DEBUGLOG(4, "ZSTDMT_initCStream_internal"); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ if (zcs->nbThreads==1) { - DEBUGLOG(5, "single thread mode"); + DEBUGLOG(4, "single thread mode"); return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], dict, dictSize, cdict, params, pledgedSrcSize); @@ -584,6 +603,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->params = params; zcs->frameContentSize = pledgedSrcSize; if (dict) { + DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */ @@ -591,18 +611,19 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->cdict = zcs->cdictLocal; if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); } else { + DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = NULL; zcs->cdict = cdict; } zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); - DEBUGLOG(5, "overlapRLog : %u ", zcs->overlapRLog); - DEBUGLOG(5, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); + DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); + DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); - DEBUGLOG(5, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); + DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); zcs->marginSize = zcs->targetSectionSize >> 2; zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); @@ -797,6 +818,75 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi } } +/** ZSTDMT_compressStream_generic() : + * internal use only + * assumption : output and input are valid (pos <= size) + * @return : minimum amount of data remaining to flush, 0 if none */ +size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input, + ZSTD_EndDirective endOp) +{ + size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize + mtctx->marginSize; + assert(output->pos <= output->size); + assert(input->pos <= input->size); + if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { + /* current frame being ended. Only flush/end are allowed. Or start new job with init */ + return ERROR(stage_wrong); + } + if (mtctx->nbThreads==1) { + return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); + } + if ( (mtctx->nextJobID==0) /* just started */ + && (mtctx->inBuff.filled==0) /* nothing buffered yet */ + && (endOp==ZSTD_e_end) /* end order, immediately at beginning */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) /* enough room */ + && (mtctx->cdict==NULL) ) { /* no dictionary */ + size_t const cSize = ZSTDMT_compress_advanced(mtctx, + (char*)output->dst + output->pos, output->size - output->pos, + (const char*)input->src + input->pos, input->size - input->pos, + mtctx->cdict, mtctx->params, mtctx->overlapRLog); + if (ZSTD_isError(cSize)) return cSize; + input->pos = input->size; + output->pos += cSize; + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); /* was allocated in initStream */ + mtctx->allJobsCompleted = 1; + mtctx->frameEnded = 1; + return 0; + } + + /* fill input buffer */ + { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); + memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); + input->pos += toLoad; + mtctx->inBuff.filled += toLoad; + } + + if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */ + CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); + } + + /* check for potential compressed data ready to be flushed */ + CHECK_F( ZSTDMT_flushNextJob(mtctx, output, (mtctx->inBuff.filled == mtctx->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */ + + if (input->pos < input->size) /* input not consumed : do not flush yet */ + endOp = ZSTD_e_continue; + + switch(endOp) + { + case ZSTD_e_flush: + return ZSTDMT_flushStream(mtctx, output); + case ZSTD_e_end: + return ZSTDMT_endStream(mtctx, output); + case ZSTD_e_continue: + return 1; + default: + return ERROR(GENERIC); /* invalid endDirective */ + } +} + + size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; @@ -833,21 +923,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp { size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; - if (srcSize) - DEBUGLOG(5, "ZSTDMT_flushStream_internal : %u bytes left to compress", - (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { - DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize); - DEBUGLOG(5, "end order : %u", endFrame); CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); - DEBUGLOG(5, "resulting zcs->frameEnded : %u", zcs->frameEnded); } /* check if there is any data available to flush */ - DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u", - zcs->doneJobID, zcs->nextJobID); - return ZSTDMT_flushNextJob(zcs, output, 1 /*blockToFlush */); + return ZSTDMT_flushNextJob(zcs, output, 1 /* blockToFlush */); } @@ -861,22 +943,24 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { - DEBUGLOG(5, "ZSTDMT_endStream"); + DEBUGLOG(4, "ZSTDMT_endStream"); if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); } -size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, +size_t ZSTDMT_compressStream_generic2(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { + DEBUGLOG(5, "ZSTDMT_compressStream_generic"); DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u", (U32)input->pos, (U32)input->size, (U32)endOp); - if (input->pos < input->size) /* exclude final flushes */ + if (input->pos < input->size) /* some input to consume */ CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); - if (input->pos < input->size) endOp = ZSTD_e_continue; + if (input->pos < input->size) /* input not consumed : do not flush yet */ + endOp = ZSTD_e_continue; switch(endOp) { case ZSTD_e_flush: diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index d36914de5..fad63b6d8 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -44,6 +44,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, int compressionLevel); + /* === Streaming functions === */ ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); @@ -61,6 +62,13 @@ ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); # define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */ #endif +ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const ZSTD_CDict* cdict, + ZSTD_parameters const params, + unsigned overlapRLog); + ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */ ZSTD_parameters params, -- 2.47.2