From: Yann Collet Date: Tue, 17 Jan 2017 23:31:16 +0000 (-0800) Subject: completed ZSTDMT streaming compression X-Git-Tag: v1.1.3^2~19^2~42 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a73c4129329d6dc4c81987af987f3574569bbc0f;p=thirdparty%2Fzstd.git completed ZSTDMT streaming compression Provides the baseline compression API : size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); Not tested yet --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 6f467f6a5..fb9183f9e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -2,10 +2,11 @@ #include /* memcpy */ #include /* threadpool */ #include "threading.h" /* mutex */ -#include "zstd_internal.h" /* MIN, ERROR, ZSTD_* */ +#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstdmt_compress.h" #if 0 + # include # include # include @@ -163,8 +164,14 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ===== Thread worker ===== */ +typedef struct { + buffer_t buffer; + size_t filled; +} inBuff_t; + typedef struct { ZSTD_CCtx* cctx; + buffer_t src; const void* srcStart; size_t srcSize; buffer_t dstBuff; @@ -208,25 +215,41 @@ _endJob: } +/* ------------------------------------------ */ /* ===== Multi-threaded compression ===== */ +/* ------------------------------------------ */ struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_bufferPool* buffPool; ZSTDMT_CCtxPool* cctxPool; - unsigned nbThreads; pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; - ZSTDMT_jobDescription jobs[1]; /* variable size */ + size_t targetSectionSize; + size_t inBuffSize; + inBuff_t inBuff; + ZSTD_parameters params; + unsigned nbThreads; + unsigned jobIDMask; + unsigned doneJobID; + unsigned nextJobID; + unsigned frameEnded; + ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ }; ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) { ZSTDMT_CCtx* cctx; + U32 const minNbJobs = nbThreads + 1; + U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; + U32 const nbJobs = 1 << nbJobsLog2; + DEBUGLOG(4, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", + nbThreads, minNbJobs, nbJobsLog2, nbJobs); if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; - cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription)); + cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription)); if (!cctx) return NULL; cctx->nbThreads = nbThreads; + cctx->jobIDMask = nbJobs - 1; cctx->factory = POOL_create(nbThreads, 1); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); @@ -338,46 +361,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, /* ======= Streaming API ======= */ /* ====================================== */ -#if 0 +#if 1 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { zcs->params = ZSTD_getParams(compressionLevel, 0, 0); - zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2); + zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ - zcs->inBuff.current = 0; + zcs->inBuff.filled = 0; zcs->doneJobID = 0; zcs->nextJobID = 0; + zcs->frameEnded = 0; return 0; } -typedef struct { - buffer_t buffer; - unsigned current; -} inBuff_t; - size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { + if (zcs->frameEnded) return ERROR(stage_wrong); + /* fill input buffer */ - { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current); - memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad); + { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); + memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad); input->pos += toLoad; } - if (zcs->inBuff.current == zcs->inBuffSize) { /* filled enough : let's compress */ + if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */ size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); - buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */ + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ - unsigned const jobID = zcs->nextJobID & zcs->jobIDmask; + unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - zcs->jobs[jobID].srcStart = zcs->inBuff.start; + zcs->jobs[jobID].src = zcs->inBuff.buffer; + zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = zcs->targetSectionSize; zcs->jobs[jobID].fullFrameSize = 0; - zcs->jobs[jobID].compressionLevel = zcs->compressionLevel; + zcs->jobs[jobID].params = zcs->params; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; - zcs->jobs[jobID].frameID = (jobID>0); + zcs->jobs[jobID].firstChunk = (jobID==0); + zcs->jobs[jobID].lastChunk = 0; zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].dstFlushed = 0; zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; @@ -385,22 +408,22 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu /* get a new buffer for next input - save remaining into it */ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ - zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize; - memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current); + zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize); + memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled); - DEBUGLOG(3, "posting job %u (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize); + DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize); POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); zcs->nextJobID++; } /* check if there is any data available to flush */ - { unsigned const jobID = zcs->doneJobID & zcs->jobIDmask; + { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; ZSTDMT_jobDescription job = zcs->jobs[jobID]; if (job.jobCompleted) { /* job completed : output can be flushed */ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */ - free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */ - memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite); + ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 }; + memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ @@ -411,10 +434,77 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu } } /* recommended next input size : fill current input buffer */ - return zcs->inBuffSize - zcs->inBuff.current; + return zcs->inBuffSize - zcs->inBuff.filled; +} + +static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) +{ + size_t const srcSize = zcs->inBuff.filled; + + if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) { + size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */ + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ + unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + zcs->jobs[jobID].src = zcs->inBuff.buffer; + zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; + zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].fullFrameSize = 0; + zcs->jobs[jobID].params = zcs->params; + zcs->jobs[jobID].dstBuff = dstBuffer; + zcs->jobs[jobID].cctx = cctx; + zcs->jobs[jobID].firstChunk = (jobID==0); + zcs->jobs[jobID].lastChunk = endFrame; + zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].dstFlushed = 0; + zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; + zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + + /* get a new buffer for next input */ + if (!endFrame) { + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ + zcs->inBuff.filled = 0; + } else { + zcs->frameEnded = 1; + } + + DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize); + POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); + zcs->nextJobID++; + } + + /* check if there is any data available to flush */ + { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; + ZSTDMT_jobDescription job = zcs->jobs[wJobID]; + if (job.jobCompleted) { /* job completed : output can be flushed */ + size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 }; + memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + output->pos += toWrite; + job.dstFlushed += toWrite; + if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 }; + zcs->doneJobID++; + } else { + zcs->jobs[wJobID].dstFlushed = job.dstFlushed; + } } + /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ + if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); + return (zcs->doneJobID < zcs->nextJobID); + } +} + + +size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) +{ + return ZSTDMT_flushStream_internal(zcs, output, 0); +} + +size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) +{ + return ZSTDMT_flushStream_internal(zcs, output, 1); } -size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); -size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); #endif