]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
completed ZSTDMT streaming compression
author Yann Collet <cyan@fb.com>
Tue, 17 Jan 2017 23:31:16 +0000 (15:31 -0800)
committer Yann Collet <cyan@fb.com>
Tue, 17 Jan 2017 23:31:16 +0000 (15:31 -0800)
Provides the baseline compression API :
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);

Not tested yet

lib/compress/zstdmt_compress.c

index 6f467f6a5d44eecb19f445fd3f1b9d3453e68ad7..fb9183f9e41393a99d291cba640e054362584aa2 100644 (file)
@@ -2,10 +2,11 @@
 #include <string.h>   /* memcpy */
 #include <pool.h>     /* threadpool */
 #include "threading.h"  /* mutex */
-#include "zstd_internal.h"   /* MIN, ERROR, ZSTD_* */
+#include "zstd_internal.h"   /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
 #include "zstdmt_compress.h"
 
 #if 0
+
 #  include <stdio.h>
 #  include <unistd.h>
 #  include <sys/times.h>
@@ -163,8 +164,14 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 
 /* =====   Thread worker   ===== */
 
+typedef struct {   /* input staging area : filled by the streaming API, then handed over to a job */
+    buffer_t buffer;   /* storage obtained from the buffer pool */
+    size_t filled;   /* nb of bytes currently stored at the beginning of buffer */
+} inBuff_t;
+
 typedef struct {
     ZSTD_CCtx* cctx;
+    buffer_t src;
     const void* srcStart;
     size_t srcSize;
     buffer_t dstBuff;
@@ -208,25 +215,41 @@ _endJob:
 }
 
 
+/* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
+/* ------------------------------------------ */
 
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_bufferPool* buffPool;
     ZSTDMT_CCtxPool* cctxPool;
-    unsigned nbThreads;
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
-    ZSTDMT_jobDescription jobs[1];   /* variable size */
+    size_t targetSectionSize;   /* nb of input bytes given to each job posted by compressStream */
+    size_t inBuffSize;   /* capacity requested for inBuff.buffer */
+    inBuff_t inBuff;   /* input accumulated but not yet posted as a job */
+    ZSTD_parameters params;   /* set by initCStream, copied into every job */
+    unsigned nbThreads;
+    unsigned jobIDMask;   /* nbJobs-1, nbJobs being a power of 2 : maps a job counter onto a slot of jobs[] */
+    unsigned doneJobID;   /* counter of the oldest job whose output is not fully flushed yet */
+    unsigned nextJobID;   /* counter of the next job to post */
+    unsigned frameEnded;   /* set once the last chunk has been posted ; compressStream then refuses more input */
+    ZSTDMT_jobDescription jobs[1];   /* variable size (must lie at the end) */
 };
 
 ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
 {
     ZSTDMT_CCtx* cctx;
+    U32 const minNbJobs = nbThreads + 1;
+    U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
+    U32 const nbJobs = 1 << nbJobsLog2;
+    DEBUGLOG(4, "nbThreads : %u  ; minNbJobs : %u ;  nbJobsLog2 : %u ;  nbJobs : %u  \n",
+            nbThreads, minNbJobs, nbJobsLog2, nbJobs);
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
-    cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription));
+    cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
     if (!cctx) return NULL;
     cctx->nbThreads = nbThreads;
+    cctx->jobIDMask = nbJobs - 1;
     cctx->factory = POOL_create(nbThreads, 1);
     cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
     cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@@ -338,46 +361,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
 /* =======      Streaming API     ======= */
 /* ====================================== */
 
-#if 0
+#if 1
 
 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
     zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
-    zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2);
+    zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);   /* size of each job : 4x window size */
     zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
     zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);   /* check for NULL ! */
-    zcs->inBuff.current = 0;
+    zcs->inBuff.filled = 0;   /* nothing buffered yet */
     zcs->doneJobID = 0;
     zcs->nextJobID = 0;
+    zcs->frameEnded = 0;   /* new frame : compressStream accepts input again */
     return 0;
 }
 
-typedef struct {
-    buffer_t buffer;
-    unsigned current;
-} inBuff_t;
-
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
+    if (zcs->frameEnded) return ERROR(stage_wrong);
+
     /* fill input buffer */
-    {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current);
-        memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad);
+    {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
+        memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad);
         input->pos += toLoad;
     }
 
