]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : changed internal naming from frame to chunk
authorYann Collet <cyan@fb.com>
Thu, 12 Jan 2017 00:25:46 +0000 (01:25 +0100)
committerYann Collet <cyan@fb.com>
Thu, 12 Jan 2017 00:25:46 +0000 (01:25 +0100)
Since the result of mt compression is a single frame,
changed naming, which implied the concatenation of multiple frames.

minor : ensures that content size is written in header

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h

index ae986468baeaec51bbce8dba822936514b13635d..6fe37a6f46f55110896faf08dcc5357d21f4dc7c 100644 (file)
@@ -1,7 +1,8 @@
 #include <stdlib.h>   /* malloc */
+#include <string.h>   /* memcpy */
 #include <pool.h>     /* threadpool */
 #include "threading.h"  /* mutex */
-#include "zstd_internal.h"   /* MIN, ERROR */
+#include "zstd_internal.h"   /* MIN, ERROR, ZSTD_* */
 #include "zstdmt_compress.h"
 
 #if 0
@@ -43,7 +44,7 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
 
 #define ZSTDMT_NBTHREADS_MAX 128
 
-/* ===   Buffer Pool   === */
+/* =====   Buffer Pool   ===== */
 
 typedef struct buffer_s {
     void* start;
@@ -82,13 +83,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
         size_t const availBufferSize = buf.size;
         if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
             return buf;
-        free(buf.start);   /* size conditions not respected : create a new buffer */
+        free(buf.start);   /* size conditions not respected : scratch this buffer and create a new one */
     }
     /* create new buffer */
-    {   buffer_t buf;
-        buf.size = bSize;
-        buf.start = malloc(bSize);
-        return buf;
+    {   void* const start = malloc(bSize);
+        if (start==NULL) bSize = 0;
+        return (buffer_t) { start, bSize };   /* note : start can be NULL if malloc fails ! */
     }
 }
 
@@ -104,52 +104,7 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 }
 
 
-
-typedef struct {
-    ZSTD_CCtx* cctx;
-    const void* srcStart;
-    size_t srcSize;
-    buffer_t dstBuff;
-    int compressionLevel;
-    unsigned frameID;
-    unsigned long long fullFrameSize;
-    size_t cSize;
-    unsigned jobCompleted;
-    pthread_mutex_t* jobCompleted_mutex;
-    pthread_cond_t* jobCompleted_cond;
-} ZSTDMT_jobDescription;
-
-/* ZSTDMT_compressFrame() : POOL_function type */
-void ZSTDMT_compressFrame(void* jobDescription)
-{
-    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
-    buffer_t dstBuff = job->dstBuff;
-    ZSTD_parameters const params = ZSTD_getParams(job->compressionLevel, job->fullFrameSize, 0);
-    size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, params, job->fullFrameSize);
-    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
-    hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);   /* flush frame header */
-    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
-    if ((job->frameID & 1) == 0) {   /* preserve frame header when it is first beginning of frame */
-        dstBuff.start = (char*)dstBuff.start + hSize;
-        dstBuff.size -= hSize;
-    } else
-        hSize = 0;
-
-    job->cSize = (job->frameID>=2) ?   /* last chunk signal */
-                 ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
-                 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
-    if (!ZSTD_isError(job->cSize)) job->cSize += hSize;
-    DEBUGLOG(5, "frame %u : compressed %u bytes into %u bytes  ", (unsigned)job->frameID, (unsigned)job->srcSize, (unsigned)job->cSize);
-
-_endJob:
-    PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
-    job->jobCompleted = 1;
-    pthread_cond_signal(job->jobCompleted_cond);
-    pthread_mutex_unlock(job->jobCompleted_mutex);
-}
-
-
-/* ===   CCtx Pool   === */
+/* =====   CCtx Pool   ===== */
 
 typedef struct {
     unsigned totalCCtx;
@@ -191,11 +146,12 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool)
         return pool->cctx[pool->availCCtx];
     }
     /* note : should not be possible, since totalCCtx==nbThreads */
