From: Yann Collet Date: Thu, 12 Jan 2017 00:25:46 +0000 (+0100) Subject: zstdmt : changed internal naming from frame to chunk X-Git-Tag: v1.1.3^2~19^2~46 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=107bcbbbc23c73e8d48f066adc8959af690c0b12;p=thirdparty%2Fzstd.git zstdmt : changed internal naming from frame to chunk Since the result of mt compression is a single frame, changed naming, which implied the concatenation of multiple frames. minor : ensures that content size is written in header --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index ae986468b..6fe37a6f4 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1,7 +1,8 @@ #include /* malloc */ +#include /* memcpy */ #include /* threadpool */ #include "threading.h" /* mutex */ -#include "zstd_internal.h" /* MIN, ERROR */ +#include "zstd_internal.h" /* MIN, ERROR, ZSTD_* */ #include "zstdmt_compress.h" #if 0 @@ -43,7 +44,7 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ #define ZSTDMT_NBTHREADS_MAX 128 -/* === Buffer Pool === */ +/* ===== Buffer Pool ===== */ typedef struct buffer_s { void* start; @@ -82,13 +83,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) size_t const availBufferSize = buf.size; if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */ return buf; - free(buf.start); /* size conditions not respected : create a new buffer */ + free(buf.start); /* size conditions not respected : scratch this buffer and create a new one */ } /* create new buffer */ - { buffer_t buf; - buf.size = bSize; - buf.start = malloc(bSize); - return buf; + { void* const start = malloc(bSize); + if (start==NULL) bSize = 0; + return (buffer_t) { start, bSize }; /* note : start can be NULL if malloc fails ! */ } } @@ -104,52 +104,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) } - -typedef struct { - ZSTD_CCtx* cctx; - const void* srcStart; - size_t srcSize; - buffer_t dstBuff; - int compressionLevel; - unsigned frameID; - unsigned long long fullFrameSize; - size_t cSize; - unsigned jobCompleted; - pthread_mutex_t* jobCompleted_mutex; - pthread_cond_t* jobCompleted_cond; -} ZSTDMT_jobDescription; - -/* ZSTDMT_compressFrame() : POOL_function type */ -void ZSTDMT_compressFrame(void* jobDescription) -{ - ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; - buffer_t dstBuff = job->dstBuff; - ZSTD_parameters const params = ZSTD_getParams(job->compressionLevel, job->fullFrameSize, 0); - size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, params, job->fullFrameSize); - if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } - hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); /* flush frame header */ - if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } - if ((job->frameID & 1) == 0) { /* preserve frame header when it is first beginning of frame */ - dstBuff.start = (char*)dstBuff.start + hSize; - dstBuff.size -= hSize; - } else - hSize = 0; - - job->cSize = (job->frameID>=2) ? /* last chunk signal */ - ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : - ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); - if (!ZSTD_isError(job->cSize)) job->cSize += hSize; - DEBUGLOG(5, "frame %u : compressed %u bytes into %u bytes ", (unsigned)job->frameID, (unsigned)job->srcSize, (unsigned)job->cSize); - -_endJob: - PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); - job->jobCompleted = 1; - pthread_cond_signal(job->jobCompleted_cond); - pthread_mutex_unlock(job->jobCompleted_mutex); -} - - -/* === CCtx Pool === */ +/* ===== CCtx Pool ===== */ typedef struct { unsigned totalCCtx; @@ -191,11 +146,12 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) return pool->cctx[pool->availCCtx]; } /* note : should not be possible, since totalCCtx==nbThreads */ - return ZSTD_createCCtx(); + return ZSTD_createCCtx(); /* note : can be NULL is creation fails ! */ } static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) { + if (cctx==NULL) return; /* release on NULL */ if (pool->availCCtx < pool->totalCCtx) pool->cctx[pool->availCCtx++] = cctx; else @@ -204,6 +160,55 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) } +/* ===== Thread worker ===== */ + +typedef struct { + ZSTD_CCtx* cctx; + const void* srcStart; + size_t srcSize; + buffer_t dstBuff; + size_t cSize; + size_t dstFlushed; + unsigned long long fullFrameSize; + unsigned firstChunk; + unsigned lastChunk; + unsigned jobCompleted; + pthread_mutex_t* jobCompleted_mutex; + pthread_cond_t* jobCompleted_cond; + ZSTD_parameters params; +} ZSTDMT_jobDescription; + +/* ZSTDMT_compressChunk() : POOL_function type */ +void ZSTDMT_compressChunk(void* jobDescription) +{ + ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; + buffer_t dstBuff = job->dstBuff; + size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize); + if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } + hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); /* flush frame header */ + if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } + if (job->firstChunk) { /* preserve frame header when it is first chunk - otherwise, overwrite */ + dstBuff.start = (char*)dstBuff.start + hSize; + dstBuff.size -= hSize; + } else + hSize = 0; + + job->cSize = (job->lastChunk) ? /* last chunk signal */ + ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) : + ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize); + if (!ZSTD_isError(job->cSize)) job->cSize += hSize; + DEBUGLOG(5, "chunk %u : compressed %u bytes into %u bytes ", (unsigned)job->lastChunk, (unsigned)job->srcSize, (unsigned)job->cSize); + +_endJob: + PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); + job->jobCompleted = 1; + pthread_cond_signal(job->jobCompleted_cond); + pthread_mutex_unlock(job->jobCompleted_mutex); +} + + +/* ===== Multi-threaded compression ===== */ + struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_bufferPool* buffPool; @@ -250,64 +255,66 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, const void* src, size_t srcSize, int compressionLevel) { - ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0); - size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); - unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */; - unsigned const nbFrames = MIN(nbFramesMax, mtctx->nbThreads); - size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames; + ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); + size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); + unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */; + unsigned const nbChunks = MIN(nbChunksMax, mtctx->nbThreads); + size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; + size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ size_t remainingSrcSize = srcSize; const char* const srcStart = (const char*)src; size_t frameStartPos = 0; - - DEBUGLOG(2, "windowLog : %u => frameSizeTarget : %u ", params.cParams.windowLog, (U32)frameSizeTarget); - DEBUGLOG(2, "nbFrames : %u (size : %u bytes) ", nbFrames, (U32)avgFrameSize); + DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); + DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); + params.fParams.contentSizeFlag = 1; { unsigned u; - for (u=0; ubuffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity }; - ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool); + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); /* should check for NULL ! */ mtctx->jobs[u].srcStart = srcStart + frameStartPos; - mtctx->jobs[u].srcSize = frameSize; + mtctx->jobs[u].srcSize = chunkSize; mtctx->jobs[u].fullFrameSize = srcSize; - mtctx->jobs[u].compressionLevel = compressionLevel; + mtctx->jobs[u].params = params; mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctx = cctx; - mtctx->jobs[u].frameID = (u>0) | ((u==nbFrames-1)<<1); + mtctx->jobs[u].firstChunk = (u==0); + mtctx->jobs[u].lastChunk = (u==nbChunks-1); mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; - DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)frameSize); - POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]); + DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize); + POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); - frameStartPos += frameSize; - remainingSrcSize -= frameSize; + frameStartPos += chunkSize; + remainingSrcSize -= chunkSize; } } - /* note : since nbFrames <= nbThreads, all jobs should be running immediately in parallel */ + /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */ - { unsigned frameID; + { unsigned chunkID; size_t dstPos = 0; - for (frameID=0; frameIDjobCompleted_mutex); - while (mtctx->jobs[frameID].jobCompleted==0) { - DEBUGLOG(4, "waiting for jobCompleted signal from frame %u", frameID); + while (mtctx->jobs[chunkID].jobCompleted==0) { + DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID); pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); } pthread_mutex_unlock(&mtctx->jobCompleted_mutex); - ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx); - { size_t const cSize = mtctx->jobs[frameID].cSize; + ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); + { size_t const cSize = mtctx->jobs[chunkID].cSize; if (ZSTD_isError(cSize)) return cSize; if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall); - if (frameID) { /* note : frame 0 is already written directly into dst */ - memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, cSize); - ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[frameID].dstBuff); + if (chunkID) { /* note : chunk 0 is already written directly into dst */ + memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); } dstPos += cSize ; } @@ -317,3 +324,89 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, } } + + +/* ====================================== */ +/* ======= Streaming API ======= */ +/* ====================================== */ + +#if 0 + +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { + zcs->params = ZSTD_getParams(compressionLevel, 0, 0); + zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2); + zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog); + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ + zcs->inBuff.current = 0; + zcs->doneJobID = 0; + zcs->nextJobID = 0; + return 0; +} + +typedef struct { + buffer_t buffer; + unsigned current; +} inBuff_t; + + +size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +{ + /* fill input buffer */ + { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current); + memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad); + input->pos += toLoad; + } + + if (zcs->inBuff.current == zcs->inBuffSize) { /* filled enough : let's compress */ + size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */ + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ + unsigned const jobID = zcs->nextJobID & zcs->jobIDmask; + + zcs->jobs[jobID].srcStart = zcs->inBuff.start; + zcs->jobs[jobID].srcSize = zcs->targetSectionSize; + zcs->jobs[jobID].fullFrameSize = 0; + zcs->jobs[jobID].compressionLevel = zcs->compressionLevel; + zcs->jobs[jobID].dstBuff = dstBuffer; + zcs->jobs[jobID].cctx = cctx; + zcs->jobs[jobID].frameID = (jobID>0); + zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].dstFlushed = 0; + zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; + zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + + /* get a new buffer for next input - save remaining into it */ + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ + zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize; + memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current); + + DEBUGLOG(3, "posting job %u (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize); + POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); + zcs->nextJobID++; + } + + /* check if there is any data available to flush */ + { unsigned const jobID = zcs->doneJobID & zcs->jobIDmask; + ZSTDMT_jobDescription job = zcs->jobs[jobID]; + if (job.jobCompleted) { /* job completed : output can be flushed */ + size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */ + free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */ + memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite); + output->pos += toWrite; + job.dstFlushed += toWrite; + if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); + zcs->doneJobID++; + } else + zcs->jobs[jobID].dstFlushed = job.dstFlushed; + } } + + /* recommended next input size : fill current input buffer */ + return zcs->inBuffSize - zcs->inBuff.current; +} + +size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); +size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); + +#endif diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 73ee379b8..ca5d6b601 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -1,12 +1,24 @@ +/* === Dependencies === */ #include /* size_t */ +#include "zstd.h" /* ZSTD_inBuffer, ZSTD_outBuffer */ -typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; -ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads); +/* === Simple one-pass functions === */ + +typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; +ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads); size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx); size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, int compressionLevel); + + +/* === Streaming functions === */ + +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); +size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); +size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); +size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);