-    if (zcs->inBuff.current == zcs->inBuffSize) {   /* filled enough : let's compress */
+    if (zcs->inBuff.filled == zcs->inBuffSize) {   /* filled enough : let's compress */
         size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
-        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */
+        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
         ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);   /* should check for NULL */
-        unsigned const jobID = zcs->nextJobID & zcs->jobIDmask;
+        unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
-        zcs->jobs[jobID].srcStart = zcs->inBuff.start;
+        zcs->jobs[jobID].src = zcs->inBuff.buffer;
+        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
         zcs->jobs[jobID].fullFrameSize = 0;
-        zcs->jobs[jobID].compressionLevel = zcs->compressionLevel;
+        zcs->jobs[jobID].params = zcs->params;
         zcs->jobs[jobID].dstBuff = dstBuffer;
         zcs->jobs[jobID].cctx = cctx;
-        zcs->jobs[jobID].frameID = (jobID>0);
+        zcs->jobs[jobID].firstChunk = (jobID==0);
+        zcs->jobs[jobID].lastChunk = 0;
         zcs->jobs[jobID].jobCompleted = 0;
         zcs->jobs[jobID].dstFlushed = 0;
         zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
@@ -385,22 +408,22 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 
         /* get a new buffer for next input - save remaining into it */
         zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);   /* check for NULL ! */
-        zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize;
-        memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current);
+        zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
+        memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
 
-        DEBUGLOG(3, "posting job %u   (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize);
+        DEBUGLOG(3, "posting job %u   (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
         POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
         zcs->nextJobID++;
     }
 
     /* check if there is any data available to flush */
-    {   unsigned const jobID = zcs->doneJobID & zcs->jobIDmask;
+    {   unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
         ZSTDMT_jobDescription job = zcs->jobs[jobID];
         if (job.jobCompleted) {   /* job completed : output can be flushed */
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL;   /* release cctx for future task */
-            free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */
-            memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite);
+            ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 };
+            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
             if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
@@ -411,10 +434,77 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
     }   }
 
     /* recommended next input size : fill current input buffer */
-    return zcs->inBuffSize - zcs->inBuff.current;
+    return zcs->inBuffSize - zcs->inBuff.filled;
+}
+/* ZSTDMT_flushStream_internal() : posts buffered input as a new job (also when endFrame is set and the frame is still open), then copies into `output` whatever the oldest job has completed. @return : nb of bytes still to flush, or 1 when unknown but >0, or 0 when nothing is left */
+static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
+{
+    size_t const srcSize = zcs->inBuff.filled;   /* bytes buffered but not posted yet */
+    /* post a job when input is pending, or when the frame must be closed (possibly with an empty last chunk) */
+    if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
+        size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
+        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
+        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);   /* should check for NULL */
+        unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+        zcs->jobs[jobID].src = zcs->inBuff.buffer;
+        zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+        zcs->jobs[jobID].srcSize = srcSize;
+        zcs->jobs[jobID].fullFrameSize = 0;
+        zcs->jobs[jobID].params = zcs->params;
+        zcs->jobs[jobID].dstBuff = dstBuffer;
+        zcs->jobs[jobID].cctx = cctx;
+        zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);   /* jobID is a masked slot index and wraps back to 0 : test the job counter instead */
+        zcs->jobs[jobID].lastChunk = endFrame;
+        zcs->jobs[jobID].jobCompleted = 0;
+        zcs->jobs[jobID].dstFlushed = 0;
+        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+        /* get a new buffer for next input, or forget the one just handed over to the job */
+        if (!endFrame) {
+            zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);   /* check for NULL ! */
+            zcs->inBuff.filled = 0;
+        } else {
+            zcs->inBuff.buffer = (buffer_t) { NULL, 0 }; zcs->inBuff.filled = 0;   /* buffer now belongs to the job : a later call must not post it a second time */
+            zcs->frameEnded = 1;
+        }
+        DEBUGLOG(3, "posting job %u   (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
+        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
+        zcs->nextJobID++;
+    }
+    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* no job in flight : the slot below would only hold stale data from an earlier job */
+    /* check if there is any data available to flush */
+    {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+        ZSTDMT_jobDescription job = zcs->jobs[wJobID];   /* by-value snapshot ; NOTE(review): jobCompleted is read without holding jobCompleted_mutex - confirm this is safe against the worker */
+        if (job.jobCompleted) {   /* job completed : output can be flushed */
+            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL;   /* release cctx for future task */
+            ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
+            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+            output->pos += toWrite;
+            job.dstFlushed += toWrite;
+            if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
+                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 };
+                zcs->doneJobID++;
+            } else {
+                zcs->jobs[wJobID].dstFlushed = job.dstFlushed;   /* remember progress for the next call */
+        }   }
+        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+        return (zcs->doneJobID < zcs->nextJobID);
+    }
+}
+
+/* ZSTDMT_flushStream() : forwards to ZSTDMT_flushStream_internal() with endFrame==0 (the frame stays open) and returns its result */
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+    return ZSTDMT_flushStream_internal(zcs, output, 0);
+}
+/* ZSTDMT_endStream() : forwards to ZSTDMT_flushStream_internal() with endFrame==1 (the posted job is marked as last chunk) and returns its result */
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+    return ZSTDMT_flushStream_internal(zcs, output, 1);
+}
 
-size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
-size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
 
 #endif