-    return ZSTD_createCCtx();
+    return ZSTD_createCCtx();   /* note : can be NULL is creation fails ! */
 }
 
 static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 {
+    if (cctx==NULL) return;   /* release on NULL */
     if (pool->availCCtx < pool->totalCCtx)
         pool->cctx[pool->availCCtx++] = cctx;
     else
@@ -204,6 +160,55 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 }
 
 
+/* =====   Thread worker   ===== */
+
+typedef struct {
+    ZSTD_CCtx* cctx;
+    const void* srcStart;
+    size_t srcSize;
+    buffer_t dstBuff;
+    size_t cSize;
+    size_t dstFlushed;
+    unsigned long long fullFrameSize;
+    unsigned firstChunk;
+    unsigned lastChunk;
+    unsigned jobCompleted;
+    pthread_mutex_t* jobCompleted_mutex;
+    pthread_cond_t* jobCompleted_cond;
+    ZSTD_parameters params;
+} ZSTDMT_jobDescription;
+
+/* ZSTDMT_compressChunk() : POOL_function type */
+void ZSTDMT_compressChunk(void* jobDescription)
+{
+    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
+    buffer_t dstBuff = job->dstBuff;
+    size_t hSize = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize);
+    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+    hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);   /* flush frame header */
+    if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+    if (job->firstChunk) {   /* preserve frame header when it is first chunk - otherwise, overwrite */
+        dstBuff.start = (char*)dstBuff.start + hSize;
+        dstBuff.size -= hSize;
+    } else
+        hSize = 0;
+
+    job->cSize = (job->lastChunk) ?   /* last chunk signal */
+                 ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
+                 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
+    if (!ZSTD_isError(job->cSize)) job->cSize += hSize;
+    DEBUGLOG(5, "chunk %u : compressed %u bytes into %u bytes  ", (unsigned)job->lastChunk, (unsigned)job->srcSize, (unsigned)job->cSize);
+
+_endJob:
+    PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+    job->jobCompleted = 1;
+    pthread_cond_signal(job->jobCompleted_cond);
+    pthread_mutex_unlock(job->jobCompleted_mutex);
+}
+
+
+/* =====   Multi-threaded compression   ===== */
+
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_bufferPool* buffPool;
@@ -250,64 +255,66 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                      const void* src, size_t srcSize,
                            int compressionLevel)
 {
-    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
-    size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
-    unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
-    unsigned const nbFrames = MIN(nbFramesMax, mtctx->nbThreads);
-    size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
+    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
+    size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2);
+    unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */;
+    unsigned const nbChunks = MIN(nbChunksMax, mtctx->nbThreads);
+    size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
+    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
     size_t remainingSrcSize = srcSize;
     const char* const srcStart = (const char*)src;
     size_t frameStartPos = 0;
 
-
-    DEBUGLOG(2, "windowLog : %u   => frameSizeTarget : %u      ", params.cParams.windowLog, (U32)frameSizeTarget);
-    DEBUGLOG(2, "nbFrames : %u   (size : %u bytes)   ", nbFrames, (U32)avgFrameSize);
+    DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes  ", params.cParams.windowLog, (U32)chunkTargetSize);
+    DEBUGLOG(2, "nbChunks  : %2u   (chunkSize : %u bytes)   ", nbChunks, (U32)avgChunkSize);
+    params.fParams.contentSizeFlag = 1;
 
     {   unsigned u;
-        for (u=0; u<nbFrames; u++) {
-            size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
-            size_t const dstBufferCapacity = u ? ZSTD_compressBound(frameSize) : dstCapacity;
+        for (u=0; u<nbChunks; u++) {
+            size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
+            size_t const dstBufferCapacity = u ? ZSTD_compressBound(chunkSize) : dstCapacity;
             buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
-            ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
+            ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);   /* should check for NULL ! */
 
             mtctx->jobs[u].srcStart = srcStart + frameStartPos;
-            mtctx->jobs[u].srcSize = frameSize;
+            mtctx->jobs[u].srcSize = chunkSize;
             mtctx->jobs[u].fullFrameSize = srcSize;
-            mtctx->jobs[u].compressionLevel = compressionLevel;
+            mtctx->jobs[u].params = params;
             mtctx->jobs[u].dstBuff = dstBuffer;
             mtctx->jobs[u].cctx = cctx;
-            mtctx->jobs[u].frameID = (u>0) | ((u==nbFrames-1)<<1);
+            mtctx->jobs[u].firstChunk = (u==0);
+            mtctx->jobs[u].lastChunk = (u==nbChunks-1);
             mtctx->jobs[u].jobCompleted = 0;
             mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
             mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
 
-            DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)frameSize);
-            POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]);
+            DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)chunkSize);
+            POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
 
-            frameStartPos += frameSize;
-            remainingSrcSize -= frameSize;
+            frameStartPos += chunkSize;
+            remainingSrcSize -= chunkSize;
     }   }
-    /* note : since nbFrames <= nbThreads, all jobs should be running immediately in parallel */
+    /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */
 
