From: Yann Collet Date: Tue, 30 May 2017 23:37:19 +0000 (-0700) Subject: removed mtctx->cstream X-Git-Tag: v1.3.0~1^2~17^2~74 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5bcef1ada223c1b1de5634eab4c4c071d9536c5c;p=thirdparty%2Fzstd.git removed mtctx->cstream use the first cctx in pool when ZSTDMT is used in single-thread mode now that cctx and cstream are the same object. --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index bd3de4586..ca5cd6aee 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -27,7 +27,13 @@ /* ====== Debug ====== */ -#if defined(ZSTDMT_DEBUG) +#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=1) +# include +#else +# define assert(condition) ((void)0) +#endif + +#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=2) # include # include @@ -309,7 +315,6 @@ struct ZSTDMT_CCtx_s { size_t sectionSize; ZSTD_customMem cMem; ZSTD_CDict* cdict; - ZSTD_CStream* cstream; }; ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) @@ -343,11 +348,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_freeCCtx(mtctx); return NULL; } - if (nbThreads==1) { - mtctx->cstream = ZSTD_createCStream_advanced(cMem); - if (!mtctx->cstream) { - ZSTDMT_freeCCtx(mtctx); return NULL; - } } pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL); /* Todo : check init function return */ pthread_cond_init(&mtctx->jobCompleted_cond, NULL); DEBUGLOG(4, "mt_cctx created, for %u threads", nbThreads); @@ -387,7 +387,6 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) ZSTD_free(mtctx->jobs, mtctx->cMem); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTD_freeCDict(mtctx->cdict); - ZSTD_freeCStream(mtctx->cstream); pthread_mutex_destroy(&mtctx->jobCompleted_mutex); pthread_cond_destroy(&mtctx->jobCompleted_cond); ZSTD_free(mtctx, mtctx->cMem); @@ -540,8 +539,12 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, ZSTD_parameters params, unsigned long long pledgedSrcSize) { ZSTD_customMem const cmem = { NULL, NULL, NULL }; - DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog); - if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize); + DEBUGLOG(3, "Started new compression, with windowLog : %u", + params.cParams.windowLog); + if (zcs->nbThreads==1) + return ZSTD_initCStream_advanced(zcs->cctxPool->cctx[0], + dict, dictSize, + params, pledgedSrcSize); if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_releaseAllJobResources(zcs); @@ -587,7 +590,8 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, * pledgedSrcSize is optional and can be zero == unknown */ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) { - if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize); + if (zcs->nbThreads==1) + return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize); return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); } @@ -612,13 +616,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi return ERROR(memory_allocation); } - DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); + DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", + zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */ + zcs->jobs[jobID].dictSize = zcs->dictSize; + assert(zcs->inBuff.filled >= srcSize + zcs->dictSize); zcs->jobs[jobID].params = zcs->params; - if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ + /* do not calculate checksum within sections, but write it in header for first section */ + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].dstBuff = dstBuffer; @@ -643,8 +650,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi } DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled); zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; - DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize)); - memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled); + DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", + (U32)zcs->inBuff.filled, (U32)newDictSize, + (U32)(zcs->inBuff.filled - newDictSize)); + memmove(zcs->inBuff.buffer.start, + (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, + zcs->inBuff.filled); DEBUGLOG(5, "new inBuff pre-filled"); zcs->dictSize = newDictSize; } else { @@ -653,10 +664,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->dictSize = 0; zcs->frameEnded = 1; if (zcs->nextJobID == 0) - zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ + /* single chunk exception : checksum is calculated directly within worker thread */ + zcs->params.fParams.checksumFlag = 0; } - DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); + DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", + zcs->nextJobID, + (U32)zcs->jobs[jobID].srcSize, + zcs->jobs[jobID].lastChunk, + zcs->doneJobID, + zcs->doneJobID & zcs->jobIDMask); POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ zcs->nextJobID++; return 0; @@ -729,8 +746,11 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; - if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ - if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); + if (zcs->frameEnded) + /* current frame being ended. Only flush is allowed. Restart with init */ + return ERROR(stage_wrong); + if (zcs->nbThreads==1) + return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input); /* fill input buffer */ { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); @@ -770,12 +790,14 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { - if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output); + if (zcs->nbThreads==1) + return ZSTD_flushStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 0); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { - if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output); + if (zcs->nbThreads==1) + return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 1); }