-    {   unsigned frameID;
+    {   unsigned chunkID;
         size_t dstPos = 0;
-        for (frameID=0; frameID<nbFrames; frameID++) {
-            DEBUGLOG(3, "ready to write frame %u ", frameID);
+        for (chunkID=0; chunkID<nbChunks; chunkID++) {
+            DEBUGLOG(3, "ready to write chunk %u ", chunkID);
 
             PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
-            while (mtctx->jobs[frameID].jobCompleted==0) {
-                DEBUGLOG(4, "waiting for jobCompleted signal from frame %u", frameID);
+            while (mtctx->jobs[chunkID].jobCompleted==0) {
+                DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID);
                 pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
             }
             pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
 
-            ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx);
-            {   size_t const cSize = mtctx->jobs[frameID].cSize;
+            ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
+            {   size_t const cSize = mtctx->jobs[chunkID].cSize;
                 if (ZSTD_isError(cSize)) return cSize;
                 if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);
-                if (frameID) {   /* note : frame 0 is already written directly into dst */
-                    memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, cSize);
-                    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
+                if (chunkID) {   /* note : chunk 0 is already written directly into dst */
+                    memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
+                    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
                 }
                 dstPos += cSize ;
             }
@@ -317,3 +324,89 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     }
 
 }
+
+
+/* ====================================== */
+/* =======      Streaming API     ======= */
+/* ====================================== */
+
+#if 0
+
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+    zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
+    zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2);
+    zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
+    zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);   /* check for NULL ! */
+    zcs->inBuff.current = 0;
+    zcs->doneJobID = 0;
+    zcs->nextJobID = 0;
+    return 0;
+}
+
+typedef struct {
+    buffer_t buffer;
+    unsigned current;
+} inBuff_t;
+
+
+size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
+{
+    /* fill input buffer */
+    {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current);
+        memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad);
+        input->pos += toLoad;
+    }
+
+    if (zcs->inBuff.current == zcs->inBuffSize) {   /* filled enough : let's compress */
+        size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
+        buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */
+        ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);   /* should check for NULL */
+        unsigned const jobID = zcs->nextJobID & zcs->jobIDmask;
+
+        zcs->jobs[jobID].srcStart = zcs->inBuff.start;
+        zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
+        zcs->jobs[jobID].fullFrameSize = 0;
+        zcs->jobs[jobID].compressionLevel = zcs->compressionLevel;
+        zcs->jobs[jobID].dstBuff = dstBuffer;
+        zcs->jobs[jobID].cctx = cctx;
+        zcs->jobs[jobID].frameID = (jobID>0);
+        zcs->jobs[jobID].jobCompleted = 0;
+        zcs->jobs[jobID].dstFlushed = 0;
+        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+        /* get a new buffer for next input - save remaining into it */
+        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);   /* check for NULL ! */
+        zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize;
+        memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current);
+
+        DEBUGLOG(3, "posting job %u   (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize);
+        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
+        zcs->nextJobID++;
+    }
+
+    /* check if there is any data available to flush */
+    {   unsigned const jobID = zcs->doneJobID & zcs->jobIDmask;
+        ZSTDMT_jobDescription job = zcs->jobs[jobID];
+        if (job.jobCompleted) {   /* job completed : output can be flushed */
+            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL;   /* release cctx for future task */
+            free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */
+            memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite);
+            output->pos += toWrite;
+            job.dstFlushed += toWrite;
+            if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => next one */
+                ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+                zcs->doneJobID++;
+            } else
+                zcs->jobs[jobID].dstFlushed = job.dstFlushed;
+    }   }
+
+    /* recommended next input size : fill current input buffer */
+    return zcs->inBuffSize - zcs->inBuff.current;
+}
+
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
+
+#endif
index 73ee379b8e159b8bb9a75fdfb261858a01d4a606..ca5d6b60135eef7fc719d01978bdf8d919f67f52 100644 (file)
@@ -1,12 +1,24 @@
 
+/* ===   Dependencies   === */
 #include <stddef.h>   /* size_t */
+#include "zstd.h"     /* ZSTD_inBuffer, ZSTD_outBuffer */
 
-typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
 
-ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads);
+/* ===   Simple one-pass functions   === */
+
+typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
+ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads);
 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx);
 
 size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
                            void* dst, size_t dstCapacity,
                      const void* src, size_t srcSize,
                            int compressionLevel);
+
+
+/* ===   Streaming functions   === */
+
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
+size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);